Skip to content

Commit

Permalink
fix: crossbar calls (#255)
Browse files Browse the repository at this point in the history
  • Loading branch information
losman0s authored Oct 20, 2024
1 parent d61516c commit f9033ac
Show file tree
Hide file tree
Showing 4 changed files with 53 additions and 18 deletions.
4 changes: 2 additions & 2 deletions observability/indexer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,8 @@ google-cloud-googleapis = { git = " https://github.com/mrgnlabs/google-cloud-rus
yup-oauth2 = "8.3.0"
yellowstone-grpc-client = { git = "https://github.com/rpcpool/yellowstone-grpc.git", rev = "87e1755b0d7a4e8101cb5feb6f30063aa91f343f" }
yellowstone-grpc-proto = { git = "https://github.com/rpcpool/yellowstone-grpc.git", rev = "87e1755b0d7a4e8101cb5feb6f30063aa91f343f" }
switchboard-on-demand-client = "0.1.7"
switchboard-on-demand = "0.1.7"
switchboard-on-demand-client = "0.2.4"
switchboard-on-demand = "0.1.15"
hex = "0.4.3"
fixed = "1.12.0"
fixed-macro = "1.2.0"
Expand Down
4 changes: 2 additions & 2 deletions observability/indexer/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FROM rust:1.72 as builder
FROM rust:1.75 as builder

RUN apt-get update -y && apt-get install -y pkg-config build-essential libudev-dev clang cmake protobuf-compiler
RUN rustup component add rustfmt clippy
Expand All @@ -15,7 +15,7 @@ COPY ./clients/rust ./clients/rust

ENV CARGO_NET_GIT_FETCH_WITH_CLI=true

RUN cargo build --release
RUN cargo build --release --locked


FROM debian:stable-slim as runner
Expand Down
17 changes: 15 additions & 2 deletions observability/indexer/src/commands/snapshot_accounts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ pub async fn snapshot_accounts(config: SnapshotAccountsConfig) -> Result<()> {
})
.collect::<Vec<_>>(),
);
context.crossbar_store.refresh_prices().await;
context.crossbar_store.refresh_prices().await.unwrap();

snapshot
.routing_lookup
Expand All @@ -236,7 +236,20 @@ pub async fn snapshot_accounts(config: SnapshotAccountsConfig) -> Result<()> {
let context = context.clone();
async move {
loop {
context.crossbar_store.refresh_prices().await;
let mut retry_count = 0;
while retry_count < 3 {
match context.crossbar_store.refresh_prices().await {
Ok(_) => break,
Err(e) => {
retry_count += 1;
if retry_count == 3 {
error!("Failed to refresh prices after 3 attempts: {:?}", e);
} else {
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
}
}
}
}
let mut snapshot = context.account_snapshot.lock().await;
let feeds_per_address: HashMap<Pubkey, crate::utils::crossbar::SimulatedPrice> =
context.crossbar_store.get_prices_per_address();
Expand Down
46 changes: 34 additions & 12 deletions observability/indexer/src/utils/crossbar.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use anyhow::Result;
use solana_sdk::pubkey::Pubkey;
use std::{collections::HashMap, sync::Mutex};
use switchboard_on_demand_client::CrossbarClient;
Expand Down Expand Up @@ -27,7 +28,7 @@ pub struct CrossbarCache {
impl CrossbarCache {
/// Creates a new CrossbarCache empty instance
pub fn new() -> Self {
let crossbar_client = CrossbarClient::default(None);
let crossbar_client = CrossbarClient::default();
Self {
crossbar_client,
feeds: Mutex::new(HashMap::new()),
Expand All @@ -50,24 +51,43 @@ impl CrossbarCache {
}
}

pub async fn refresh_prices(&self) {
pub async fn refresh_prices(&self) -> Result<()> {
if self.feeds.lock().unwrap().is_empty() {
return;
return Ok(());
}

let feed_hashes = self
let feed_hashes: Vec<String> = self
.feeds
.lock()
.unwrap()
.values()
.map(|feed| feed.feed_meta.feed_hash.clone())
.collect::<Vec<_>>();

let simulated_prices = self
.crossbar_client
.simulate_feeds(&feed_hashes.iter().map(|x| x.as_str()).collect::<Vec<_>>())
.await
.unwrap();
.collect();

const CHUNK_SIZE: usize = 20;

let chunk_futures: Vec<_> = feed_hashes
.chunks(CHUNK_SIZE)
.map(|chunk| {
let client = self.crossbar_client.clone();
let chunk_vec: Vec<String> = chunk.to_vec();
tokio::spawn(async move {
client
.simulate_feeds(
&chunk_vec.iter().map(|x| x.as_str()).collect::<Vec<&str>>(),
)
.await
})
})
.collect();

let chunk_results = futures::future::try_join_all(chunk_futures).await?;
let mut simulated_prices = Vec::new();
for result in chunk_results {
if let Ok(chunk_result) = result {
simulated_prices.extend(chunk_result);
}
}

let timestamp = chrono::Utc::now().timestamp();

Expand All @@ -83,6 +103,8 @@ impl CrossbarCache {
}
}
}

Ok(())
}

pub fn get_prices_per_address(&self) -> HashMap<Pubkey, SimulatedPrice> {
Expand Down Expand Up @@ -136,7 +158,7 @@ mod tests {
feed_hash: feed_hash2.clone(),
},
]);
crossbar_maintainer.refresh_prices().await;
crossbar_maintainer.refresh_prices().await.unwrap();
println!("Price: {:?}", price.lock().unwrap());
println!("Price2: {:?}", price2.lock().unwrap());
}
Expand Down

0 comments on commit f9033ac

Please sign in to comment.