Skip to content

Commit

Permalink
Properly handle custom homedir, properly wait for tx confirmation
Browse files Browse the repository at this point in the history
  • Loading branch information
zbuc committed Jun 17, 2024
1 parent 491d137 commit 4d9c57a
Show file tree
Hide file tree
Showing 3 changed files with 88 additions and 45 deletions.
3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,14 @@ penumbra-transaction = { path = "../penumbra/crates/core/transaction", features

# External dependencies
anyhow = "1"
camino = { version = "1" }
directories = "4.0.1"
lazy_static = "1.4.0"
regex = "1"
tracing = "0.1"
tracing-subscriber = "0.2"
tokio = { version = "1.25", features = ["full"] }
clap = { version = "3", features = ["derive"] }
clap = { version = "3", features = ["derive", "env"] }
serde_json = "1"
futures = "0.3"
derivative = "2"
Expand Down
50 changes: 28 additions & 22 deletions src/opt/serve.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use anyhow::Context;
use binance::{api::Binance, config::Config, market::*};
use camino::Utf8PathBuf;
use clap::Parser;
use directories::ProjectDirs;
use futures::TryStreamExt;
Expand All @@ -11,19 +12,23 @@ use penumbra_proto::{
view::v1::{view_service_client::ViewServiceClient, view_service_server::ViewServiceServer},
};
use penumbra_view::{ViewClient, ViewServer};
use std::path::PathBuf;
use url::Url;

use crate::{BinanceFetcher, Trader, Wallet};

#[derive(Debug, Clone, Parser)]
#[clap(
name = "osiris",
about = "Osiris: An example Penumbra price replication LP bot.",
version
)]
pub struct Serve {
/// The transaction fee for each response (paid in upenumbra).
#[structopt(long, default_value = "0")]
fee: u64,
/// Path to the directory to use to store data [default: platform appdata directory].
#[clap(long, short)]
data_dir: Option<PathBuf>,
/// The home directory used to store configuration and data.
#[clap(long, default_value_t = default_home(), env = "PENUMBRA_PCLI_HOME")]
home: Utf8PathBuf,
/// The URL of the pd gRPC endpoint on the remote node.
#[clap(short, long, default_value = "https://grpc.testnet.penumbra.zone")]
node: Url,
Expand Down Expand Up @@ -83,17 +88,9 @@ impl Serve {

let penumbra_config = tracing::debug_span!("penumbra-config").entered();
tracing::debug!("importing wallet material");
// Look up the path to the view state file per platform, creating the directory if needed
let data_dir = self.data_dir.unwrap_or_else(|| {
ProjectDirs::from("zone", "penumbra", "pcli")
.expect("can access penumbra project dir")
.data_dir()
.to_owned()
});
std::fs::create_dir_all(&data_dir).context("can create data dir")?;

// Build a custody service...
let pcli_config_file = data_dir.clone().join("config.toml");
let pcli_config_file = self.home.join("config.toml");
let wallet = Wallet::load(pcli_config_file)
.context("failed to load wallet from local custody file")?;
let soft_kms = SoftKms::new(wallet.spend_key.clone().into());
Expand All @@ -104,13 +101,8 @@ impl Serve {
// Wait to synchronize the chain before doing anything else.
tracing::info!(%self.node, "starting initial sync: ");
// Instantiate an in-memory view service.
let view_file = data_dir.join("pcli-view.sqlite");
let view_filepath = Some(
view_file
.to_str()
.ok_or_else(|| anyhow::anyhow!("Non-UTF8 view path"))?
.to_string(),
);
let view_file = self.home.join("pcli-view.sqlite");
let view_filepath = Some(view_file.to_string());
let view_storage =
penumbra_view::Storage::load_or_initialize(view_filepath, &fvk, self.node.clone())
.await?;
Expand All @@ -129,8 +121,14 @@ impl Serve {

let trader_config = tracing::debug_span!("trader-config").entered();
// Instantiate the trader (manages the bot's portfolio based on MPSC messages containing price quotes)
let (quotes_sender, trader) =
Trader::new(0, fvk, view, custody, symbols.clone(), self.node);
let (quotes_sender, trader) = Trader::new(
self.source_address,
fvk,
view,
custody,
symbols.clone(),
self.node,
);
trader_config.exit();

let _binance_span = tracing::debug_span!("binance-fetcher").entered();
Expand All @@ -148,3 +146,11 @@ impl Serve {
}
}
}

fn default_home() -> Utf8PathBuf {
let path = ProjectDirs::from("zone", "penumbra", "pcli")
.expect("Failed to get platform data dir")
.data_dir()
.to_path_buf();
Utf8PathBuf::from_path_buf(path).expect("Platform default data dir was not UTF-8")
}
80 changes: 58 additions & 22 deletions src/trader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::{collections::BTreeMap, future, str::FromStr};

use anyhow::Context;
use binance::model::BookTickerEvent;
use futures::{StreamExt, TryStreamExt};
use futures::{FutureExt as _, StreamExt, TryStreamExt};
use penumbra_asset::asset::Metadata;
use penumbra_custody::{AuthorizeRequest, CustodyClient};
use penumbra_dex::{
Expand All @@ -18,6 +18,8 @@ use penumbra_proto::core::component::dex::v1::query_service_client::QueryService
use penumbra_proto::core::component::dex::v1::{
LiquidityPositionsByPriceRequest, LiquidityPositionsRequest,
};
use penumbra_proto::penumbra::view::v1::broadcast_transaction_response::Status as BroadcastStatus;
use penumbra_transaction::txhash::TransactionId;
use penumbra_view::{Planner, ViewClient};
use rand::rngs::OsRng;
use tokio::sync::watch;
Expand All @@ -37,32 +39,32 @@ lazy_static! {
(
// ETH priced in terms of BTC
"ETHBTC".to_string(),
DirectedUnitPair::from_str("test_eth:test_btc").unwrap()
DirectedUnitPair::from_str("test_btc:test_eth").unwrap()
),
(
// ETH priced in terms of USDT
"ETHUSDT".to_string(),
DirectedUnitPair::from_str("test_eth:test_usd").unwrap()
DirectedUnitPair::from_str("test_usd:test_eth").unwrap()
),
(
// BTC priced in terms of USD
"BTCUSDT".to_string(),
DirectedUnitPair::from_str("test_btc:test_usd").unwrap()
DirectedUnitPair::from_str("test_usd:test_btc").unwrap()
),
(
// ATOM priced in terms of BTC
"ATOMBTC".to_string(),
DirectedUnitPair::from_str("test_atom:test_btc").unwrap()
DirectedUnitPair::from_str("test_btc:test_atom").unwrap()
),
(
// ATOM priced in terms of USDT
"ATOMUSDT".to_string(),
DirectedUnitPair::from_str("test_atom:test_usd").unwrap()
DirectedUnitPair::from_str("test_usd:test_atom").unwrap()
),
(
// OSMO priced in terms of USDT
"OSMOUSDT".to_string(),
DirectedUnitPair::from_str("test_osmo:test_usd").unwrap()
DirectedUnitPair::from_str("test_usd:test_osmo").unwrap()
),
]);
}
Expand All @@ -79,7 +81,7 @@ where
view: V,
custody: C,
fvk: FullViewingKey,
account: u32,
account: AddressIndex,
pd_url: Url,
}

Expand All @@ -90,7 +92,7 @@ where
{
/// Create a new trader.
pub fn new(
account: u32,
account: AddressIndex,
fvk: FullViewingKey,
view: V,
custody: C,
Expand Down Expand Up @@ -129,15 +131,11 @@ where
pub async fn run(mut self) -> anyhow::Result<()> {
tracing::info!("starting trader");
let trader_span = tracing::debug_span!("trader");
// TODO figure out why this span doesn't display in logs
let _ = trader_span.enter();
tracing::debug!("running trader functionality");
// Doing this loop without any shutdown signal doesn't exactly
// TODO: Doing this loop without any shutdown signal doesn't exactly
// provide a clean shutdown, but it works for now.
loop {
// TODO: ensure we have some positions from `penumbra` to create a more interesting
// trading environment :)

// Check each pair
let mut actions = self.actions.clone();
for (symbol, rx) in actions.iter_mut() {
Expand Down Expand Up @@ -173,8 +171,6 @@ where
.expect("missing symbol -> DirectedUnitPair mapping");

// Create a plan that will contain all LP management operations based on this quote.
// TODO: could move this outside the loop, but it's a little easier to debug
// the plans like this for now
let plan = &mut Planner::new(OsRng);

// Find the spendable balance for each asset in the market.
Expand Down Expand Up @@ -223,9 +219,6 @@ where
)
.await?;

// TODO: it's possible to immediately close this position within the same block
// however what if we don't get updates every block?

// Finalize and submit the transaction plan.
match self.finalize_and_submit(plan).await {
Ok(_) => {}
Expand Down Expand Up @@ -287,7 +280,50 @@ where
.await?;

// 3. Broadcast the transaction and wait for confirmation.
self.view.broadcast_transaction(tx, true).await?;
let mut rsp = self.view.broadcast_transaction(tx, true).await?;
let id: TransactionId = async move {
while let Some(rsp) = rsp.try_next().await? {
match rsp.status {
Some(status) => match status {
BroadcastStatus::BroadcastSuccess(bs) => {
tracing::debug!(
"transaction broadcast successfully: {}",
TransactionId::try_from(
bs.id.expect("detected transaction missing id")
)?
);
}
BroadcastStatus::Confirmed(c) => {
let id = c.id.expect("detected transaction missing id").try_into()?;
if c.detection_height != 0 {
tracing::debug!(
"transaction confirmed and detected: {} @ height {}",
id,
c.detection_height
);
} else {
tracing::debug!("transaction confirmed and detected: {}", id);
}
return Ok(id);
}
},
None => {
// No status is unexpected behavior
return Err(anyhow::anyhow!(
"empty BroadcastTransactionResponse message"
));
}
}
}

Err(anyhow::anyhow!(
"should have received BroadcastTransaction status or error"
))
}
.boxed()
.await
.context("error broadcasting transaction")?;
tracing::debug!(transaction_id = ?id, "broadcasted transaction");

Ok(())
}
Expand Down Expand Up @@ -378,9 +414,9 @@ where
market.into_directed_trading_pair(),
spread as u32,
// p is always the scaling value
(scaling_factor as u128 * denom_scaler).into(),
(scaling_factor as u128 * denom_scaler / 1_000).into(),
// price is expressed in units of asset 2
(mid_price as u128 * numer_scaler).into(),
(mid_price as u128 * numer_scaler / 1_000).into(),
reserves,
);

Expand Down

0 comments on commit 4d9c57a

Please sign in to comment.