Skip to content

Commit

Permalink
Merge pull request #19 from penumbra-zone/18-moar-logging
Browse files Browse the repository at this point in the history
  • Loading branch information
zbuc authored Jul 22, 2023
2 parents 0a7ed0e + 8277b84 commit 1aac7b5
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 44 deletions.
63 changes: 31 additions & 32 deletions src/binance_fetcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use binance::websockets::WebSockets;
use binance::websockets::WebsocketEvent;

/// The `Trader` maps `(DirectedTradingPair, Amount)` position requests to [`position::Id`] identifiers of opened positions.
#[derive(Debug)]
pub struct BinanceFetcher {
/// Sends quotes to the trader.
txs: BTreeMap<String, Sender<Option<BookTickerEvent>>>,
Expand Down Expand Up @@ -36,8 +37,7 @@ impl BinanceFetcher {

/// Run the fetcher.
pub async fn run(mut self) -> anyhow::Result<()> {
tracing::debug!("running BinanceFetcher");

let _fetcher_span = tracing::debug_span!("binance-fetcher").entered();
let keep_running = AtomicBool::new(true); // Used to control the event loop
let endpoints = self
.symbols
Expand All @@ -46,48 +46,47 @@ impl BinanceFetcher {
.collect::<Vec<_>>();

let mut web_socket = WebSockets::new(|event: WebsocketEvent| {
match event {
WebsocketEvent::BookTicker(book_ticker_event) => {
// Check if this quote has already been sent or not.
let last_sent_quote = self.last_sent.get(&book_ticker_event.symbol);

if last_sent_quote.is_none()
|| (last_sent_quote.is_some()
&& last_sent_quote.unwrap().update_id < book_ticker_event.update_id &&
// we actually only care to update iff the price has changed
(last_sent_quote.unwrap().best_bid != book_ticker_event.best_bid ||
last_sent_quote.unwrap().best_ask != book_ticker_event.best_ask))
{
tracing::debug!(?book_ticker_event, ?last_sent_quote, "received new quote");
self.txs
.get(&book_ticker_event.symbol)
.expect("missing sender for symbol")
.send(Some(book_ticker_event.clone()))
.expect("error sending price quote");
self.last_sent
.insert(book_ticker_event.symbol.clone(), book_ticker_event);
}
if let WebsocketEvent::BookTicker(book_ticker_event) = event {
// Check if this quote has already been sent or not.
let last_sent_quote = self.last_sent.get(&book_ticker_event.symbol);
if last_sent_quote.is_none()
|| (last_sent_quote.is_some()
&& last_sent_quote.unwrap().update_id < book_ticker_event.update_id &&
// we actually only care to update if the price has changed
(last_sent_quote.unwrap().best_bid != book_ticker_event.best_bid ||
last_sent_quote.unwrap().best_ask != book_ticker_event.best_ask))
{
tracing::debug!(?book_ticker_event, ?last_sent_quote, "received new quote");
self.txs
.get(&book_ticker_event.symbol)
.expect("missing sender for symbol")
.send(Some(book_ticker_event.clone()))
.expect("error sending price quote");
self.last_sent
.insert(book_ticker_event.symbol.clone(), book_ticker_event);
}
_ => (),
};
Ok(())
});

tracing::debug!(?endpoints, "connecting to Binance websocket API");
web_socket
.connect_with_config(
&format!("stream?streams={}", &endpoints.join("/")),
&self.binance_config,
)
.unwrap(); // check error
.map_err(|e| anyhow::anyhow!("failed to connect to binance websocket service: {e}"))?;

tracing::debug!("maintaining open websocket connection to Binance API");
if let Err(e) = web_socket.event_loop(&keep_running) {
match e {
err => {
tracing::error!(?err, "error in web socket event loop");
anyhow::bail!(format!("Failed in web socket loop, exiting"));
}
}
tracing::error!(?e, "error in web socket event loop");
anyhow::bail!(format!("Failed in web socket loop, exiting"));
}
web_socket.disconnect().unwrap();

tracing::debug!("closing websocket connection to Binance");
web_socket.disconnect().map_err(|e| {
anyhow::anyhow!("failed to disconnect from binance websocket service: {e}")
})?;
Ok(())
}
}
20 changes: 11 additions & 9 deletions src/opt/serve.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,11 +61,11 @@ impl Serve {

// the binance-rs library really hates trailing slashes here
let mut binance_rest = String::from(self.binance_rest);
if binance_rest.ends_with("/") {
if binance_rest.ends_with('/') {
binance_rest.pop();
}
let mut binance_ws = String::from(self.binance_ws);
if binance_ws.ends_with("/") {
if binance_ws.ends_with('/') {
binance_ws.pop();
}
let binance_config = Config {
Expand All @@ -83,9 +83,10 @@ impl Serve {
Err(_) => tracing::trace!("Invalid symbol: {}", permutation),
}
}

tracing::debug!(?symbols, "found valid trading symbols in Binance API");

let penumbra_config = tracing::debug_span!("penumbra-config").entered();
tracing::debug!("importing wallet material");
// Look up the path to the view state file per platform, creating the directory if needed
let data_dir = self.data_dir.unwrap_or_else(|| {
ProjectDirs::from("zone", "penumbra", "pcli")
Expand All @@ -106,30 +107,31 @@ impl Serve {

let fvk = wallet.spend_key.full_viewing_key().clone();

// Wait to synchronize the chain before doing anything else.
tracing::info!(%self.node, "starting initial sync: ");
// Instantiate an in-memory view service.
let view_storage =
penumbra_view::Storage::load_or_initialize(None::<&str>, &fvk, self.node.clone())
.await?;
let view_service = ViewService::new(view_storage, self.node.clone()).await?;

// Now build the view and custody clients, doing gRPC with ourselves
let mut view = ViewProtocolServiceClient::new(ViewProtocolServiceServer::new(view_service));

// Wait to synchronize the chain before doing anything else.
tracing::info!(
"starting initial sync: please wait for sync to complete before requesting tokens"
);
ViewClient::status_stream(&mut view, fvk.account_group_id())
.await?
.try_collect::<Vec<_>>()
.await?;
// From this point on, the view service is synchronized.
tracing::info!("initial sync complete");
tracing::info!(%self.node, "initial sync complete");
penumbra_config.exit();

let trader_config = tracing::debug_span!("trader-config").entered();
// Instantiate the trader (manages the bot's portfolio based on MPSC messages containing price quotes)
let (quotes_sender, trader) =
Trader::new(0, fvk, view, custody, symbols.clone(), self.node);
trader_config.exit();

let _binance_span = tracing::debug_span!("binance-fetcher").entered();
// Instantiate the Binance fetcher (responsible for fetching binance API data and sending along to the trader)
let binance_fetcher = BinanceFetcher::new(quotes_sender, symbols, binance_config);

Expand Down
11 changes: 8 additions & 3 deletions src/trader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,10 @@ where

/// Run the trader.
pub async fn run(mut self) -> anyhow::Result<()> {
let trader_span = tracing::debug_span!("trader");
// TODO figure out why this span doesn't display in logs
let _ = trader_span.enter();
tracing::debug!("running trader functionality");
// Doing this loop without any shutdown signal doesn't exactly
// provide a clean shutdown, but it works for now.
loop {
Expand All @@ -136,6 +140,7 @@ where
if rx.has_changed().unwrap() {
let bte = rx.borrow_and_update().clone();
if bte.is_none() {
tracing::debug!(?bte, "found an empty ticket event");
continue;
}
let book_ticker_event = bte.unwrap();
Expand All @@ -148,7 +153,7 @@ where
.await?
.sync_height;
if let Some(last_updated_height) = self.last_updated_height.get(symbol) {
if !(current_height > *last_updated_height) {
if current_height <= *last_updated_height {
tracing::debug!(?symbol, "skipping symbol, already updated this block");
continue;
}
Expand Down Expand Up @@ -415,7 +420,7 @@ where
.await?;

fn is_closed_position_nft(denom: &DenomMetadata) -> bool {
let prefix = format!("lpnft_closed_");
let prefix = "lpnft_closed_".to_string();

denom.starts_with(&prefix)
}
Expand Down Expand Up @@ -481,7 +486,7 @@ where
.await?;

fn is_opened_position_nft(denom: &DenomMetadata) -> bool {
let prefix = format!("lpnft_opened_");
let prefix = "lpnft_opened_".to_string();

denom.starts_with(&prefix)
}
Expand Down

0 comments on commit 1aac7b5

Please sign in to comment.