From 8754c47301a1cc3d62c73bd7112b498098132240 Mon Sep 17 00:00:00 2001 From: Keyvan Khademi Date: Mon, 13 May 2024 09:34:47 -0700 Subject: [PATCH] feat: add publish_interval logic (#121) * feat: add publish_interval logic * refactor: rename PublisherPermissions to PricePublishingMetadata * fix: pre-commit * chore: bump pyth-agent version * feat: use NativeTime instead of unix timestamp to have more accurate price timestamps * refactor: use match to avoid unwrap --- Cargo.lock | 2 +- Cargo.toml | 2 +- integration-tests/tests/test_integration.py | 66 +++++++++++++- src/agent/dashboard.rs | 6 +- src/agent/metrics.rs | 6 +- src/agent/pythd/adapter.rs | 14 +-- src/agent/pythd/adapter/api.rs | 2 +- src/agent/solana/exporter.rs | 99 +++++++++++++-------- src/agent/solana/oracle.rs | 70 ++++++++++++--- src/agent/store/local.rs | 4 +- 10 files changed, 197 insertions(+), 74 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index efcb172f..641e24a2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3270,7 +3270,7 @@ dependencies = [ [[package]] name = "pyth-agent" -version = "2.6.2" +version = "2.7.0" dependencies = [ "anyhow", "async-trait", diff --git a/Cargo.toml b/Cargo.toml index 9677e2ae..4612b389 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "pyth-agent" -version = "2.6.2" +version = "2.7.0" edition = "2021" [[bin]] diff --git a/integration-tests/tests/test_integration.py b/integration-tests/tests/test_integration.py index 20d6d29d..fae2e87d 100644 --- a/integration-tests/tests/test_integration.py +++ b/integration-tests/tests/test_integration.py @@ -81,6 +81,19 @@ }, "metadata": {"jump_id": "78876711", "jump_symbol": "SOLUSD", "price_exp": -8, "min_publishers": 1}, } +PYTH_USD = { + "account": "", + "attr_dict": { + "symbol": "Crypto.PYTH/USD", + "asset_type": "Crypto", + "base": "PYTH", + "quote_currency": "USD", + "generic_symbol": "PYTHUSD", + "description": "PYTH/USD", + "publish_interval": "2", + }, + "metadata": {"jump_id": "78876712", 
"jump_symbol": "PYTHUSD", "price_exp": -8, "min_publishers": 1}, +} AAPL_USD = { "account": "", "attr_dict": { @@ -110,7 +123,7 @@ }, "metadata": {"jump_id": "78876710", "jump_symbol": "ETHUSD", "price_exp": -8, "min_publishers": 1}, } -ALL_PRODUCTS=[BTC_USD, AAPL_USD, ETH_USD, SOL_USD] +ALL_PRODUCTS=[BTC_USD, AAPL_USD, ETH_USD, SOL_USD, PYTH_USD] asyncio.set_event_loop(asyncio.new_event_loop()) @@ -293,6 +306,7 @@ def refdata_permissions(self, refdata_path): "BTCUSD": {"price": ["some_publisher_b", "some_publisher_a"]}, # Reversed order helps ensure permission discovery works correctly for publisher A "ETHUSD": {"price": ["some_publisher_b"]}, "SOLUSD": {"price": ["some_publisher_a"]}, + "PYTHUSD": {"price": ["some_publisher_a"]}, })) f.flush() yield f.name @@ -820,3 +834,53 @@ async def test_agent_respects_holiday_hours(self, client: PythAgentClient): assert final_price_account["price"] == 0 assert final_price_account["conf"] == 0 assert final_price_account["status"] == "unknown" + + @pytest.mark.asyncio + async def test_agent_respects_publish_interval(self, client: PythAgentClient): + ''' + Similar to test_agent_respects_market_hours, but using PYTH_USD. + This test asserts that consecutive price updates will only get published + if it's after the specified publish interval. 
+ ''' + + # Fetch all products + products = {product["attr_dict"]["symbol"]: product for product in await client.get_all_products()} + + # Find the product account ID corresponding to the PYTH/USD symbol + product = products[PYTH_USD["attr_dict"]["symbol"]] + product_account = product["account"] + + # Get the price account with which to send updates + price_account = product["price_accounts"][0]["account"] + + # Send an "update_price" request + await client.update_price(price_account, 42, 2, "trading") + time.sleep(1) + + # Send another update_price request to "trigger" aggregation + # (aggregation would happen if the publish interval check were to fail, but + # we want to catch that happening if there's a problem) + await client.update_price(price_account, 81, 1, "trading") + time.sleep(2) + + # Confirm that the price account has not been updated + final_product_state = await client.get_product(product_account) + + final_price_account = final_product_state["price_accounts"][0] + assert final_price_account["price"] == 0 + assert final_price_account["conf"] == 0 + assert final_price_account["status"] == "unknown" + + + # Send another update_price request to "trigger" aggregation + # Now it is after the publish interval, so the price should be updated + await client.update_price(price_account, 81, 1, "trading") + time.sleep(2) + + # Confirm that the price account has been updated + final_product_state = await client.get_product(product_account) + + final_price_account = final_product_state["price_accounts"][0] + assert final_price_account["price"] == 42 + assert final_price_account["conf"] == 2 + assert final_price_account["status"] == "trading" diff --git a/src/agent/dashboard.rs b/src/agent/dashboard.rs index def49888..d8eedf31 100644 --- a/src/agent/dashboard.rs +++ b/src/agent/dashboard.rs @@ -102,11 +102,7 @@ impl MetricsServer { }; let last_local_update_string = if let Some(local_data) = price_data.local_data { - if let Some(datetime) = 
DateTime::from_timestamp(local_data.timestamp, 0) { - datetime.format("%Y-%m-%d %H:%M:%S").to_string() - } else { - format!("Invalid timestamp {}", local_data.timestamp) - } + local_data.timestamp.format("%Y-%m-%d %H:%M:%S").to_string() } else { "no data".to_string() }; diff --git a/src/agent/metrics.rs b/src/agent/metrics.rs index 63c40980..f7742b5e 100644 --- a/src/agent/metrics.rs +++ b/src/agent/metrics.rs @@ -40,9 +40,7 @@ use { }, warp::{ hyper::StatusCode, - reply::{ - self, - }, + reply, Filter, Rejection, Reply, @@ -428,7 +426,7 @@ impl PriceLocalMetrics { .get_or_create(&PriceLocalLabels { pubkey: price_key.to_string(), }) - .set(price_info.timestamp); + .set(price_info.timestamp.and_utc().timestamp()); update_count .get_or_create(&PriceLocalLabels { pubkey: price_key.to_string(), diff --git a/src/agent/pythd/adapter.rs b/src/agent/pythd/adapter.rs index 939dc842..4eaf0cfe 100644 --- a/src/agent/pythd/adapter.rs +++ b/src/agent/pythd/adapter.rs @@ -462,7 +462,7 @@ mod tests { ) .unwrap(), solana::oracle::ProductEntry { - account_data: pyth_sdk_solana::state::ProductAccount { + account_data: pyth_sdk_solana::state::ProductAccount { magic: 0xa1b2c3d4, ver: 6, atype: 4, @@ -499,8 +499,9 @@ mod tests { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, ], }, - schedule: Default::default(), - price_accounts: vec![ + schedule: Default::default(), + publish_interval: None, + price_accounts: vec![ solana_sdk::pubkey::Pubkey::from_str( "GVXRSBjFk6e6J3NbVPXohDJetcTjaeeuykUpbQF8UoMU", ) @@ -522,7 +523,7 @@ mod tests { ) .unwrap(), solana::oracle::ProductEntry { - account_data: pyth_sdk_solana::state::ProductAccount { + account_data: pyth_sdk_solana::state::ProductAccount { magic: 0xa1b2c3d4, ver: 5, atype: 3, @@ -559,8 +560,9 @@ mod tests { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, ], }, - schedule: Default::default(), - price_accounts: vec![ + schedule: Default::default(), + publish_interval: None, + price_accounts: vec![ solana_sdk::pubkey::Pubkey::from_str( 
"GG3FTE7xhc9Diy7dn9P6BWzoCrAEE4D3p5NBYrDAm5DD", ) diff --git a/src/agent/pythd/adapter/api.rs b/src/agent/pythd/adapter/api.rs index acbe6da7..bf221eb7 100644 --- a/src/agent/pythd/adapter/api.rs +++ b/src/agent/pythd/adapter/api.rs @@ -375,7 +375,7 @@ impl AdapterApi for Adapter { status: Adapter::map_status(&status)?, price, conf, - timestamp: Utc::now().timestamp(), + timestamp: Utc::now().naive_utc(), }, }) .await diff --git a/src/agent/solana/exporter.rs b/src/agent/solana/exporter.rs index 089e2889..f0efd46e 100644 --- a/src/agent/solana/exporter.rs +++ b/src/agent/solana/exporter.rs @@ -8,13 +8,11 @@ use { }, key_store, network::Network, + oracle::PricePublishingMetadata, }, - crate::agent::{ - market_schedule::MarketSchedule, - remote_keypair_loader::{ - KeypairRequest, - RemoteKeypairLoader, - }, + crate::agent::remote_keypair_loader::{ + KeypairRequest, + RemoteKeypairLoader, }, anyhow::{ anyhow, @@ -68,7 +66,6 @@ use { sync::{ mpsc::{ self, - error::TryRecvError, Sender, }, oneshot, @@ -174,7 +171,9 @@ pub fn spawn_exporter( network: Network, rpc_url: &str, rpc_timeout: Duration, - publisher_permissions_rx: watch::Receiver>>, + publisher_permissions_rx: watch::Receiver< + HashMap>, + >, key_store: KeyStore, local_store_tx: Sender, global_store_tx: Sender, @@ -262,10 +261,11 @@ pub struct Exporter { inflight_transactions_tx: Sender, /// publisher => { permissioned_price => market hours } as read by the oracle module - publisher_permissions_rx: watch::Receiver>>, + publisher_permissions_rx: + watch::Receiver>>, /// Currently known permissioned prices of this publisher along with their market hours - our_prices: HashMap, + our_prices: HashMap, /// Interval to update the dynamic price (if enabled) dynamic_compute_unit_price_update_interval: Interval, @@ -289,7 +289,9 @@ impl Exporter { global_store_tx: Sender, network_state_rx: watch::Receiver, inflight_transactions_tx: Sender, - publisher_permissions_rx: watch::Receiver>>, + publisher_permissions_rx: 
watch::Receiver< + HashMap>, + >, keypair_request_tx: mpsc::Sender, logger: Logger, ) -> Self { @@ -432,22 +434,30 @@ impl Exporter { async fn get_permissioned_updates(&mut self) -> Result> { let local_store_contents = self.fetch_local_store_contents().await?; - let now = Utc::now().timestamp(); + let publish_keypair = self.get_publish_keypair().await?; + self.update_our_prices(&publish_keypair.pubkey()); + + let now = Utc::now().naive_utc(); + + debug!(self.logger, "Exporter: filtering prices permissioned to us"; + "our_prices" => format!("{:?}", self.our_prices.keys()), + "publish_pubkey" => publish_keypair.pubkey().to_string(), + ); // Filter the contents to only include information we haven't already sent, // and to ignore stale information. - let fresh_updates = local_store_contents + Ok(local_store_contents .into_iter() .filter(|(_identifier, info)| { // Filter out timestamps that are old - (now - info.timestamp) < self.config.staleness_threshold.as_secs() as i64 + now < info.timestamp + self.config.staleness_threshold }) .filter(|(identifier, info)| { // Filter out unchanged price data if the max delay wasn't reached if let Some(last_info) = self.last_published_state.get(identifier) { - if info.timestamp.saturating_sub(last_info.timestamp) - > self.config.unchanged_publish_threshold.as_secs() as i64 + if info.timestamp + > last_info.timestamp + self.config.unchanged_publish_threshold { true // max delay since last published state reached, we publish anyway } else { @@ -457,33 +467,17 @@ impl Exporter { true // No prior data found, letting the price through } }) - .collect::>(); - - let publish_keypair = self.get_publish_keypair().await?; - - self.update_our_prices(&publish_keypair.pubkey()); - - debug!(self.logger, "Exporter: filtering prices permissioned to us"; - "our_prices" => format!("{:?}", self.our_prices.keys()), - "publish_pubkey" => publish_keypair.pubkey().to_string(), - ); - - // Get a fresh system time - let now = Utc::now(); - - // Filter out 
price accounts we're not permissioned to update - Ok(fresh_updates - .into_iter() .filter(|(id, _data)| { let key_from_id = Pubkey::from((*id).clone().to_bytes()); - if let Some(schedule) = self.our_prices.get(&key_from_id) { - let ret = schedule.can_publish_at(&now); + if let Some(publisher_permission) = self.our_prices.get(&key_from_id) { + let now_utc = Utc::now(); + let ret = publisher_permission.schedule.can_publish_at(&now_utc); if !ret { debug!(self.logger, "Exporter: Attempted to publish price outside market hours"; "price_account" => key_from_id.to_string(), - "schedule" => format!("{:?}", schedule), - "utc_time" => now.format("%c").to_string(), + "schedule" => format!("{:?}", publisher_permission.schedule), + "utc_time" => now_utc.format("%c").to_string(), ); } @@ -501,6 +495,33 @@ impl Exporter { false } }) + .filter(|(id, info)| { + // Filtering out prices that are being updated too frequently according to publisher_permission.publish_interval + let last_info = match self.last_published_state.get(id) { + Some(last_info) => last_info, + None => { + // No prior data found, letting the price through + return true; + } + }; + + let key_from_id = Pubkey::from((*id).clone().to_bytes()); + let publisher_metadata = match self.our_prices.get(&key_from_id) { + Some(metadata) => metadata, + None => { + // Should never happen since we have filtered out the price above + return false; + } + }; + + if let Some(publish_interval) = publisher_metadata.publish_interval { + if info.timestamp < last_info.timestamp + publish_interval { + // Updating the price too soon after the last update, skipping + return false; + } + } + true + }) .collect::>()) } @@ -623,9 +644,9 @@ impl Exporter { let network_state = *self.network_state_rx.borrow(); for (identifier, price_info_result) in refreshed_batch { let price_info = price_info_result?; + let now = Utc::now().naive_utc(); - let stale_price = (Utc::now().timestamp() - price_info.timestamp) - > 
self.config.staleness_threshold.as_secs() as i64; + let stale_price = now > price_info.timestamp + self.config.staleness_threshold; if stale_price { continue; } diff --git a/src/agent/solana/oracle.rs b/src/agent/solana/oracle.rs index af6e1b99..9bcda936 100644 --- a/src/agent/solana/oracle.rs +++ b/src/agent/solana/oracle.rs @@ -115,13 +115,19 @@ impl std::ops::Deref for PriceEntry { } } +#[derive(Default, Debug, Clone)] +pub struct PricePublishingMetadata { + pub schedule: MarketSchedule, + pub publish_interval: Option, +} + #[derive(Default, Debug, Clone)] pub struct Data { pub mapping_accounts: HashMap, pub product_accounts: HashMap, pub price_accounts: HashMap, - /// publisher => {their permissioned price accounts => market hours} - pub publisher_permissions: HashMap>, + /// publisher => {their permissioned price accounts => price publishing metadata} + pub publisher_permissions: HashMap>, } impl Data { @@ -129,7 +135,7 @@ impl Data { mapping_accounts: HashMap, product_accounts: HashMap, price_accounts: HashMap, - publisher_permissions: HashMap>, + publisher_permissions: HashMap>, ) -> Self { Data { mapping_accounts, @@ -143,9 +149,10 @@ impl Data { pub type MappingAccount = pyth_sdk_solana::state::MappingAccount; #[derive(Debug, Clone)] pub struct ProductEntry { - pub account_data: pyth_sdk_solana::state::ProductAccount, - pub schedule: MarketSchedule, - pub price_accounts: Vec, + pub account_data: pyth_sdk_solana::state::ProductAccount, + pub schedule: MarketSchedule, + pub price_accounts: Vec, + pub publish_interval: Option, } // Oracle is responsible for fetching Solana account data stored in the Pyth on-chain Oracle. 
@@ -207,7 +214,9 @@ pub fn spawn_oracle( wss_url: &str, rpc_timeout: Duration, global_store_update_tx: mpsc::Sender, - publisher_permissions_tx: watch::Sender>>, + publisher_permissions_tx: watch::Sender< + HashMap>, + >, key_store: KeyStore, logger: Logger, ) -> Vec> { @@ -422,7 +431,8 @@ struct Poller { data_tx: mpsc::Sender, /// Updates about permissioned price accounts from oracle to exporter - publisher_permissions_tx: watch::Sender>>, + publisher_permissions_tx: + watch::Sender>>, /// The RPC client to use to poll data from the RPC node rpc_client: RpcClient, @@ -442,7 +452,9 @@ struct Poller { impl Poller { pub fn new( data_tx: mpsc::Sender, - publisher_permissions_tx: watch::Sender>>, + publisher_permissions_tx: watch::Sender< + HashMap>, + >, rpc_url: &str, rpc_timeout: Duration, commitment: CommitmentLevel, @@ -483,6 +495,7 @@ impl Poller { async fn poll_and_send(&mut self) -> Result<()> { let fresh_data = self.poll().await?; + self.publisher_permissions_tx .send_replace(fresh_data.publisher_permissions.clone()); @@ -512,8 +525,13 @@ impl Poller { .entry(component.publisher) .or_insert(HashMap::new()); - let schedule = if let Some(prod_entry) = product_accounts.get(&price_entry.prod) { - prod_entry.schedule.clone() + let publisher_permission = if let Some(prod_entry) = + product_accounts.get(&price_entry.prod) + { + PricePublishingMetadata { + schedule: prod_entry.schedule.clone(), + publish_interval: prod_entry.publish_interval.clone(), + } } else { warn!(&self.logger, "Oracle: INTERNAL: could not find product from price `prod` field, market hours falling back to 24/7."; "price" => price_key.to_string(), @@ -522,7 +540,7 @@ impl Poller { Default::default() }; - component_pub_entry.insert(*price_key, schedule); + component_pub_entry.insert(*price_key, publisher_permission); } } @@ -650,12 +668,36 @@ impl Poller { None }; + let publish_interval: Option = if let Some(( + _publish_interval_key, + publish_interval_val, + )) = + product.iter().find(|(k, _v)| 
*k == "publish_interval") + { + match publish_interval_val.parse::() { + Ok(interval) => Some(Duration::from_secs_f64(interval)), + Err(err) => { + warn!( + self.logger, + "Oracle: Product has publish_interval defined but it could not be parsed. Falling back to None."; + "product_key" => product_key.to_string(), + "publish_interval" => publish_interval_val, + ); + debug!(self.logger, "parsing error context"; "context" => format!("{:?}", err)); + None + } + } + } else { + None + }; + product_entries.insert( *product_key, ProductEntry { - account_data: *product, - schedule: market_schedule.unwrap_or_else(|| legacy_schedule.into()), + account_data: *product, + schedule: market_schedule.unwrap_or_else(|| legacy_schedule.into()), price_accounts: vec![], + publish_interval, }, ); } else { diff --git a/src/agent/store/local.rs b/src/agent/store/local.rs index fe6b3192..c00a8afa 100644 --- a/src/agent/store/local.rs +++ b/src/agent/store/local.rs @@ -11,7 +11,7 @@ use { anyhow, Result, }, - pyth_sdk::UnixTimestamp, + chrono::NaiveDateTime, pyth_sdk_solana::state::PriceStatus, slog::Logger, solana_sdk::bs58, @@ -30,7 +30,7 @@ pub struct PriceInfo { pub status: PriceStatus, pub price: i64, pub conf: u64, - pub timestamp: UnixTimestamp, + pub timestamp: NaiveDateTime, } impl PriceInfo {