Test buffered storage with initial blocks
slowli committed Oct 23, 2023
1 parent cf0a7e7 commit 1d5fc5e
Showing 2 changed files with 47 additions and 17 deletions.
4 changes: 4 additions & 0 deletions node/libs/storage/src/buffered.rs
@@ -53,6 +53,7 @@ impl BlockBuffer {
         self.blocks.values().next_back().cloned()
     }
 
+    #[tracing::instrument(level = "trace", skip(self))]
     fn set_store_block(&mut self, store_block_number: BlockNumber) {
         assert_eq!(
             store_block_number,
@@ -66,8 +67,10 @@
 
         self.store_block_number = store_block_number;
         self.is_block_scheduled = false;
+        let old_len = self.blocks.len();
         self.blocks = self.blocks.split_off(&store_block_number.next());
         // ^ Removes all entries up to and including `store_block_number`
+        tracing::trace!("Removed {} blocks from buffer", old_len - self.blocks.len());
     }
 
     fn last_contiguous_block_number(&self) -> BlockNumber {
@@ -128,6 +131,7 @@ impl<T: ContiguousBlockStore> BufferedStorage<T> {
     pub fn new(store: T) -> Self {
         let inner_subscriber = store.subscribe_to_block_writes();
         let store_block_number = *inner_subscriber.borrow();
+        tracing::trace!(%store_block_number, "Initialized buffer storage");
         Self {
             inner: store,
             inner_subscriber,
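
The pruning in `set_store_block` appears to use a `BTreeMap` keyed by block number: `split_off` returns the entries at or after the given key and leaves the earlier ones behind, so splitting at `store_block_number.next()` drops everything up to and including `store_block_number`. A minimal, self-contained sketch of that behavior (plain `u64` keys and a string payload stand in for `BlockNumber` and the buffered blocks; an illustration, not the crate's code):

use std::collections::BTreeMap;

fn main() {
    // Hypothetical stand-in for the buffer: block number -> payload.
    let mut blocks: BTreeMap<u64, &str> = (1..=5).map(|n| (n, "block")).collect();

    // Mirrors `set_store_block(3)`: split at `store_block_number.next()`, i.e. at key 4.
    // `split_off(&4)` returns the entries with keys >= 4 and keeps keys < 4 behind,
    // so reassigning the result drops blocks 1, 2 and 3 from the buffer.
    let store_block_number = 3u64;
    let old_len = blocks.len();
    blocks = blocks.split_off(&(store_block_number + 1));
    println!("Removed {} blocks from buffer", old_len - blocks.len()); // prints "Removed 3 blocks from buffer"

    assert_eq!(blocks.keys().copied().collect::<Vec<_>>(), vec![4, 5]);
}
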
60 changes: 43 additions & 17 deletions node/libs/storage/src/tests.rs
@@ -250,6 +250,7 @@ impl ContiguousBlockStore for MockContiguousStore {
 
 #[tracing::instrument(level = "trace", skip(shuffle_blocks))]
 async fn test_buffered_storage(
+    initial_block_count: usize,
     block_count: usize,
     block_interval: time::Duration,
     shuffle_blocks: impl FnOnce(&mut StdRng, &mut [FinalBlock]),
@@ -259,29 +260,34 @@ async fn test_buffered_storage(
     let rng = &mut ctx.rng();
 
     let (genesis_block, block_store, _temp_dir) = init_store(ctx, rng).await;
+    let mut initial_blocks = gen_blocks(rng, genesis_block.clone(), initial_block_count);
+    for block in &initial_blocks {
+        block_store.put_block(ctx, block).await.unwrap();
+    }
+    initial_blocks.insert(0, genesis_block.clone());
+
     let (block_store, block_receiver) = MockContiguousStore::new(block_store);
     let buffered_store = BufferedStorage::new(block_store);
 
     // Check initial values returned by the store.
-    assert_eq!(buffered_store.head_block(ctx).await.unwrap(), genesis_block);
+    let last_initial_block = initial_blocks.last().unwrap().clone();
     assert_eq!(
-        buffered_store
-            .block(ctx, BlockNumber(0))
-            .await
-            .unwrap()
-            .as_ref(),
-        Some(&genesis_block)
+        buffered_store.head_block(ctx).await.unwrap(),
+        last_initial_block
     );
+    for block in &initial_blocks {
+        let block_result = buffered_store.block(ctx, block.block.number).await;
+        assert_eq!(block_result.unwrap().as_ref(), Some(block));
+    }
+    let mut subscriber = buffered_store.subscribe_to_block_writes();
     assert_eq!(
-        buffered_store.block(ctx, BlockNumber(1)).await.unwrap(),
-        None
+        *subscriber.borrow(),
+        BlockNumber(initial_block_count as u64)
     );
-    let mut subscriber = buffered_store.subscribe_to_block_writes();
-    assert_eq!(*subscriber.borrow(), BlockNumber(0));
 
-    let mut blocks = gen_blocks(rng, genesis_block.clone(), block_count);
+    let mut blocks = gen_blocks(rng, last_initial_block, block_count);
     shuffle_blocks(rng, &mut blocks);
-    let last_block_number = BlockNumber(block_count as u64);
+    let last_block_number = BlockNumber((block_count + initial_block_count) as u64);
 
     scope::run!(ctx, |ctx, s| async {
         s.spawn_bg(buffered_store.as_ref().run_updates(ctx, block_receiver));
@@ -299,7 +305,7 @@
             assert_eq!(new_block_number, block.block.number);
 
             // Check that all written blocks are immediately accessible.
-            for existing_block in &blocks[0..=idx] {
+            for existing_block in initial_blocks.iter().chain(&blocks[0..=idx]) {
                 let number = existing_block.block.number;
                 assert_eq!(
                     buffered_store.block(ctx, number).await?.as_ref(),
@@ -356,7 +362,7 @@ const BLOCK_INTERVALS: [time::Duration; 4] = [
 #[test_casing(4, BLOCK_INTERVALS)]
 #[tokio::test]
 async fn buffered_storage_with_sequential_blocks(block_interval: time::Duration) {
-    test_buffered_storage(30, block_interval, |_, _| {
+    test_buffered_storage(0, 30, block_interval, |_, _| {
         // Do not perform shuffling
     })
     .await;
@@ -365,16 +371,36 @@ async fn buffered_storage_with_sequential_blocks(block_interval: time::Duration)
 #[test_casing(4, BLOCK_INTERVALS)]
 #[tokio::test]
 async fn buffered_storage_with_random_blocks(block_interval: time::Duration) {
-    test_buffered_storage(30, block_interval, |rng, blocks| blocks.shuffle(rng)).await;
+    test_buffered_storage(0, 30, block_interval, |rng, blocks| blocks.shuffle(rng)).await;
 }
 
 #[test_casing(4, BLOCK_INTERVALS)]
 #[tokio::test]
 async fn buffered_storage_with_slightly_shuffled_blocks(block_interval: time::Duration) {
-    test_buffered_storage(30, block_interval, |rng, blocks| {
+    test_buffered_storage(0, 30, block_interval, |rng, blocks| {
         for chunk in blocks.chunks_mut(4) {
             chunk.shuffle(rng);
         }
     })
     .await;
 }
+
+#[test_casing(4, BLOCK_INTERVALS)]
+#[tokio::test]
+async fn buffered_storage_with_initial_blocks(block_interval: time::Duration) {
+    test_buffered_storage(10, 20, block_interval, |_, _| {
+        // Do not perform shuffling
+    })
+    .await;
+}
+
+#[test_casing(4, BLOCK_INTERVALS)]
+#[tokio::test]
+async fn buffered_storage_with_initial_blocks_and_slight_shuffling(block_interval: time::Duration) {
+    test_buffered_storage(10, 20, block_interval, |rng, blocks| {
+        for chunk in blocks.chunks_mut(5) {
+            chunk.shuffle(rng);
+        }
+    })
+    .await;
+}
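
As a concrete reading of the new test cases, here is a self-contained model (not the crate's code; the flushing rule and all names below are assumptions for illustration) of what `buffered_storage_with_initial_blocks` exercises: the contiguous store already ends at block `initial_block_count` when buffering starts, later blocks may arrive slightly out of order, and only gap-free runs are flushed, so the store should end at block `initial_block_count + block_count`.

use std::collections::BTreeSet;

// Highest block number reachable from `store_block_number` by walking the
// buffered numbers without gaps.
fn last_contiguous(store_block_number: u64, buffered: &BTreeSet<u64>) -> u64 {
    let mut last = store_block_number;
    for &number in buffered {
        if number == last + 1 {
            last = number;
        } else if number > last + 1 {
            break;
        }
    }
    last
}

fn main() {
    // Counts matching the new `buffered_storage_with_initial_blocks` cases.
    let initial_block_count: u64 = 10;
    let block_count: u64 = 20;

    // Genesis is block 0 and the pre-inserted blocks occupy 1..=initial_block_count,
    // so the contiguous store already ends at block 10 when buffering starts.
    let mut store_block_number = initial_block_count;
    let mut buffer = BTreeSet::new();

    // Feed the remaining blocks slightly out of order (chunks of 5, each reversed),
    // mimicking the chunk-shuffling closure in the tests.
    let streamed: Vec<u64> = (initial_block_count + 1..=initial_block_count + block_count).collect();
    for chunk in streamed.chunks(5) {
        for &number in chunk.iter().rev() {
            buffer.insert(number);
            let flush_to = last_contiguous(store_block_number, &buffer);
            buffer.retain(|&n| n > flush_to);
            store_block_number = flush_to;
        }
    }

    // Every block eventually reaches the contiguous store, ending at block 30,
    // matching `last_block_number` in `test_buffered_storage`.
    assert_eq!(store_block_number, initial_block_count + block_count);
    assert!(buffer.is_empty());
}
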
