Skip to content

Commit

Permalink
fix: disable batch flushing by default (#139)
Browse files Browse the repository at this point in the history
* fix: disable batch flushing by default

* refactor: update readme and add best practices
  • Loading branch information
ali-bahjati authored Aug 22, 2024
1 parent f0fd404 commit 2fb3775
Show file tree
Hide file tree
Showing 5 changed files with 54 additions and 53 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "pyth-agent"
version = "2.10.3"
version = "2.10.4"
edition = "2021"

[[bin]]
Expand Down
54 changes: 9 additions & 45 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,58 +34,22 @@ through the `RUST_LOG` environment variable using the standard
`error|warn|info|debug|trace`.

#### Plain/JSON logging
By default, pyth-agent will print plaintext log statements. This can be switched to structured JSON output with `-l json`.

#### Code location in logs
For debugging purposes, you can specify `-L` to print file/line information with each log statement. This option is disabled by default.

### Key Store Config Migration [v1.x.x LEGACY]
Pyth agent v2.0.0 introduces a simplified program and mapping key configuration. This breaking change alters how you define program/mapping key options in your agent config:
```toml
# Old v1.x.x way
[primary network]
key_store.root_path = "/path/to/keystore"
key_store.publish_keypair_path = "publish_key_pair.json" # Relative path from root_path, "publish_key_pair.json" by default
key_store.program_key_path = "program_key.json" # Relative path from root_path, "program_key.json" by default
key_store.mapping_key_path = "mapping_key.json" # Relative path from root_path, "mapping_key.json" by default

# [...]

# New v2.0.0 way
[primary_network]
key_store.publish_keypair_path = "/path/to/keypair.json" # The root_path is gone, we specify the full path
# Not using separate files anymore
key_store.program_key = "LiteralProgramPubkeyInsideTheConfig" # contents of legacy program_key.json;
key_store.mapping_key = "LiteralMappingPubkeyInsideTheConfig" # contents of legacy mapping_key.json

# [...]

```

#### Automatic Migration
If you are upgrading to agent v2.0.0 with an existing config, you can use the provided automatic migrator program:
```shell
# Build
$ cargo build --release
# Run the migrator, making sure that the key store with previous keys is reachable
$ target/release/agent-migrate-config -c <existing_config_file>.toml > my_new_config.toml
```

#### `Could not open {mapping|program|...} key file`
This error can appear if some of your program/mapping/publish key
files are not reachable under their `key_store.*` setting values.

Ensure that your current working directory is correct for reaching the
key store path inside your config. You may also migrate manually by
changing `key_store.*_key_path` and `key_store.publish_keypair_path`
options by hand, as described in the config example above.
Pyth agent will print logs in plaintext in terminal and JSON format in non-terminal environments (e.g. when writing to a file).

## Run
`cargo run --release -- --config <your_config.toml>` will build and run the agent in a single step.

## Publishing API
A running agent will expose a WebSocket serving the JRPC publishing API documented [here](https://docs.pyth.network/documentation/publish-data/pyth-client-websocket-api). See `config/config.toml` for related settings.

## Best practices
If your publisher is publishing updates to more than 50 price feeds, it is recommended that you do the following to reduce the connection overhead to the agent:
- Batch your messages together and send them as a single request to the agent (as an array of messages). The agent will respond to the batch messages
with a single response containing an array of individual responses (in the same order). If batching is not possible, you can disable the `instant_flush` option
in the configuration file to let agent send the responses every `flush_interval` seconds.
- Do not use subscribe to the price schedule. Instead, define a schedule on the client side and send the messages based on your own schedule. Ideally
you should send price updates as soon as you have them to increase the latency of the data on the Pyth Network.

# Development
## Unit Testing
A collection of Rust unit tests is provided, ran with `cargo test`.
Expand Down
9 changes: 8 additions & 1 deletion config/config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,15 @@ listen_address = "127.0.0.1:8910"
# received from the Price state.
# notify_price_sched_tx_buffer = 10000

# Whether flush messages and responses to the client immediately. Once disabled the
# messages will be flushed every `flush_interval_duration`. Disabling it is useful if
# there are many messages to be sent between the client and the server to avoid overloading
# the connection.
# instant_flush = true

# Flush interval for responses and notifications. This is the maximum time the
# server will wait before flushing the messages to the client.
# server will wait before flushing the messages to the client. It will have no
# effect if `instant_flush` is set to true.
# flush_interval_duration = "50ms"

# Configuration for the primary network this agent will publish data to. In most cases this should be a Pythnet endpoint.
Expand Down
40 changes: 35 additions & 5 deletions src/agent/pyth/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use {
anyhow,
Result,
},
futures::future::OptionFuture,
futures_util::{
stream::{
SplitSink,
Expand Down Expand Up @@ -50,7 +51,10 @@ use {
sync::Arc,
time::Duration,
},
tokio::sync::mpsc,
tokio::{
sync::mpsc,
time::Interval,
},
tracing::instrument,
warp::{
ws::{
Expand Down Expand Up @@ -111,11 +115,18 @@ enum ConnectionError {
WebsocketConnectionClosed,
}

#[derive(Debug)]
enum FlushStrategy {
Instant,
Interval(Interval),
}

async fn handle_connection<S>(
ws_conn: WebSocket,
state: Arc<S>,
notify_price_tx_buffer: usize,
notify_price_sched_tx_buffer: usize,
instant_flush: bool,
flush_interval_duration: Duration,
) where
S: state::Prices,
Expand All @@ -129,7 +140,10 @@ async fn handle_connection<S>(
let (mut notify_price_sched_tx, mut notify_price_sched_rx) =
mpsc::channel(notify_price_sched_tx_buffer);

let mut flush_interval = tokio::time::interval(flush_interval_duration);
let mut flush_strategy = match instant_flush {
true => FlushStrategy::Instant,
false => FlushStrategy::Interval(tokio::time::interval(flush_interval_duration)),
};

loop {
if let Err(err) = handle_next(
Expand All @@ -140,7 +154,7 @@ async fn handle_connection<S>(
&mut notify_price_rx,
&mut notify_price_sched_tx,
&mut notify_price_sched_rx,
&mut flush_interval,
&mut flush_strategy,
)
.await
{
Expand All @@ -156,6 +170,7 @@ async fn handle_connection<S>(
}
}

#[allow(clippy::too_many_arguments)]
async fn handle_next<S>(
state: &S,
ws_tx: &mut SplitSink<WebSocket, Message>,
Expand All @@ -164,11 +179,17 @@ async fn handle_next<S>(
notify_price_rx: &mut mpsc::Receiver<NotifyPrice>,
notify_price_sched_tx: &mut mpsc::Sender<NotifyPriceSched>,
notify_price_sched_rx: &mut mpsc::Receiver<NotifyPriceSched>,
flush_interval: &mut tokio::time::Interval,
flush_strategy: &mut FlushStrategy,
) -> Result<()>
where
S: state::Prices,
{
let optional_flush_tick: OptionFuture<_> = match flush_strategy {
FlushStrategy::Instant => None,
FlushStrategy::Interval(interval) => Some(interval.tick()),
}
.into();

tokio::select! {
msg = ws_rx.next() => {
match msg {
Expand Down Expand Up @@ -196,9 +217,14 @@ where
feed_notification(ws_tx, Method::NotifyPriceSched, Some(notify_price_sched))
.await
}
_ = flush_interval.tick() => {
Some(_) = optional_flush_tick => {
flush(ws_tx).await
}
}?;

match flush_strategy {
FlushStrategy::Interval(_) => Ok(()),
FlushStrategy::Instant => flush(ws_tx).await,
}
}

Expand Down Expand Up @@ -413,6 +439,8 @@ pub struct Config {
/// Size of the buffer of each Server's channel on which `notify_price_sched` events are
/// received from the Price state.
pub notify_price_sched_tx_buffer: usize,
/// Whether to flush immediately after sending a message or notification.
pub instant_flush: bool,
/// Flush interval duration for the notifications.
#[serde(with = "humantime_serde")]
pub flush_interval_duration: Duration,
Expand All @@ -424,6 +452,7 @@ impl Default for Config {
listen_address: "127.0.0.1:8910".to_string(),
notify_price_tx_buffer: 10000,
notify_price_sched_tx_buffer: 10000,
instant_flush: true,
flush_interval_duration: Duration::from_millis(50),
}
}
Expand Down Expand Up @@ -465,6 +494,7 @@ where
state,
config.notify_price_tx_buffer,
config.notify_price_sched_tx_buffer,
config.instant_flush,
config.flush_interval_duration,
)
.await
Expand Down

0 comments on commit 2fb3775

Please sign in to comment.