diff --git a/Cargo.toml b/Cargo.toml index a633d5c..5c31fb4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -22,13 +22,14 @@ penumbra-transaction = { path = "../penumbra/crates/core/transaction", features # External dependencies anyhow = "1" +camino = { version = "1" } directories = "4.0.1" lazy_static = "1.4.0" regex = "1" tracing = "0.1" tracing-subscriber = "0.2" tokio = { version = "1.25", features = ["full"] } -clap = { version = "3", features = ["derive"] } +clap = { version = "3", features = ["derive", "env"] } serde_json = "1" futures = "0.3" derivative = "2" diff --git a/src/opt/serve.rs b/src/opt/serve.rs index b0c5628..5dd89da 100644 --- a/src/opt/serve.rs +++ b/src/opt/serve.rs @@ -1,5 +1,6 @@ use anyhow::Context; use binance::{api::Binance, config::Config, market::*}; +use camino::Utf8PathBuf; use clap::Parser; use directories::ProjectDirs; use futures::TryStreamExt; @@ -11,19 +12,23 @@ use penumbra_proto::{ view::v1::{view_service_client::ViewServiceClient, view_service_server::ViewServiceServer}, }; use penumbra_view::{ViewClient, ViewServer}; -use std::path::PathBuf; use url::Url; use crate::{BinanceFetcher, Trader, Wallet}; #[derive(Debug, Clone, Parser)] +#[clap( + name = "osiris", + about = "Osiris: An example Penumbra price replication LP bot.", + version +)] pub struct Serve { /// The transaction fee for each response (paid in upenumbra). #[structopt(long, default_value = "0")] fee: u64, - /// Path to the directory to use to store data [default: platform appdata directory]. - #[clap(long, short)] - data_dir: Option, + /// The home directory used to store configuration and data. + #[clap(long, default_value_t = default_home(), env = "PENUMBRA_PCLI_HOME")] + home: Utf8PathBuf, /// The URL of the pd gRPC endpoint on the remote node. #[clap(short, long, default_value = "https://grpc.testnet.penumbra.zone")] node: Url, @@ -83,17 +88,9 @@ impl Serve { let penumbra_config = tracing::debug_span!("penumbra-config").entered(); tracing::debug!("importing wallet material"); - // Look up the path to the view state file per platform, creating the directory if needed - let data_dir = self.data_dir.unwrap_or_else(|| { - ProjectDirs::from("zone", "penumbra", "pcli") - .expect("can access penumbra project dir") - .data_dir() - .to_owned() - }); - std::fs::create_dir_all(&data_dir).context("can create data dir")?; // Build a custody service... - let pcli_config_file = data_dir.clone().join("config.toml"); + let pcli_config_file = self.home.join("config.toml"); let wallet = Wallet::load(pcli_config_file) .context("failed to load wallet from local custody file")?; let soft_kms = SoftKms::new(wallet.spend_key.clone().into()); @@ -104,13 +101,8 @@ impl Serve { // Wait to synchronize the chain before doing anything else. tracing::info!(%self.node, "starting initial sync: "); // Instantiate an in-memory view service. - let view_file = data_dir.join("pcli-view.sqlite"); - let view_filepath = Some( - view_file - .to_str() - .ok_or_else(|| anyhow::anyhow!("Non-UTF8 view path"))? - .to_string(), - ); + let view_file = self.home.join("pcli-view.sqlite"); + let view_filepath = Some(view_file.to_string()); let view_storage = penumbra_view::Storage::load_or_initialize(view_filepath, &fvk, self.node.clone()) .await?; @@ -129,8 +121,14 @@ impl Serve { let trader_config = tracing::debug_span!("trader-config").entered(); // Instantiate the trader (manages the bot's portfolio based on MPSC messages containing price quotes) - let (quotes_sender, trader) = - Trader::new(0, fvk, view, custody, symbols.clone(), self.node); + let (quotes_sender, trader) = Trader::new( + self.source_address, + fvk, + view, + custody, + symbols.clone(), + self.node, + ); trader_config.exit(); let _binance_span = tracing::debug_span!("binance-fetcher").entered(); @@ -148,3 +146,11 @@ impl Serve { } } } + +fn default_home() -> Utf8PathBuf { + let path = ProjectDirs::from("zone", "penumbra", "pcli") + .expect("Failed to get platform data dir") + .data_dir() + .to_path_buf(); + Utf8PathBuf::from_path_buf(path).expect("Platform default data dir was not UTF-8") +} diff --git a/src/trader.rs b/src/trader.rs index 3e8998e..bb1ce6e 100644 --- a/src/trader.rs +++ b/src/trader.rs @@ -2,7 +2,7 @@ use std::{collections::BTreeMap, future, str::FromStr}; use anyhow::Context; use binance::model::BookTickerEvent; -use futures::{StreamExt, TryStreamExt}; +use futures::{FutureExt as _, StreamExt, TryStreamExt}; use penumbra_asset::asset::Metadata; use penumbra_custody::{AuthorizeRequest, CustodyClient}; use penumbra_dex::{ @@ -18,6 +18,8 @@ use penumbra_proto::core::component::dex::v1::query_service_client::QueryService use penumbra_proto::core::component::dex::v1::{ LiquidityPositionsByPriceRequest, LiquidityPositionsRequest, }; +use penumbra_proto::penumbra::view::v1::broadcast_transaction_response::Status as BroadcastStatus; +use penumbra_transaction::txhash::TransactionId; use penumbra_view::{Planner, ViewClient}; use rand::rngs::OsRng; use tokio::sync::watch; @@ -37,32 +39,32 @@ lazy_static! { ( // ETH priced in terms of BTC "ETHBTC".to_string(), - DirectedUnitPair::from_str("test_eth:test_btc").unwrap() + DirectedUnitPair::from_str("test_btc:test_eth").unwrap() ), ( // ETH priced in terms of USDT "ETHUSDT".to_string(), - DirectedUnitPair::from_str("test_eth:test_usd").unwrap() + DirectedUnitPair::from_str("test_usd:test_eth").unwrap() ), ( // BTC priced in terms of USD "BTCUSDT".to_string(), - DirectedUnitPair::from_str("test_btc:test_usd").unwrap() + DirectedUnitPair::from_str("test_usd:test_btc").unwrap() ), ( // ATOM priced in terms of BTC "ATOMBTC".to_string(), - DirectedUnitPair::from_str("test_atom:test_btc").unwrap() + DirectedUnitPair::from_str("test_btc:test_atom").unwrap() ), ( // ATOM priced in terms of USDT "ATOMUSDT".to_string(), - DirectedUnitPair::from_str("test_atom:test_usd").unwrap() + DirectedUnitPair::from_str("test_usd:test_atom").unwrap() ), ( // OSMO priced in terms of USDT "OSMOUSDT".to_string(), - DirectedUnitPair::from_str("test_osmo:test_usd").unwrap() + DirectedUnitPair::from_str("test_usd:test_osmo").unwrap() ), ]); } @@ -79,7 +81,7 @@ where view: V, custody: C, fvk: FullViewingKey, - account: u32, + account: AddressIndex, pd_url: Url, } @@ -90,7 +92,7 @@ where { /// Create a new trader. pub fn new( - account: u32, + account: AddressIndex, fvk: FullViewingKey, view: V, custody: C, @@ -129,15 +131,11 @@ where pub async fn run(mut self) -> anyhow::Result<()> { tracing::info!("starting trader"); let trader_span = tracing::debug_span!("trader"); - // TODO figure out why this span doesn't display in logs let _ = trader_span.enter(); tracing::debug!("running trader functionality"); - // Doing this loop without any shutdown signal doesn't exactly + // TODO: Doing this loop without any shutdown signal doesn't exactly // provide a clean shutdown, but it works for now. loop { - // TODO: ensure we have some positions from `penumbra` to create a more interesting - // trading environment :) - // Check each pair let mut actions = self.actions.clone(); for (symbol, rx) in actions.iter_mut() { @@ -173,8 +171,6 @@ where .expect("missing symbol -> DirectedUnitPair mapping"); // Create a plan that will contain all LP management operations based on this quote. - // TODO: could move this outside the loop, but it's a little easier to debug - // the plans like this for now let plan = &mut Planner::new(OsRng); // Find the spendable balance for each asset in the market. @@ -223,9 +219,6 @@ where ) .await?; - // TODO: it's possible to immediately close this position within the same block - // however what if we don't get updates every block? - // Finalize and submit the transaction plan. match self.finalize_and_submit(plan).await { Ok(_) => {} @@ -287,7 +280,50 @@ where .await?; // 3. Broadcast the transaction and wait for confirmation. - self.view.broadcast_transaction(tx, true).await?; + let mut rsp = self.view.broadcast_transaction(tx, true).await?; + let id: TransactionId = async move { + while let Some(rsp) = rsp.try_next().await? { + match rsp.status { + Some(status) => match status { + BroadcastStatus::BroadcastSuccess(bs) => { + tracing::debug!( + "transaction broadcast successfully: {}", + TransactionId::try_from( + bs.id.expect("detected transaction missing id") + )? + ); + } + BroadcastStatus::Confirmed(c) => { + let id = c.id.expect("detected transaction missing id").try_into()?; + if c.detection_height != 0 { + tracing::debug!( + "transaction confirmed and detected: {} @ height {}", + id, + c.detection_height + ); + } else { + tracing::debug!("transaction confirmed and detected: {}", id); + } + return Ok(id); + } + }, + None => { + // No status is unexpected behavior + return Err(anyhow::anyhow!( + "empty BroadcastTransactionResponse message" + )); + } + } + } + + Err(anyhow::anyhow!( + "should have received BroadcastTransaction status or error" + )) + } + .boxed() + .await + .context("error broadcasting transaction")?; + tracing::debug!(transaction_id = ?id, "broadcasted transaction"); Ok(()) } @@ -378,9 +414,9 @@ where market.into_directed_trading_pair(), spread as u32, // p is always the scaling value - (scaling_factor as u128 * denom_scaler).into(), + (scaling_factor as u128 * denom_scaler / 1_000).into(), // price is expressed in units of asset 2 - (mid_price as u128 * numer_scaler).into(), + (mid_price as u128 * numer_scaler / 1_000).into(), reserves, );