diff --git a/src/storage/data_source.rs b/src/storage/data_source.rs
new file mode 100644
index 0000000..d0c0b15
--- /dev/null
+++ b/src/storage/data_source.rs
@@ -0,0 +1,153 @@
+use std::{
+    collections::{BTreeSet, HashMap},
+    sync::Arc,
+};
+
+use anyhow::Result;
+use itertools::Itertools;
+use s3::Bucket;
+use subsquid_messages::RangeSet;
+use tokio::sync::RwLock;
+
+use crate::types::state::{ChunkRef, ChunkSet, Ranges};
+
+use super::{
+    layout::{BlockNumber, DataChunk},
+    s3_fs::S3Filesystem,
+};
+
+#[derive(Default)]
+pub struct DataSource {
+    dataset_state: RwLock<HashMap<String, RwLock<State>>>,
+}
+
+struct State {
+    bucket: Bucket,
+    last_key: Option<String>,
+    known_chunks: BTreeSet<DataChunk>,
+}
+
+impl DataSource {
+    #[tracing::instrument(ret, skip_all, level = "debug")]
+    pub async fn find_all_chunks(&self, ranges: Ranges) -> Result<ChunkSet> {
+        let mut chunks = ChunkSet::new();
+        for (dataset, range_set) in ranges {
+            if range_set.ranges.is_empty() {
+                continue;
+            }
+
+            let mut ds_guard = self.dataset_state.read().await;
+            let state = if let Some(state) = ds_guard.get(&dataset) {
+                state
+            } else {
+                drop(ds_guard);
+                self.dataset_state
+                    .write()
+                    .await
+                    .entry(dataset.clone())
+                    .or_insert(RwLock::new(State::new(
+                        S3Filesystem::with_bucket(&dataset)?.bucket,
+                    )));
+                ds_guard = self.dataset_state.read().await;
+                ds_guard.get(&dataset).unwrap()
+            };
+
+            let dataset = Arc::new(dataset);
+            let last_required_block = BlockNumber::from(range_set.ranges.last().unwrap().end);
+            if !state.read().await.contains_block(last_required_block) {
+                state.write().await.update_chunks().await?;
+            }
+            for chunk in state.read().await.find_chunks(&range_set) {
+                chunks.insert(ChunkRef {
+                    dataset: dataset.clone(),
+                    chunk,
+                });
+            }
+        }
+        Ok(chunks)
+    }
+
+    pub async fn update_dataset(&self, dataset: String) -> Result<()> {
+        let mut ds_guard = self.dataset_state.write().await;
+        let state = ds_guard
+            .entry(dataset.clone())
+            .or_insert(RwLock::new(State::new(
+                S3Filesystem::with_bucket(&dataset)?.bucket,
+            )));
+        state.write().await.update_chunks().await?;
+        Ok(())
+    }
+}
+
+impl State {
+    fn new(bucket: Bucket) -> Self {
+        Self {
+            bucket,
+            known_chunks: Default::default(),
+            last_key: None,
+        }
+    }
+
+    fn find_chunks(&self, ranges: &RangeSet) -> Vec<DataChunk> {
+        let mut chunks = Vec::new();
+        for range in ranges.ranges.iter() {
+            let first = DataChunk {
+                last_block: BlockNumber::from(range.begin),
+                ..Default::default()
+            };
+            for chunk in self.known_chunks.range(first..) {
+                if chunk.first_block <= BlockNumber::from(range.end) {
+                    chunks.push(chunk.clone());
+                } else {
+                    break;
+                }
+            }
+        }
+        chunks
+    }
+
+    fn contains_block(&self, block: BlockNumber) -> bool {
+        self.known_chunks
+            .last()
+            .map(|chunk| chunk.last_block >= block)
+            .unwrap_or(false)
+    }
+
+    #[tracing::instrument(skip(self), fields(bucket = self.bucket.name), level = "debug")]
+    async fn update_chunks(&mut self) -> Result<()> {
+        tracing::debug!("Listing S3 bucket");
+        let mut last_key = self.last_key.clone();
+        loop {
+            let (list_result, _) = self
+                .bucket
+                .list_page("".to_string(), None, None, last_key.clone(), None)
+                .await?;
+            last_key = list_result.contents.last().map(|object| object.key.clone());
+            if last_key.is_none() {
+                tracing::warn!("Didn't find new chunks in the bucket {}", self.bucket.name);
+                return Ok(());
+            }
+            tracing::debug!("Found {} new keys", list_result.contents.len());
+            self.known_chunks.extend(
+                list_result
+                    .contents
+                    .into_iter()
+                    .map(|object| {
+                        if let Some((dirname, _)) = object.key.rsplit_once('/') {
+                            DataChunk::parse_range(dirname)
+                        } else {
+                            Err(anyhow::anyhow!("Invalid key: {}", object.key))
+                        }
+                    })
+                    .collect::<Result<Vec<_>>>()?
+                    .into_iter()
+                    .dedup(),
+            );
+            self.last_key = last_key.clone();
+            if !list_result.is_truncated {
+                break;
+            }
+        }
+        Ok(())
+    }
+}
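The per-dataset entry in find_all_chunks is created lazily: the read guard is dropped, the map is filled in under the write lock with entry().or_insert(), and the read lock is re-taken. A minimal stand-alone sketch of that read-then-upgrade pattern with tokio::sync::RwLock follows; the get_or_init function and its u64 payload are illustrative placeholders, not part of this diff.

use std::collections::HashMap;

use tokio::sync::RwLock;

// Read-then-upgrade: try the cheap shared path first, and only take the
// write lock when the key is missing. entry().or_insert() re-checks the
// key, so a concurrent insert by another task is not clobbered.
async fn get_or_init(map: &RwLock<HashMap<String, u64>>, key: &str) -> u64 {
    if let Some(value) = map.read().await.get(key) {
        return *value;
    }
    *map.write().await.entry(key.to_string()).or_insert(0)
}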
diff --git a/src/storage/layout.rs b/src/storage/layout.rs
index fbb84a1..a851ad8 100644
--- a/src/storage/layout.rs
+++ b/src/storage/layout.rs
@@ -2,9 +2,7 @@ use std::ops::Deref;
 
 use crate::util::iterator::WithLookahead;
 use anyhow::{anyhow, bail, Context, Result};
-use async_stream::try_stream;
 use camino::Utf8Path as Path;
-use futures::Stream;
 use itertools::Itertools;
 use lazy_static::lazy_static;
 use regex::Regex;
@@ -130,44 +128,6 @@ async fn list_chunks(fs: &impl Filesystem, top: &BlockNumber) -> Result<Vec<DataChunk>> {
     Ok(chunks)
 }
 
-pub fn stream_chunks<'a>(
-    fs: &'a impl Filesystem,
-    first_block: Option<&BlockNumber>,
-    last_block: Option<&BlockNumber>,
-) -> impl Stream<Item = Result<DataChunk>> + 'a {
-    let first_block = match first_block {
-        Some(&block) => block,
-        None => 0.into(),
-    };
-    let last_block = match last_block {
-        Some(&block) => block,
-        None => u32::MAX.into(),
-    };
-    try_stream! {
-        let tops = list_top_dirs(fs).await?;
-        for (i, &top) in tops.iter().enumerate() {
-            if i + 1 < tops.len() && tops[i + 1] <= first_block {
-                continue;
-            }
-            if last_block < top {
-                break;
-            }
-            let chunks = list_chunks(fs, &top).await?;
-            for chunk in chunks {
-                if last_block < chunk.first_block {
-                    break;
-                }
-                if first_block > chunk.last_block {
-                    continue;
-                }
-                yield chunk;
-            }
-        }
-    }
-}
-
 pub async fn read_all_chunks(fs: &impl Filesystem) -> Result<Vec<DataChunk>> {
     let tops = list_top_dirs(fs).await?;
     let mut handles = Vec::new();
@@ -238,13 +198,10 @@ fn is_dir_empty(path: impl AsRef<Path>) -> bool {
 mod tests {
     use std::collections::HashMap;
 
-    use anyhow::Result;
-    use futures::StreamExt;
-
     use crate::storage::{local_fs::LocalFs, tests::TestFilesystem};
     use crate::util::tests::tests_data;
 
-    use super::{read_all_chunks, stream_chunks, BlockNumber, DataChunk};
+    use super::{read_all_chunks, BlockNumber, DataChunk};
 
     #[test]
     fn test_block_number_conversion() {
@@ -333,17 +290,10 @@ mod tests {
     #[tokio::test]
     async fn test_sample() {
         let fs = LocalFs::new(tests_data());
-        read_all_chunks(&fs).await.unwrap();
-
-        let stream = stream_chunks(&fs, Some(&17881400.into()), None);
-        let results: Vec<Result<DataChunk>> = stream.collect().await;
-        let from_stream = results.into_iter().collect::<Result<Vec<_>>>().unwrap();
+        let chunks = read_all_chunks(&fs).await.unwrap();
 
         assert_eq!(
-            from_stream,
+            chunks,
             vec![DataChunk::parse_range("0017881390/0017881390-0017882786-32ee9457").unwrap()]
         );
-
-        let from_read = read_all_chunks(&fs).await.unwrap();
-        assert_eq!(from_stream, from_read);
     }
 }
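The chunk directory naming exercised by test_sample is the same shape that DataSource::update_chunks extracts from S3 object keys with rsplit_once('/') before calling DataChunk::parse_range. A small stand-alone sketch of that key handling; the trailing "blocks.parquet" file name is an assumption for illustration, not taken from this diff.

fn main() {
    // Hypothetical S3 key reusing the test_sample fixture as the directory.
    let key = "0017881390/0017881390-0017882786-32ee9457/blocks.parquet";
    // Split at the last '/': the left part is the chunk directory name
    // that would be handed to DataChunk::parse_range.
    let (dirname, _file) = key.rsplit_once('/').expect("key contains no '/'");
    assert_eq!(dirname, "0017881390/0017881390-0017882786-32ee9457");
}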
diff --git a/src/storage/manager.rs b/src/storage/manager.rs
index 6fd8196..b610864 100644
--- a/src/storage/manager.rs
+++ b/src/storage/manager.rs
@@ -2,7 +2,6 @@ use std::sync::Arc;
 
 use anyhow::{Context, Result};
 use camino::{Utf8Path as Path, Utf8PathBuf as PathBuf};
-use futures::StreamExt;
 use parking_lot::Mutex;
 use tokio_util::sync::CancellationToken;
 use tracing::{debug, info, instrument, warn};
@@ -13,19 +12,21 @@ use crate::types::{
 };
 
 use super::{
+    data_source::DataSource,
     downloader::ChunkDownloader,
     layout::{self, BlockNumber, DataChunk},
     local_fs::{add_temp_prefix, LocalFs},
-    s3_fs::S3Filesystem,
     state::{State, UpdateStatus},
     Filesystem,
 };
 
+#[derive(Default)]
 pub struct StateManager {
     fs: LocalFs,
     state: Mutex<State>,
     desired_ranges: Mutex<Ranges>,
     notify: tokio::sync::Notify,
+    data_source: DataSource,
 }
 
 pub struct Status {
@@ -44,12 +45,12 @@ impl StateManager {
         Ok(Self {
             fs,
             state: Mutex::new(State::new(existing_chunks)),
-            desired_ranges: Default::default(),
-            notify: tokio::sync::Notify::new(),
+            ..Default::default()
         })
     }
 
     pub async fn run(&self, cancellation_token: CancellationToken, concurrency: usize) {
+        temp_preload_chunks(&self.data_source).await;
         let mut downloader = ChunkDownloader::default();
         loop {
             self.state.lock().report_status();
@@ -105,10 +106,10 @@ impl StateManager {
     }
 
     // TODO: prevent accidental massive removals
-    #[instrument(skip(self))]
+    #[instrument(skip_all)]
    pub async fn set_desired_ranges(&self, ranges: Ranges) -> Result<()> {
        if *self.desired_ranges.lock() != ranges {
-            let chunks = find_all_chunks(ranges.clone()).await?;
+            let chunks = self.data_source.find_all_chunks(ranges.clone()).await?;
 
             let mut cache = self.desired_ranges.lock();
             let mut state = self.state.lock();
@@ -163,47 +164,6 @@ impl StateManager {
     }
 }
 
-// TODO: make it faster by only iterating added ranges
-#[instrument(ret, level = "debug")]
-async fn find_all_chunks(desired: Ranges) -> Result<ChunkSet> {
-    let mut items = Vec::new();
-    for (dataset, ranges) in desired {
-        let rfs = S3Filesystem::with_bucket(&dataset)?;
-        let dataset = Arc::new(dataset);
-        let mut streams = Vec::new();
-        for range in ranges.ranges {
-            let rfs = rfs.clone();
-            let stream_fut = async move {
-                let results =
-                    layout::stream_chunks(&rfs, Some(&range.begin.into()), Some(&range.end.into()))
-                        .collect::<Vec<_>>()
-                        .await;
-                results.into_iter().collect::<Result<Vec<_>>>()
-            };
-            streams.push(stream_fut);
-        }
-        items.push(async move {
-            futures::future::try_join_all(streams.into_iter())
-                .await
-                .map(|x| {
-                    x.into_iter()
-                        .flatten()
-                        .map(|chunk| ChunkRef {
-                            dataset: dataset.clone(),
-                            chunk,
-                        })
-                        .collect::<ChunkSet>()
-                })
-        });
-    }
-    let chunks = futures::future::try_join_all(items.into_iter())
-        .await?
-        .into_iter()
-        .flatten()
-        .collect();
-    Ok(chunks)
-}
-
 #[instrument(skip_all)]
 fn remove_temps(fs: &LocalFs) -> Result<()> {
     for entry in glob::glob(fs.root.join("**/temp-*").as_str())? {
@@ -269,6 +229,18 @@ fn get_directory_size(path: &Path) -> u64 {
     result
 }
 
+// TODO: remove this once the network stabilizes
+async fn temp_preload_chunks(data_source: &DataSource) {
+    info!("Listing S3 buckets...");
+    for ds in ["ethereum-mainnet-1", "base-1", "moonbeam-evm-1", "bsc-mainnet-1"] {
+        let result = data_source.update_dataset(ds.to_string()).await;
+        if let Err(err) = result {
+            warn!("Couldn't preload chunks for '{ds}': {err:?}");
+        }
+    }
+    info!("Listing S3 buckets done");
+}
+
 #[cfg(test)]
 mod tests {
     use camino::Utf8PathBuf as PathBuf;
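The preload step added to StateManager::run is deliberately best-effort: each bucket is listed independently and a failure only produces a warning, so one unreachable dataset does not block the worker from starting. A condensed stand-alone sketch of that shape; the preload_all name and caller-supplied dataset slice are assumptions for illustration.

use crate::storage::data_source::DataSource;

// Best-effort preload mirroring temp_preload_chunks: a failed listing is
// logged and skipped rather than propagated to the caller.
async fn preload_all(data_source: &DataSource, datasets: &[&str]) {
    for ds in datasets {
        if let Err(err) = data_source.update_dataset(ds.to_string()).await {
            tracing::warn!("Couldn't preload chunks for '{ds}': {err:?}");
        }
    }
}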
diff --git a/src/storage/mod.rs b/src/storage/mod.rs
index cce8646..e5d79af 100644
--- a/src/storage/mod.rs
+++ b/src/storage/mod.rs
@@ -2,6 +2,7 @@ use camino::{Utf8Path as Path, Utf8PathBuf as PathBuf};
 
 use anyhow::Result;
 
+pub mod data_source;
 pub mod downloader;
 pub mod guard;
 pub mod layout;