From 79ab36bac1fc9a4e59d3ae56d372e865e2ec6117 Mon Sep 17 00:00:00 2001 From: Dzmitry Kalabuk Date: Mon, 5 Feb 2024 11:36:07 +0300 Subject: [PATCH] Fix assignment parsing --- src/storage/s3_fs.rs | 1 + src/transport/http.rs | 26 +++++++++++++++++++++++--- 2 files changed, 24 insertions(+), 3 deletions(-) diff --git a/src/storage/s3_fs.rs b/src/storage/s3_fs.rs index f8c1ddd..1fdc3d7 100644 --- a/src/storage/s3_fs.rs +++ b/src/storage/s3_fs.rs @@ -31,6 +31,7 @@ pub struct S3Filesystem { impl S3Filesystem { pub fn with_bucket(name: &str) -> Result { + let name = name.strip_prefix("s3://").unwrap_or(name); let bucket = Bucket::new(name, REGION.to_owned(), Credentials::from_env()?)? .with_request_timeout(Duration::from_secs(TIMEOUT.to_owned())); Ok(Self { bucket }) diff --git a/src/transport/http.rs b/src/transport/http.rs index 3055c34..f4f1a6d 100644 --- a/src/transport/http.rs +++ b/src/transport/http.rs @@ -1,8 +1,12 @@ +use std::collections::HashMap; + use anyhow::Result; +use serde::Deserialize; use tokio::sync::mpsc; use tokio_stream::wrappers::ReceiverStream; -use crate::types::state::Ranges; +use crate::types::state::{Dataset, Ranges}; +use subsquid_messages::Range; use super::{State, Transport}; @@ -29,7 +33,7 @@ impl HttpTransport { impl Transport for HttpTransport { async fn send_ping(&self, state: State) -> Result<()> { - let resp: Ranges = reqwest::Client::new() + let resp: PingResponse = reqwest::Client::new() .post(&self.router_url) .json(&serde_json::json!({ "worker_id": self.worker_id, @@ -41,7 +45,7 @@ impl Transport for HttpTransport { .await? .json() .await?; - self.updates_tx.send(resp).await?; + self.updates_tx.send(resp.into()).await?; Ok(()) } @@ -53,3 +57,19 @@ impl Transport for HttpTransport { ReceiverStream::new(rx) } } + +#[derive(Deserialize)] +struct PingResponse(HashMap>); + +impl Into for PingResponse { + fn into(self) -> Ranges { + Ranges::from_iter(self.0.into_iter().map(|(ds, v)| { + ( + ds, + v.into_iter() + .map(|(begin, end)| Range::new(begin, end)) + .into(), + ) + })) + } +}