From 2fb37756f7405d75a3dce0a018c36a724f04ced9 Mon Sep 17 00:00:00 2001 From: Ali Behjati Date: Thu, 22 Aug 2024 14:28:44 +0200 Subject: [PATCH] fix: disable batch flushing by default (#139) * fix: disable batch flushing by default * refactor: update readme and add best practices --- Cargo.lock | 2 +- Cargo.toml | 2 +- README.md | 54 ++++++++----------------------------------- config/config.toml | 9 +++++++- src/agent/pyth/rpc.rs | 40 ++++++++++++++++++++++++++++---- 5 files changed, 54 insertions(+), 53 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 4de4fd98..5919b0f4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3400,7 +3400,7 @@ dependencies = [ [[package]] name = "pyth-agent" -version = "2.10.3" +version = "2.10.4" dependencies = [ "anyhow", "async-trait", diff --git a/Cargo.toml b/Cargo.toml index 672bbd2a..cb8cb1d0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "pyth-agent" -version = "2.10.3" +version = "2.10.4" edition = "2021" [[bin]] diff --git a/README.md b/README.md index cf933590..9eaf2e0f 100644 --- a/README.md +++ b/README.md @@ -34,51 +34,7 @@ through the `RUST_LOG` environment variable using the standard `error|warn|info|debug|trace`. #### Plain/JSON logging -By default, pyth-agent will print plaintext log statements. This can be switched to structured JSON output with `-l json`. - -#### Code location in logs -For debugging purposes, you can specify `-L` to print file/line information with each log statement. This option is disabled by default. - -### Key Store Config Migration [v1.x.x LEGACY] -Pyth agent v2.0.0 introduces a simplified program and mapping key configuration. This breaking change alters how you define program/mapping key options in your agent config: -```toml -# Old v1.x.x way -[primary network] -key_store.root_path = "/path/to/keystore" -key_store.publish_keypair_path = "publish_key_pair.json" # Relative path from root_path, "publish_key_pair.json" by default -key_store.program_key_path = "program_key.json" # Relative path from root_path, "program_key.json" by default -key_store.mapping_key_path = "mapping_key.json" # Relative path from root_path, "mapping_key.json" by default - -# [...] - -# New v2.0.0 way -[primary_network] -key_store.publish_keypair_path = "/path/to/keypair.json" # The root_path is gone, we specify the full path -# Not using separate files anymore -key_store.program_key = "LiteralProgramPubkeyInsideTheConfig" # contents of legacy program_key.json; -key_store.mapping_key = "LiteralMappingPubkeyInsideTheConfig" # contents of legacy mapping_key.json - -# [...] - -``` - -#### Automatic Migration -If you are upgrading to agent v2.0.0 with an existing config, you can use the provided automatic migrator program: -```shell -# Build -$ cargo build --release -# Run the migrator, making sure that the key store with previous keys is reachable -$ target/release/agent-migrate-config -c .toml > my_new_config.toml -``` - -#### `Could not open {mapping|program|...} key file` -This error can appear if some of your program/mapping/publish key -files are not reachable under their `key_store.*` setting values. - -Ensure that your current working directory is correct for reaching the -key store path inside your config. You may also migrate manually by -changing `key_store.*_key_path` and `key_store.publish_keypair_path` -options by hand, as described in the config example above. +Pyth agent will print logs in plaintext in terminal and JSON format in non-terminal environments (e.g. when writing to a file). ## Run `cargo run --release -- --config ` will build and run the agent in a single step. @@ -86,6 +42,14 @@ options by hand, as described in the config example above. ## Publishing API A running agent will expose a WebSocket serving the JRPC publishing API documented [here](https://docs.pyth.network/documentation/publish-data/pyth-client-websocket-api). See `config/config.toml` for related settings. +## Best practices +If your publisher is publishing updates to more than 50 price feeds, it is recommended that you do the following to reduce the connection overhead to the agent: +- Batch your messages together and send them as a single request to the agent (as an array of messages). The agent will respond to the batch messages + with a single response containing an array of individual responses (in the same order). If batching is not possible, you can disable the `instant_flush` option + in the configuration file to let agent send the responses every `flush_interval` seconds. +- Do not use subscribe to the price schedule. Instead, define a schedule on the client side and send the messages based on your own schedule. Ideally + you should send price updates as soon as you have them to increase the latency of the data on the Pyth Network. + # Development ## Unit Testing A collection of Rust unit tests is provided, ran with `cargo test`. diff --git a/config/config.toml b/config/config.toml index d29904ef..cfe5f4c6 100644 --- a/config/config.toml +++ b/config/config.toml @@ -14,8 +14,15 @@ listen_address = "127.0.0.1:8910" # received from the Price state. # notify_price_sched_tx_buffer = 10000 +# Whether flush messages and responses to the client immediately. Once disabled the +# messages will be flushed every `flush_interval_duration`. Disabling it is useful if +# there are many messages to be sent between the client and the server to avoid overloading +# the connection. +# instant_flush = true + # Flush interval for responses and notifications. This is the maximum time the -# server will wait before flushing the messages to the client. +# server will wait before flushing the messages to the client. It will have no +# effect if `instant_flush` is set to true. # flush_interval_duration = "50ms" # Configuration for the primary network this agent will publish data to. In most cases this should be a Pythnet endpoint. diff --git a/src/agent/pyth/rpc.rs b/src/agent/pyth/rpc.rs index ae52f0e4..df161695 100644 --- a/src/agent/pyth/rpc.rs +++ b/src/agent/pyth/rpc.rs @@ -18,6 +18,7 @@ use { anyhow, Result, }, + futures::future::OptionFuture, futures_util::{ stream::{ SplitSink, @@ -50,7 +51,10 @@ use { sync::Arc, time::Duration, }, - tokio::sync::mpsc, + tokio::{ + sync::mpsc, + time::Interval, + }, tracing::instrument, warp::{ ws::{ @@ -111,11 +115,18 @@ enum ConnectionError { WebsocketConnectionClosed, } +#[derive(Debug)] +enum FlushStrategy { + Instant, + Interval(Interval), +} + async fn handle_connection( ws_conn: WebSocket, state: Arc, notify_price_tx_buffer: usize, notify_price_sched_tx_buffer: usize, + instant_flush: bool, flush_interval_duration: Duration, ) where S: state::Prices, @@ -129,7 +140,10 @@ async fn handle_connection( let (mut notify_price_sched_tx, mut notify_price_sched_rx) = mpsc::channel(notify_price_sched_tx_buffer); - let mut flush_interval = tokio::time::interval(flush_interval_duration); + let mut flush_strategy = match instant_flush { + true => FlushStrategy::Instant, + false => FlushStrategy::Interval(tokio::time::interval(flush_interval_duration)), + }; loop { if let Err(err) = handle_next( @@ -140,7 +154,7 @@ async fn handle_connection( &mut notify_price_rx, &mut notify_price_sched_tx, &mut notify_price_sched_rx, - &mut flush_interval, + &mut flush_strategy, ) .await { @@ -156,6 +170,7 @@ async fn handle_connection( } } +#[allow(clippy::too_many_arguments)] async fn handle_next( state: &S, ws_tx: &mut SplitSink, @@ -164,11 +179,17 @@ async fn handle_next( notify_price_rx: &mut mpsc::Receiver, notify_price_sched_tx: &mut mpsc::Sender, notify_price_sched_rx: &mut mpsc::Receiver, - flush_interval: &mut tokio::time::Interval, + flush_strategy: &mut FlushStrategy, ) -> Result<()> where S: state::Prices, { + let optional_flush_tick: OptionFuture<_> = match flush_strategy { + FlushStrategy::Instant => None, + FlushStrategy::Interval(interval) => Some(interval.tick()), + } + .into(); + tokio::select! { msg = ws_rx.next() => { match msg { @@ -196,9 +217,14 @@ where feed_notification(ws_tx, Method::NotifyPriceSched, Some(notify_price_sched)) .await } - _ = flush_interval.tick() => { + Some(_) = optional_flush_tick => { flush(ws_tx).await } + }?; + + match flush_strategy { + FlushStrategy::Interval(_) => Ok(()), + FlushStrategy::Instant => flush(ws_tx).await, } } @@ -413,6 +439,8 @@ pub struct Config { /// Size of the buffer of each Server's channel on which `notify_price_sched` events are /// received from the Price state. pub notify_price_sched_tx_buffer: usize, + /// Whether to flush immediately after sending a message or notification. + pub instant_flush: bool, /// Flush interval duration for the notifications. #[serde(with = "humantime_serde")] pub flush_interval_duration: Duration, @@ -424,6 +452,7 @@ impl Default for Config { listen_address: "127.0.0.1:8910".to_string(), notify_price_tx_buffer: 10000, notify_price_sched_tx_buffer: 10000, + instant_flush: true, flush_interval_duration: Duration::from_millis(50), } } @@ -465,6 +494,7 @@ where state, config.notify_price_tx_buffer, config.notify_price_sched_tx_buffer, + config.instant_flush, config.flush_interval_duration, ) .await