Cache S3 objects list to minimize the number of requests
kalabukdima committed Apr 19, 2024
1 parent f76593d commit 7c563f3
Showing 4 changed files with 176 additions and 100 deletions.
153 changes: 153 additions & 0 deletions src/storage/data_source.rs
@@ -0,0 +1,153 @@
use std::{
collections::{BTreeSet, HashMap},
sync::Arc,
};

use anyhow::Result;
use itertools::Itertools;
use s3::Bucket;
use subsquid_messages::RangeSet;
use tokio::sync::RwLock;

use crate::types::state::{ChunkRef, ChunkSet, Ranges};

use super::{
layout::{BlockNumber, DataChunk},
s3_fs::S3Filesystem,
};

#[derive(Default)]
pub struct DataSource {
dataset_state: RwLock<HashMap<String, RwLock<State>>>,
}

struct State {
bucket: Bucket,
last_key: Option<String>,
known_chunks: BTreeSet<DataChunk>,
}

impl DataSource {
#[tracing::instrument(ret, skip_all, level = "debug")]
pub async fn find_all_chunks(&self, ranges: Ranges) -> Result<ChunkSet> {
let mut chunks = ChunkSet::new();
for (dataset, range_set) in ranges {
if range_set.ranges.is_empty() {
continue;
}

let mut ds_guard = self.dataset_state.read().await;
let state = if let Some(state) = ds_guard.get(&dataset) {
state
} else {
drop(ds_guard);
self.dataset_state
.write()
.await
.entry(dataset.clone())
.or_insert(RwLock::new(State::new(
S3Filesystem::with_bucket(&dataset)?.bucket,
)));
ds_guard = self.dataset_state.read().await;
ds_guard.get(&dataset).unwrap()
};

let dataset = Arc::new(dataset);
let last_required_block = BlockNumber::from(range_set.ranges.last().unwrap().end);
if !state.read().await.contains_block(last_required_block) {
state.write().await.update_chunks().await?;
}
for chunk in state.read().await.find_chunks(&range_set) {
chunks.insert(ChunkRef {
dataset: dataset.clone(),
chunk,
});
}
}
Ok(chunks)
}

pub async fn update_dataset(&self, dataset: String) -> Result<()> {
let mut ds_guard = self.dataset_state.write().await;
let state = ds_guard
.entry(dataset.clone())
.or_insert(RwLock::new(State::new(
S3Filesystem::with_bucket(&dataset)?.bucket,
)));
state.write().await.update_chunks().await?;
Ok(())
}
}

impl State {
fn new(bucket: Bucket) -> Self {
Self {
bucket,
known_chunks: Default::default(),
last_key: None,
}
}

fn find_chunks(&self, ranges: &RangeSet) -> Vec<DataChunk> {
let mut chunks = Vec::new();
for range in ranges.ranges.iter() {
let first = DataChunk {
last_block: BlockNumber::from(range.begin),
..Default::default()
};
for chunk in self.known_chunks.range(first..) {
if chunk.first_block <= BlockNumber::from(range.end) {
chunks.push(chunk.clone());
} else {
break;
}
}
}
chunks
}

fn contains_block(&self, block: BlockNumber) -> bool {
self.known_chunks
.last()
.map(|chunk| chunk.last_block >= block)
.unwrap_or(false)
}

#[tracing::instrument(skip(self), fields(bucket=self.bucket.name), level="debug")]
async fn update_chunks(&mut self) -> Result<()> {
tracing::debug!("Listing S3 bucket");
let mut last_key = self.last_key.clone();
loop {
let (list_result, _) = self
.bucket
.list_page("".to_string(), None, None, last_key.clone(), None)
.await?;
last_key = list_result.contents.last().map(|object| object.key.clone());
if last_key.is_none() {
tracing::warn!("Didn't find new chunks in the bucket {}", self.bucket.name);
return Ok(());
}
tracing::debug!("Found {} new keys", list_result.contents.len());
self.known_chunks.extend(
list_result
.contents
.into_iter()
.map(|object| {
if let Some((dirname, _)) = object.key.rsplit_once('/') {
DataChunk::parse_range(dirname)
} else {
Err(anyhow::anyhow!("Invalid key: {}", object.key))
}
})
.collect::<Result<Vec<_>>>()?
.into_iter()
.dedup(),
);
self.last_key = last_key.clone();
if !list_result.is_truncated {
break;
}
}
Ok(())
}
}
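
For context, a minimal usage sketch of the new DataSource cache (not part of the commit). The wrapper function is hypothetical; the DataSource methods and the dataset name come from this diff.

use crate::storage::data_source::DataSource;
use crate::types::state::Ranges;

// Hypothetical caller, for illustration only.
async fn resolve_chunks(ranges: Ranges) -> anyhow::Result<()> {
    let data_source = DataSource::default();

    // Warm the cache once: list the bucket and remember the last seen key, so
    // later listings only request keys after that point.
    data_source.update_dataset("ethereum-mainnet-1".to_string()).await?;

    // Served from the cached chunk set; S3 is queried again only when a
    // requested range ends past the last known chunk (State::contains_block).
    let _chunks = data_source.find_all_chunks(ranges).await?;
    Ok(())
}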
56 changes: 3 additions & 53 deletions src/storage/layout.rs
@@ -2,9 +2,7 @@ use std::ops::Deref;

use crate::util::iterator::WithLookahead;
use anyhow::{anyhow, bail, Context, Result};
use async_stream::try_stream;
use camino::Utf8Path as Path;
use futures::Stream;
use itertools::Itertools;
use lazy_static::lazy_static;
use regex::Regex;
@@ -130,44 +128,6 @@ async fn list_chunks(fs: &impl Filesystem, top: &BlockNumber) -> Result<Vec<Data
Ok(entries)
}

// TODO: test it
#[instrument(skip_all, level = "debug")]
pub fn stream_chunks<'a>(
fs: &'a impl Filesystem,
first_block: Option<&BlockNumber>,
last_block: Option<&BlockNumber>,
) -> impl Stream<Item = Result<DataChunk>> + 'a {
let first_block = match first_block {
Some(&block) => block,
None => 0.into(),
};
let last_block = match last_block {
Some(&block) => block,
None => u32::MAX.into(),
};
try_stream! {
let tops = list_top_dirs(fs).await?;
for (i, &top) in tops.iter().enumerate() {
if i + 1 < tops.len() && tops[i + 1] <= first_block {
continue;
}
if last_block < top {
break;
}
let chunks = list_chunks(fs, &top).await?;
for chunk in chunks {
if last_block < chunk.first_block {
break;
}
if first_block > chunk.last_block {
continue;
}
yield chunk;
}
}
}
}

pub async fn read_all_chunks(fs: &impl Filesystem) -> Result<Vec<DataChunk>> {
let tops = list_top_dirs(fs).await?;
let mut handles = Vec::new();
@@ -238,13 +198,10 @@ fn is_dir_empty(path: impl AsRef<Path>) -> bool {
mod tests {
use std::collections::HashMap;

use anyhow::Result;
use futures::StreamExt;

use crate::storage::{local_fs::LocalFs, tests::TestFilesystem};
use crate::util::tests::tests_data;

use super::{read_all_chunks, stream_chunks, BlockNumber, DataChunk};
use super::{read_all_chunks, BlockNumber, DataChunk};

#[test]
fn test_block_number_conversion() {
@@ -333,17 +290,10 @@ mod tests {
#[tokio::test]
async fn test_sample() {
let fs = LocalFs::new(tests_data());
read_all_chunks(&fs).await.unwrap();

let stream = stream_chunks(&fs, Some(&17881400.into()), None);
let results: Vec<Result<DataChunk>> = stream.collect().await;
let from_stream = results.into_iter().collect::<Result<Vec<_>>>().unwrap();
let chunks = read_all_chunks(&fs).await.unwrap();
assert_eq!(
from_stream,
chunks,
vec![DataChunk::parse_range("0017881390/0017881390-0017882786-32ee9457").unwrap()]
);

let from_read = read_all_chunks(&fs).await.unwrap();
assert_eq!(from_stream, from_read);
}
}
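
The streaming lookup removed above is replaced by range queries over the chunk set cached in data_source.rs. Below is a self-contained sketch of that lookup pattern with a simplified stand-in type; it assumes, as State::find_chunks does, that chunks are ordered by last_block and do not overlap.

use std::collections::BTreeSet;

// Simplified stand-in for DataChunk, ordered by last_block first so that
// `range()` can skip every chunk that ends before the queried range begins.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
struct Chunk {
    last_block: u64,
    first_block: u64,
}

fn overlapping(chunks: &BTreeSet<Chunk>, begin: u64, end: u64) -> Vec<Chunk> {
    chunks
        .range(Chunk { last_block: begin, first_block: 0 }..)
        .take_while(|chunk| chunk.first_block <= end)
        .copied()
        .collect()
}

fn main() {
    let chunks: BTreeSet<Chunk> = [
        Chunk { first_block: 0, last_block: 99 },
        Chunk { first_block: 100, last_block: 199 },
        Chunk { first_block: 200, last_block: 299 },
    ]
    .into_iter()
    .collect();

    // Blocks 150..=250 overlap the second and third chunks only.
    assert_eq!(overlapping(&chunks, 150, 250).len(), 2);
}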
66 changes: 19 additions & 47 deletions src/storage/manager.rs
Expand Up @@ -2,7 +2,6 @@ use std::sync::Arc;

use anyhow::{Context, Result};
use camino::{Utf8Path as Path, Utf8PathBuf as PathBuf};
use futures::StreamExt;
use parking_lot::Mutex;
use tokio_util::sync::CancellationToken;
use tracing::{debug, info, instrument, warn};
@@ -13,19 +12,21 @@ use crate::types::{
};

use super::{
data_source::DataSource,
downloader::ChunkDownloader,
layout::{self, BlockNumber, DataChunk},
local_fs::{add_temp_prefix, LocalFs},
s3_fs::S3Filesystem,
state::{State, UpdateStatus},
Filesystem,
};

#[derive(Default)]
pub struct StateManager {
fs: LocalFs,
state: Mutex<State>,
desired_ranges: Mutex<Ranges>,
notify: tokio::sync::Notify,
data_source: DataSource,
}

pub struct Status {
@@ -44,12 +45,12 @@
Ok(Self {
fs,
state: Mutex::new(State::new(existing_chunks)),
desired_ranges: Default::default(),
notify: tokio::sync::Notify::new(),
..Default::default()
})
}

pub async fn run(&self, cancellation_token: CancellationToken, concurrency: usize) {
temp_preload_chunks(&self.data_source).await;
let mut downloader = ChunkDownloader::default();
loop {
self.state.lock().report_status();
@@ -105,10 +106,10 @@
}

// TODO: prevent accidental massive removals
#[instrument(skip(self))]
#[instrument(skip_all)]
pub async fn set_desired_ranges(&self, ranges: Ranges) -> Result<()> {
if *self.desired_ranges.lock() != ranges {
let chunks = find_all_chunks(ranges.clone()).await?;
let chunks = self.data_source.find_all_chunks(ranges.clone()).await?;

let mut cache = self.desired_ranges.lock();
let mut state = self.state.lock();
@@ -163,47 +164,6 @@
}
}

// TODO: make it faster by only iterating added ranges
#[instrument(ret, level = "debug")]
async fn find_all_chunks(desired: Ranges) -> Result<ChunkSet> {
let mut items = Vec::new();
for (dataset, ranges) in desired {
let rfs = S3Filesystem::with_bucket(&dataset)?;
let dataset = Arc::new(dataset);
let mut streams = Vec::new();
for range in ranges.ranges {
let rfs = rfs.clone();
let stream_fut = async move {
let results =
layout::stream_chunks(&rfs, Some(&range.begin.into()), Some(&range.end.into()))
.collect::<Vec<_>>()
.await;
results.into_iter().collect::<Result<Vec<_>>>()
};
streams.push(stream_fut);
}
items.push(async move {
futures::future::try_join_all(streams.into_iter())
.await
.map(|x| {
x.into_iter()
.flatten()
.map(|chunk| ChunkRef {
dataset: dataset.clone(),
chunk,
})
.collect::<ChunkSet>()
})
});
}
let chunks = futures::future::try_join_all(items.into_iter())
.await?
.into_iter()
.flatten()
.collect();
Ok(chunks)
}

#[instrument(skip_all)]
fn remove_temps(fs: &LocalFs) -> Result<()> {
for entry in glob::glob(fs.root.join("**/temp-*").as_str())? {
@@ -269,6 +229,18 @@ fn get_directory_size(path: &Path) -> u64 {
result
}

// TODO: remove this once the network stabilizes
async fn temp_preload_chunks(data_source: &DataSource) {
info!("Listing S3 buckets...");
for ds in ["ethereum-mainnet-1", "base-1", "moonbeam-evm-1", "bsc-mainnet-1"] {
let result = data_source.update_dataset(ds.to_string()).await;
if let Err(err) = result {
warn!("Couldn't preload chunks for '{ds}': {err:?}");
}
}
info!("Listing S3 buckets done");
}

#[cfg(test)]
mod tests {
use camino::Utf8PathBuf as PathBuf;
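
With this change, run() lists the four hard-coded buckets via temp_preload_chunks before entering the download loop, so the first set_desired_ranges call is answered from a warm DataSource cache instead of streaming S3 listings per range. A rough wiring sketch follows; StateManager::new's signature is an assumption, only run's is shown in this diff.

use camino::Utf8PathBuf as PathBuf;
use tokio_util::sync::CancellationToken;

use crate::storage::manager::StateManager;

// Hypothetical startup wiring, for illustration only.
async fn start_worker(data_dir: PathBuf) -> anyhow::Result<()> {
    // Assumed constructor; not part of this diff.
    let manager = StateManager::new(data_dir).await?;

    // run() first preloads ethereum-mainnet-1, base-1, moonbeam-evm-1 and
    // bsc-mainnet-1 (temp_preload_chunks), then starts the regular loop.
    manager.run(CancellationToken::new(), 4).await;
    Ok(())
}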
1 change: 1 addition & 0 deletions src/storage/mod.rs
@@ -2,6 +2,7 @@ use camino::{Utf8Path as Path, Utf8PathBuf as PathBuf};

use anyhow::Result;

pub mod data_source;
pub mod downloader;
pub mod guard;
pub mod layout;