Skip to content

Commit

Permalink
fix: fire and forget update tx (#102)
Browse files Browse the repository at this point in the history
The old behaviour causes the exporter not to publishing prices
regularly. This change spawns a separate thread to unblock the exporter
loop. Also, because of the batch staggering behaviour to avoid calling
rpc for all batches at the same time, the old approach would always
results in waiting longer than the publishing period.
  • Loading branch information
ali-bahjati authored Dec 15, 2023
1 parent c76ccc4 commit 04391be
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 17 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "pyth-agent"
version = "2.4.3"
version = "2.4.4"
edition = "2021"

[[bin]]
Expand Down
52 changes: 37 additions & 15 deletions src/agent/solana/exporter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ use {
BTreeMap,
HashMap,
},
sync::Arc,
time::Duration,
},
tokio::{
Expand Down Expand Up @@ -228,7 +229,7 @@ pub fn spawn_exporter(
/// Exporter is responsible for exporting data held in the local store
/// to the global Pyth Network.
pub struct Exporter {
rpc_client: RpcClient,
rpc_client: Arc<RpcClient>,

config: Config,

Expand Down Expand Up @@ -292,7 +293,10 @@ impl Exporter {
) -> Self {
let publish_interval = time::interval(config.publish_interval_duration);
Exporter {
rpc_client: RpcClient::new_with_timeout(rpc_url.to_string(), rpc_timeout),
rpc_client: Arc::new(RpcClient::new_with_timeout(
rpc_url.to_string(),
rpc_timeout,
)),
config,
network,
publish_interval,
Expand Down Expand Up @@ -536,7 +540,7 @@ impl Exporter {
let mut batch_send_interval = time::interval(
self.config
.publish_interval_duration
.div_f64(num_batches as f64),
.div_f64((num_batches + 1) as f64), // +1 to give enough time for the last batch
);
let mut batch_state = HashMap::new();
let mut batch_futures = vec![];
Expand Down Expand Up @@ -796,19 +800,37 @@ impl Exporter {
network_state.blockhash,
);

let signature = self
.rpc_client
.send_transaction_with_config(
&transaction,
RpcSendTransactionConfig {
skip_preflight: true,
..RpcSendTransactionConfig::default()
},
)
.await?;
debug!(self.logger, "sent upd_price transaction"; "signature" => signature.to_string(), "instructions" => instructions.len(), "price_accounts" => format!("{:?}", price_accounts));
let tx = self.inflight_transactions_tx.clone();
let logger = self.logger.clone();
let rpc_client = self.rpc_client.clone();

// Fire this off in a separate task so we don't block the main thread of the exporter
tokio::spawn(async move {
let signature = match rpc_client
.send_transaction_with_config(
&transaction,
RpcSendTransactionConfig {
skip_preflight: true,
..RpcSendTransactionConfig::default()
},
)
.await
{
Ok(signature) => signature,
Err(err) => {
error!(logger, "{}", err);
debug!(logger, "error context"; "context" => format!("{:?}", err));
return;
}
};

debug!(logger, "sent upd_price transaction"; "signature" => signature.to_string(), "instructions" => instructions.len(), "price_accounts" => format!("{:?}", price_accounts));

self.inflight_transactions_tx.send(signature).await?;
if let Err(err) = tx.send(signature).await {
error!(logger, "{}", err);
debug!(logger, "error context"; "context" => format!("{:?}", err));
}
});

Ok(())
}
Expand Down

0 comments on commit 04391be

Please sign in to comment.