diff --git a/node/libs/storage/src/buffered.rs b/node/libs/storage/src/buffered.rs deleted file mode 100644 index 6179979c..00000000 --- a/node/libs/storage/src/buffered.rs +++ /dev/null @@ -1,257 +0,0 @@ -//! Buffered [`BlockStore`] implementation. - -use crate::{ - traits::{BlockStore, ContiguousBlockStore, WriteBlockStore}, - types::MissingBlockNumbers, - StorageError, StorageResult, -}; -use async_trait::async_trait; -#[cfg(test)] -use concurrency::ctx::channel; -use concurrency::{ - ctx, - sync::{self, watch, Mutex}, -}; -use roles::validator::{BlockNumber, FinalBlock}; -use std::{collections::BTreeMap, ops}; - -#[derive(Debug)] -struct BlockBuffer { - store_block_number: BlockNumber, - is_block_scheduled: bool, - blocks: BTreeMap, -} - -impl BlockBuffer { - fn new(store_block_number: BlockNumber) -> Self { - Self { - store_block_number, - is_block_scheduled: false, - blocks: BTreeMap::new(), - } - } - - fn head_block(&self) -> Option { - self.blocks.values().next_back().cloned() - } - - #[tracing::instrument(level = "trace", skip(self))] - fn set_store_block(&mut self, store_block_number: BlockNumber) { - assert_eq!( - store_block_number, - self.store_block_number.next(), - "`ContiguousBlockStore` invariant broken: unexpected new head block number" - ); - assert!( - self.is_block_scheduled, - "`ContiguousBlockStore` invariant broken: unexpected update" - ); - - self.store_block_number = store_block_number; - self.is_block_scheduled = false; - let old_len = self.blocks.len(); - self.blocks = self.blocks.split_off(&store_block_number.next()); - // ^ Removes all entries up to and including `store_block_number` - tracing::trace!("Removed {} blocks from buffer", old_len - self.blocks.len()); - } - - fn last_contiguous_block_number(&self) -> BlockNumber { - // By design, blocks in the underlying store are always contiguous. - let mut last_number = self.store_block_number; - for &number in self.blocks.keys() { - if number > last_number.next() { - return last_number; - } - last_number = number; - } - last_number - } - - fn missing_block_numbers(&self, mut range: ops::Range) -> Vec { - // Clamp the range start so we don't produce extra missing blocks. - range.start = range.start.max(self.store_block_number.next()); - if range.is_empty() { - return vec![]; // Return early to not trigger panic in `BTreeMap::range()` - } - - let keys = self.blocks.range(range.clone()).map(|(&num, _)| Ok(num)); - MissingBlockNumbers::new(range, keys) - .map(Result::unwrap) - .collect() - } - - fn put_block(&mut self, block: FinalBlock) { - let block_number = block.block.number; - assert!(block_number > self.store_block_number); - // ^ Must be checked previously - self.blocks.insert(block_number, block); - tracing::trace!(%block_number, "Inserted block in buffer"); - } - - fn next_block_for_store(&mut self) -> Option { - if self.is_block_scheduled { - None - } else { - let next_block = self.blocks.get(&self.store_block_number.next()).cloned(); - self.is_block_scheduled = next_block.is_some(); - next_block - } - } -} - -/// Events emitted by [`BufferedStorage`]. -#[cfg(test)] -#[derive(Debug)] -pub(crate) enum BufferedStorageEvent { - /// Update was received from - UpdateReceived(BlockNumber), -} - -/// [`BlockStore`] with an in-memory buffer for pending blocks. -#[derive(Debug)] -pub struct BufferedStorage { - inner: T, - inner_subscriber: watch::Receiver, - block_writes_sender: watch::Sender, - buffer: Mutex, - #[cfg(test)] - events_sender: channel::UnboundedSender, -} - -impl BufferedStorage { - /// Creates a new buffered storage. The buffer is initially empty. - pub fn new(store: T) -> Self { - let inner_subscriber = store.subscribe_to_block_writes(); - let store_block_number = *inner_subscriber.borrow(); - tracing::trace!(%store_block_number, "Initialized buffer storage"); - Self { - inner: store, - inner_subscriber, - block_writes_sender: watch::channel(store_block_number).0, - buffer: Mutex::new(BlockBuffer::new(store_block_number)), - #[cfg(test)] - events_sender: channel::unbounded().0, - } - } - - #[cfg(test)] - pub(crate) fn set_events_sender( - &mut self, - sender: channel::UnboundedSender, - ) { - self.events_sender = sender; - } - - #[cfg(test)] - pub(crate) fn as_ref(&self) -> &T { - &self.inner - } - - #[cfg(test)] - pub(crate) async fn buffer_len(&self) -> usize { - self.buffer.lock().await.blocks.len() - } - - /// Listens to the updates in the underlying storage. This method must be spawned as a background task - /// which should be running as long at the [`BufferedStorage`] is in use. Otherwise, - /// `BufferedStorage` will function incorrectly. - #[tracing::instrument(level = "trace", skip_all, err)] - pub async fn listen_to_updates(&self, ctx: &ctx::Ctx) -> StorageResult<()> { - let mut subscriber = self.inner_subscriber.clone(); - loop { - let store_block_number = *sync::changed(ctx, &mut subscriber).await?; - tracing::trace!("Underlying block number updated to {store_block_number}"); - - let next_block_for_store = { - let mut buffer = sync::lock(ctx, &self.buffer).await?; - buffer.set_store_block(store_block_number); - buffer.next_block_for_store() - }; - if let Some(block) = next_block_for_store { - self.inner.schedule_next_block(ctx, &block).await?; - let block_number = block.block.number; - tracing::trace!(%block_number, "Block scheduled in underlying storage"); - } - - #[cfg(test)] - self.events_sender - .send(BufferedStorageEvent::UpdateReceived(store_block_number)); - } - } -} - -#[async_trait] -impl BlockStore for BufferedStorage { - async fn head_block(&self, ctx: &ctx::Ctx) -> StorageResult { - let buffered_head_block = sync::lock(ctx, &self.buffer).await?.head_block(); - if let Some(block) = buffered_head_block { - return Ok(block); - } - self.inner.head_block(ctx).await - } - - async fn first_block(&self, ctx: &ctx::Ctx) -> StorageResult { - // First block is always situated in the underlying store - self.inner.first_block(ctx).await - } - - async fn last_contiguous_block_number(&self, ctx: &ctx::Ctx) -> StorageResult { - Ok(sync::lock(ctx, &self.buffer) - .await? - .last_contiguous_block_number()) - } - - async fn block( - &self, - ctx: &ctx::Ctx, - number: BlockNumber, - ) -> StorageResult> { - { - let buffer = sync::lock(ctx, &self.buffer).await?; - if number > buffer.store_block_number { - return Ok(buffer.blocks.get(&number).cloned()); - } - } - self.inner.block(ctx, number).await - } - - async fn missing_block_numbers( - &self, - ctx: &ctx::Ctx, - range: ops::Range, - ) -> StorageResult> { - // By design, the underlying store has no missing blocks. - Ok(sync::lock(ctx, &self.buffer) - .await? - .missing_block_numbers(range)) - } - - fn subscribe_to_block_writes(&self) -> watch::Receiver { - self.block_writes_sender.subscribe() - } -} - -#[async_trait] -impl WriteBlockStore for BufferedStorage { - async fn put_block(&self, ctx: &ctx::Ctx, block: &FinalBlock) -> StorageResult<()> { - let next_block_for_store = { - let mut buffer = sync::lock(ctx, &self.buffer).await?; - let block_number = block.block.number; - if block_number <= buffer.store_block_number { - let err = anyhow::anyhow!( - "Cannot replace a block #{block_number} since it is already present in the underlying storage", - ); - return Err(StorageError::Database(err)); - } - buffer.put_block(block.clone()); - buffer.next_block_for_store() - }; - - if let Some(block) = next_block_for_store { - self.inner.schedule_next_block(ctx, &block).await?; - tracing::trace!(block_number = %block.block.number, "Block scheduled in underlying storage"); - } - self.block_writes_sender.send_replace(block.block.number); - Ok(()) - } -} diff --git a/node/libs/storage/src/lib.rs b/node/libs/storage/src/lib.rs index 6fcc9d20..cfcb4247 100644 --- a/node/libs/storage/src/lib.rs +++ b/node/libs/storage/src/lib.rs @@ -1,7 +1,6 @@ //! This module is responsible for persistent data storage, it provides schema-aware type-safe database access. Currently we use RocksDB, //! but this crate only exposes an abstraction of a database, so we can easily switch to a different storage engine in the future. -mod buffered; mod rocksdb; mod testonly; #[cfg(test)] @@ -10,8 +9,7 @@ mod traits; mod types; pub use crate::{ - buffered::BufferedStorage, rocksdb::RocksdbStorage, - traits::{BlockStore, ContiguousBlockStore, ReplicaStateStore, WriteBlockStore}, + traits::{BlockStore, ReplicaStateStore, WriteBlockStore}, types::{ReplicaState, StorageError, StorageResult}, }; diff --git a/node/libs/storage/src/tests.rs b/node/libs/storage/src/tests.rs index 8d0a5467..b9248331 100644 --- a/node/libs/storage/src/tests.rs +++ b/node/libs/storage/src/tests.rs @@ -1,18 +1,10 @@ use super::*; -use crate::{buffered::BufferedStorageEvent, types::ReplicaState}; -use assert_matches::assert_matches; -use async_trait::async_trait; -use concurrency::{ - ctx::{self, channel}, - scope, - sync::{self, watch}, - time, -}; -use rand::{rngs::StdRng, seq::SliceRandom, Rng}; +use crate::types::ReplicaState; +use concurrency::ctx; +use rand::{seq::SliceRandom, Rng}; use roles::validator::{Block, BlockNumber, FinalBlock}; -use std::{iter, ops}; +use std::iter; use tempfile::TempDir; -use test_casing::test_casing; async fn init_store(ctx: &ctx::Ctx, rng: &mut R) -> (FinalBlock, RocksdbStorage, TempDir) { let genesis_block = FinalBlock { @@ -170,250 +162,3 @@ fn cancellation_is_detected_in_storage_errors() { let err = anyhow::Error::from(err); assert!(err.root_cause().is::()); } - -#[derive(Debug)] -struct MockContiguousStore { - inner: RocksdbStorage, - block_sender: channel::Sender, -} - -impl MockContiguousStore { - fn new(inner: RocksdbStorage) -> (Self, channel::Receiver) { - let (block_sender, block_receiver) = channel::bounded(1); - let this = Self { - inner, - block_sender, - }; - (this, block_receiver) - } - - async fn run_updates( - &self, - ctx: &ctx::Ctx, - mut block_receiver: channel::Receiver, - ) -> StorageResult<()> { - let rng = &mut ctx.rng(); - while let Ok(block) = block_receiver.recv(ctx).await { - let sleep_duration = time::Duration::milliseconds(rng.gen_range(0..5)); - ctx.sleep(sleep_duration).await?; - self.inner.put_block(ctx, &block).await?; - } - Ok(()) - } -} - -#[async_trait] -impl BlockStore for MockContiguousStore { - async fn head_block(&self, ctx: &ctx::Ctx) -> StorageResult { - self.inner.head_block(ctx).await - } - - async fn first_block(&self, ctx: &ctx::Ctx) -> StorageResult { - self.inner.first_block(ctx).await - } - - async fn last_contiguous_block_number(&self, ctx: &ctx::Ctx) -> StorageResult { - self.inner.last_contiguous_block_number(ctx).await - } - - async fn block( - &self, - ctx: &ctx::Ctx, - number: BlockNumber, - ) -> StorageResult> { - self.inner.block(ctx, number).await - } - - async fn missing_block_numbers( - &self, - ctx: &ctx::Ctx, - range: ops::Range, - ) -> StorageResult> { - self.inner.missing_block_numbers(ctx, range).await - } - - fn subscribe_to_block_writes(&self) -> watch::Receiver { - self.inner.subscribe_to_block_writes() - } -} - -#[async_trait] -impl ContiguousBlockStore for MockContiguousStore { - async fn schedule_next_block(&self, ctx: &ctx::Ctx, block: &FinalBlock) -> StorageResult<()> { - let head_block_number = self.head_block(ctx).await?.block.number; - assert_eq!(block.block.number, head_block_number.next()); - self.block_sender - .try_send(block.clone()) - .expect("BufferedStorage is rushing"); - Ok(()) - } -} - -#[tracing::instrument(level = "trace", skip(shuffle_blocks))] -async fn test_buffered_storage( - initial_block_count: usize, - block_count: usize, - block_interval: time::Duration, - shuffle_blocks: impl FnOnce(&mut StdRng, &mut [FinalBlock]), -) { - concurrency::testonly::abort_on_panic(); - let ctx = &ctx::test_root(&ctx::RealClock); - let rng = &mut ctx.rng(); - - let (genesis_block, block_store, _temp_dir) = init_store(ctx, rng).await; - let mut initial_blocks = gen_blocks(rng, genesis_block.clone(), initial_block_count); - for block in &initial_blocks { - block_store.put_block(ctx, block).await.unwrap(); - } - initial_blocks.insert(0, genesis_block.clone()); - - let (block_store, block_receiver) = MockContiguousStore::new(block_store); - let mut buffered_store = BufferedStorage::new(block_store); - let (events_sender, mut events_receiver) = channel::unbounded(); - buffered_store.set_events_sender(events_sender); - - // Check initial values returned by the store. - let last_initial_block = initial_blocks.last().unwrap().clone(); - assert_eq!( - buffered_store.head_block(ctx).await.unwrap(), - last_initial_block - ); - for block in &initial_blocks { - let block_result = buffered_store.block(ctx, block.block.number).await; - assert_eq!(block_result.unwrap().as_ref(), Some(block)); - } - let mut subscriber = buffered_store.subscribe_to_block_writes(); - assert_eq!( - *subscriber.borrow(), - BlockNumber(initial_block_count as u64) - ); - - let mut blocks = gen_blocks(rng, last_initial_block, block_count); - shuffle_blocks(rng, &mut blocks); - let last_block_number = BlockNumber((block_count + initial_block_count) as u64); - - scope::run!(ctx, |ctx, s| async { - s.spawn_bg(buffered_store.as_ref().run_updates(ctx, block_receiver)); - s.spawn_bg(async { - let err = buffered_store.listen_to_updates(ctx).await.unwrap_err(); - match &err { - StorageError::Canceled(_) => Ok(()), // Test has successfully finished - StorageError::Database(_) => Err(err), - } - }); - - for (idx, block) in blocks.iter().enumerate() { - buffered_store.put_block(ctx, block).await?; - let new_block_number = *sync::changed(ctx, &mut subscriber).await?; - assert_eq!(new_block_number, block.block.number); - - // Check that all written blocks are immediately accessible. - for existing_block in initial_blocks.iter().chain(&blocks[0..=idx]) { - let number = existing_block.block.number; - assert_eq!( - buffered_store.block(ctx, number).await?.as_ref(), - Some(existing_block) - ); - } - assert_eq!(buffered_store.first_block(ctx).await?, genesis_block); - - let expected_head_block = blocks[0..=idx] - .iter() - .max_by_key(|block| block.block.number) - .unwrap(); - assert_eq!(buffered_store.head_block(ctx).await?, *expected_head_block); - - let expected_last_contiguous_block = blocks[(idx + 1)..] - .iter() - .map(|block| block.block.number) - .min() - .map_or(last_block_number, BlockNumber::prev); - assert_eq!( - buffered_store.last_contiguous_block_number(ctx).await?, - expected_last_contiguous_block - ); - - ctx.sleep(block_interval).await?; - } - - let mut inner_subscriber = buffered_store.as_ref().subscribe_to_block_writes(); - while buffered_store - .as_ref() - .last_contiguous_block_number(ctx) - .await? - < last_block_number - { - sync::changed(ctx, &mut inner_subscriber).await?; - } - - // Check events emitted by the buffered storage. This also ensures that all underlying storage - // updates are processed before proceeding to the following checks. - let expected_numbers = (initial_block_count as u64 + 1)..=last_block_number.0; - for expected_number in expected_numbers.map(BlockNumber) { - assert_matches!( - events_receiver.recv(ctx).await?, - BufferedStorageEvent::UpdateReceived(number) if number == expected_number - ); - } - - assert_eq!(buffered_store.buffer_len().await, 0); - Ok(()) - }) - .await - .unwrap(); -} - -// Choose intervals so that they are both smaller and larger than the sleep duration in -// `MockContiguousStore::run_updates()`. -const BLOCK_INTERVALS: [time::Duration; 4] = [ - time::Duration::ZERO, - time::Duration::milliseconds(3), - time::Duration::milliseconds(5), - time::Duration::milliseconds(10), -]; - -#[test_casing(4, BLOCK_INTERVALS)] -#[tokio::test] -async fn buffered_storage_with_sequential_blocks(block_interval: time::Duration) { - test_buffered_storage(0, 30, block_interval, |_, _| { - // Do not perform shuffling - }) - .await; -} - -#[test_casing(4, BLOCK_INTERVALS)] -#[tokio::test] -async fn buffered_storage_with_random_blocks(block_interval: time::Duration) { - test_buffered_storage(0, 30, block_interval, |rng, blocks| blocks.shuffle(rng)).await; -} - -#[test_casing(4, BLOCK_INTERVALS)] -#[tokio::test] -async fn buffered_storage_with_slightly_shuffled_blocks(block_interval: time::Duration) { - test_buffered_storage(0, 30, block_interval, |rng, blocks| { - for chunk in blocks.chunks_mut(4) { - chunk.shuffle(rng); - } - }) - .await; -} - -#[test_casing(4, BLOCK_INTERVALS)] -#[tokio::test] -async fn buffered_storage_with_initial_blocks(block_interval: time::Duration) { - test_buffered_storage(10, 20, block_interval, |_, _| { - // Do not perform shuffling - }) - .await; -} - -#[test_casing(4, BLOCK_INTERVALS)] -#[tokio::test] -async fn buffered_storage_with_initial_blocks_and_slight_shuffling(block_interval: time::Duration) { - test_buffered_storage(10, 20, block_interval, |rng, blocks| { - for chunk in blocks.chunks_mut(5) { - chunk.shuffle(rng); - } - }) - .await; -} diff --git a/node/libs/storage/src/traits.rs b/node/libs/storage/src/traits.rs index ea0fa1c2..e87f62ef 100644 --- a/node/libs/storage/src/traits.rs +++ b/node/libs/storage/src/traits.rs @@ -94,27 +94,6 @@ impl WriteBlockStore for Arc { } } -/// [`BlockStore`] variation that upholds additional invariants as to how blocks are processed. -/// -/// The invariants are as follows: -/// -/// - Stored blocks always have contiguous numbers; there are no gaps. -/// - Blocks can be scheduled to be added using [`Self::schedule_next_block()`] only. New blocks do not -/// appear in the store otherwise. -#[async_trait] -pub trait ContiguousBlockStore: BlockStore { - /// Schedules a block to be added to the store. Unlike [`WriteBlockStore::put_block()`], - /// there is no expectation that the block is added to the store *immediately*. It's - /// expected that it will be added to the store eventually, which will be signaled via - /// a subscriber returned from [`BlockStore::subscribe_to_block_writes()`]. - /// - /// [`BufferedStorage`] guarantees that this method will only ever be called: - /// - /// - with the next block (i.e., one immediately after [`BlockStore::head_block()`]) - /// - sequentially (i.e., multiple blocks cannot be scheduled at once) - async fn schedule_next_block(&self, ctx: &ctx::Ctx, block: &FinalBlock) -> StorageResult<()>; -} - /// Storage for [`ReplicaState`]. /// /// Implementations **must** propagate context cancellation using [`StorageError::Canceled`].