diff --git a/Cargo.lock b/Cargo.lock index 8e21c1f..3246fda 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2731,7 +2731,7 @@ dependencies = [ [[package]] name = "pyth-agent" -version = "2.4.3" +version = "2.4.4" dependencies = [ "anyhow", "async-trait", diff --git a/Cargo.toml b/Cargo.toml index c3605a7..0b30856 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "pyth-agent" -version = "2.4.3" +version = "2.4.4" edition = "2021" [[bin]] diff --git a/src/agent/solana/exporter.rs b/src/agent/solana/exporter.rs index dfc507e..dee0bb3 100644 --- a/src/agent/solana/exporter.rs +++ b/src/agent/solana/exporter.rs @@ -61,6 +61,7 @@ use { BTreeMap, HashMap, }, + sync::Arc, time::Duration, }, tokio::{ @@ -228,7 +229,7 @@ pub fn spawn_exporter( /// Exporter is responsible for exporting data held in the local store /// to the global Pyth Network. pub struct Exporter { - rpc_client: RpcClient, + rpc_client: Arc, config: Config, @@ -292,7 +293,10 @@ impl Exporter { ) -> Self { let publish_interval = time::interval(config.publish_interval_duration); Exporter { - rpc_client: RpcClient::new_with_timeout(rpc_url.to_string(), rpc_timeout), + rpc_client: Arc::new(RpcClient::new_with_timeout( + rpc_url.to_string(), + rpc_timeout, + )), config, network, publish_interval, @@ -536,7 +540,7 @@ impl Exporter { let mut batch_send_interval = time::interval( self.config .publish_interval_duration - .div_f64(num_batches as f64), + .div_f64((num_batches + 1) as f64), // +1 to give enough time for the last batch ); let mut batch_state = HashMap::new(); let mut batch_futures = vec![]; @@ -796,19 +800,37 @@ impl Exporter { network_state.blockhash, ); - let signature = self - .rpc_client - .send_transaction_with_config( - &transaction, - RpcSendTransactionConfig { - skip_preflight: true, - ..RpcSendTransactionConfig::default() - }, - ) - .await?; - debug!(self.logger, "sent upd_price transaction"; "signature" => signature.to_string(), "instructions" => instructions.len(), "price_accounts" => format!("{:?}", price_accounts)); + let tx = self.inflight_transactions_tx.clone(); + let logger = self.logger.clone(); + let rpc_client = self.rpc_client.clone(); + + // Fire this off in a separate task so we don't block the main thread of the exporter + tokio::spawn(async move { + let signature = match rpc_client + .send_transaction_with_config( + &transaction, + RpcSendTransactionConfig { + skip_preflight: true, + ..RpcSendTransactionConfig::default() + }, + ) + .await + { + Ok(signature) => signature, + Err(err) => { + error!(logger, "{}", err); + debug!(logger, "error context"; "context" => format!("{:?}", err)); + return; + } + }; + + debug!(logger, "sent upd_price transaction"; "signature" => signature.to_string(), "instructions" => instructions.len(), "price_accounts" => format!("{:?}", price_accounts)); - self.inflight_transactions_tx.send(signature).await?; + if let Err(err) = tx.send(signature).await { + error!(logger, "{}", err); + debug!(logger, "error context"; "context" => format!("{:?}", err)); + } + }); Ok(()) }