Skip to content

Commit

Permalink
Fix assignment parsing
Browse files Browse the repository at this point in the history
  • Loading branch information
kalabukdima committed Feb 5, 2024
1 parent 1f3e179 commit 79ab36b
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 3 deletions.
1 change: 1 addition & 0 deletions src/storage/s3_fs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ pub struct S3Filesystem {

impl S3Filesystem {
pub fn with_bucket(name: &str) -> Result<Self> {
let name = name.strip_prefix("s3://").unwrap_or(name);
let bucket = Bucket::new(name, REGION.to_owned(), Credentials::from_env()?)?
.with_request_timeout(Duration::from_secs(TIMEOUT.to_owned()));
Ok(Self { bucket })
Expand Down
26 changes: 23 additions & 3 deletions src/transport/http.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
use std::collections::HashMap;

use anyhow::Result;
use serde::Deserialize;
use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream;

use crate::types::state::Ranges;
use crate::types::state::{Dataset, Ranges};
use subsquid_messages::Range;

use super::{State, Transport};

Expand All @@ -29,7 +33,7 @@ impl HttpTransport {

impl Transport for HttpTransport {
async fn send_ping(&self, state: State) -> Result<()> {
let resp: Ranges = reqwest::Client::new()
let resp: PingResponse = reqwest::Client::new()
.post(&self.router_url)
.json(&serde_json::json!({
"worker_id": self.worker_id,
Expand All @@ -41,7 +45,7 @@ impl Transport for HttpTransport {
.await?
.json()
.await?;
self.updates_tx.send(resp).await?;
self.updates_tx.send(resp.into()).await?;
Ok(())
}

Expand All @@ -53,3 +57,19 @@ impl Transport for HttpTransport {
ReceiverStream::new(rx)
}
}

#[derive(Deserialize)]
struct PingResponse(HashMap<Dataset, Vec<(u32, u32)>>);

impl Into<Ranges> for PingResponse {
fn into(self) -> Ranges {
Ranges::from_iter(self.0.into_iter().map(|(ds, v)| {
(
ds,
v.into_iter()
.map(|(begin, end)| Range::new(begin, end))
.into(),
)
}))
}
}

0 comments on commit 79ab36b

Please sign in to comment.