From fe38f3d382b9ee0bd4e36f4e35858fe33bb2ccd8 Mon Sep 17 00:00:00 2001 From: Alex Ostrovski Date: Tue, 24 Oct 2023 17:47:08 +0300 Subject: [PATCH] feat: Rework consensus storage crate (#10) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit # What ❔ - Extracts storage API into trait(s) and make them async. - Introduces buffered storage (i.e., one that schedules sequential block execution). ## Why ❔ Part of preparations for the integration with the server codebase. --- node/Cargo.lock | 5 + node/Cranky.toml | 5 + .../consensus/src/leader/state_machine.rs | 5 +- node/actors/consensus/src/leader/tests.rs | 2 +- node/actors/consensus/src/lib.rs | 28 +- node/actors/consensus/src/replica/error.rs | 2 + .../consensus/src/replica/leader_commit.rs | 5 +- .../consensus/src/replica/leader_prepare.rs | 2 +- node/actors/consensus/src/replica/new_view.rs | 19 +- .../consensus/src/replica/state_machine.rs | 64 ++-- node/actors/consensus/src/replica/tests.rs | 19 +- node/actors/consensus/src/testonly/make.rs | 15 +- node/actors/consensus/src/testonly/run.rs | 15 +- node/actors/executor/src/main.rs | 50 ++- node/actors/sync_blocks/src/lib.rs | 48 +-- .../actors/sync_blocks/src/message_handler.rs | 28 +- node/actors/sync_blocks/src/peers/mod.rs | 36 +- node/actors/sync_blocks/src/peers/tests.rs | 71 ++-- .../sync_blocks/src/tests/end_to_end.rs | 21 +- node/actors/sync_blocks/src/tests/mod.rs | 35 +- node/libs/concurrency/src/scope/mod.rs | 15 + node/libs/storage/Cargo.toml | 5 + node/libs/storage/src/block_store.rs | 178 --------- node/libs/storage/src/lib.rs | 94 +---- node/libs/storage/src/replica.rs | 30 -- node/libs/storage/src/rocksdb.rs | 340 ++++++++++++++++++ node/libs/storage/src/tests.rs | 91 +++-- node/libs/storage/src/traits.rs | 126 +++++++ node/libs/storage/src/types.rs | 86 ++++- 29 files changed, 910 insertions(+), 530 deletions(-) delete mode 100644 node/libs/storage/src/block_store.rs delete mode 100644 node/libs/storage/src/replica.rs create mode 100644 node/libs/storage/src/rocksdb.rs create mode 100644 node/libs/storage/src/traits.rs diff --git a/node/Cargo.lock b/node/Cargo.lock index 3755a046..945ac86c 100644 --- a/node/Cargo.lock +++ b/node/Cargo.lock @@ -1892,12 +1892,17 @@ name = "storage" version = "0.1.0" dependencies = [ "anyhow", + "assert_matches", + "async-trait", "concurrency", "rand", "rocksdb", "roles", "schema", "tempfile", + "test-casing", + "thiserror", + "tokio", "tracing", ] diff --git a/node/Cranky.toml b/node/Cranky.toml index fc3e2d85..ffe11cbe 100644 --- a/node/Cranky.toml +++ b/node/Cranky.toml @@ -37,3 +37,8 @@ warn = [ # cargo group "clippy::wildcard_dependencies", ] + +allow = [ + # Produces too many false positives. + "clippy::redundant_locals", +] diff --git a/node/actors/consensus/src/leader/state_machine.rs b/node/actors/consensus/src/leader/state_machine.rs index f2473d92..8a3b081e 100644 --- a/node/actors/consensus/src/leader/state_machine.rs +++ b/node/actors/consensus/src/leader/state_machine.rs @@ -5,7 +5,7 @@ use std::{ collections::{BTreeMap, HashMap}, unreachable, }; -use tracing::{instrument, warn}; +use tracing::instrument; /// The StateMachine struct contains the state of the leader. This is a simple state machine. 
We just store /// replica messages and produce leader messages (including proposing blocks) when we reach the threshold for @@ -72,8 +72,5 @@ impl StateMachine { }; metrics::METRICS.leader_processing_latency[&label.with_result(&result)] .observe_latency(ctx.now() - now); - if let Err(e) = result { - warn!("{}", e); - } } } diff --git a/node/actors/consensus/src/leader/tests.rs b/node/actors/consensus/src/leader/tests.rs index 19949cfd..51fe66ee 100644 --- a/node/actors/consensus/src/leader/tests.rs +++ b/node/actors/consensus/src/leader/tests.rs @@ -11,7 +11,7 @@ async fn replica_commit() { let keys: Vec<_> = (0..1).map(|_| rng.gen()).collect(); let (genesis, val_set) = testonly::make_genesis(&keys, vec![]); - let (mut consensus, _) = testonly::make_consensus(ctx, &keys[0], &val_set, &genesis); + let (mut consensus, _) = testonly::make_consensus(ctx, &keys[0], &val_set, &genesis).await; let proposal_block_hash = rng.gen(); diff --git a/node/actors/consensus/src/lib.rs b/node/actors/consensus/src/lib.rs index 0ade85e7..0857233e 100644 --- a/node/actors/consensus/src/lib.rs +++ b/node/actors/consensus/src/lib.rs @@ -16,11 +16,12 @@ //! - Blog posts explaining [safety](https://seafooler.com/2022/01/24/understanding-safety-hotstuff/) and [responsiveness](https://seafooler.com/2022/04/02/understanding-responsiveness-hotstuff/) use crate::io::{InputMessage, OutputMessage}; +use anyhow::Context as _; use concurrency::ctx; use inner::ConsensusInner; use roles::validator; use std::sync::Arc; -use storage::Storage; +use storage::ReplicaStateStore; use tracing::{info, instrument}; use utils::pipe::ActorPipe; @@ -48,22 +49,22 @@ pub struct Consensus { impl Consensus { /// Creates a new Consensus struct. #[instrument(level = "trace", ret)] - pub fn new( + pub async fn new( ctx: &ctx::Ctx, pipe: ActorPipe, secret_key: validator::SecretKey, validator_set: validator::ValidatorSet, - storage: Arc, - ) -> Self { - Consensus { + storage: Arc, + ) -> anyhow::Result { + Ok(Consensus { inner: ConsensusInner { pipe, secret_key, validator_set, }, - replica: replica::StateMachine::new(storage), + replica: replica::StateMachine::new(ctx, storage).await?, leader: leader::StateMachine::new(ctx), - } + }) } /// Starts the Consensus actor. It will start running, processing incoming messages and @@ -76,7 +77,9 @@ impl Consensus { ); // We need to start the replica before processing inputs. - self.replica.start(ctx, &self.inner); + self.replica + .start(ctx, &self.inner) + .context("replica.start()")?; // This is the infinite loop where the consensus actually runs. The validator waits for either // a message from the network or for a timeout, and processes each accordingly. @@ -99,18 +102,21 @@ impl Consensus { match &req.msg.msg { validator::ConsensusMsg::ReplicaPrepare(_) | validator::ConsensusMsg::ReplicaCommit(_) => { - self.leader.process_input(ctx, &self.inner, req.msg) + self.leader.process_input(ctx, &self.inner, req.msg); } validator::ConsensusMsg::LeaderPrepare(_) | validator::ConsensusMsg::LeaderCommit(_) => { - self.replica.process_input(ctx, &self.inner, Some(req.msg)) + self.replica + .process_input(ctx, &self.inner, Some(req.msg))?; } } // Notify network actor that the message has been processed. // Ignore sending error. 
let _ = req.ack.send(()); } - None => self.replica.process_input(ctx, &self.inner, None), + None => { + self.replica.process_input(ctx, &self.inner, None)?; + } } } } diff --git a/node/actors/consensus/src/replica/error.rs b/node/actors/consensus/src/replica/error.rs index 4e16cd2f..5d6c57c6 100644 --- a/node/actors/consensus/src/replica/error.rs +++ b/node/actors/consensus/src/replica/error.rs @@ -65,4 +65,6 @@ pub(crate) enum Error { LeaderPrepareReproposalWhenFinalized, #[error("received leader prepare message with block re-proposal of invalid block")] LeaderPrepareReproposalInvalidBlock, + #[error("failed saving replica state to DB: {_0}")] + ReplicaStateSave(#[source] anyhow::Error), } diff --git a/node/actors/consensus/src/replica/leader_commit.rs b/node/actors/consensus/src/replica/leader_commit.rs index 352117c6..932a8e29 100644 --- a/node/actors/consensus/src/replica/leader_commit.rs +++ b/node/actors/consensus/src/replica/leader_commit.rs @@ -1,6 +1,5 @@ use super::StateMachine; use crate::{inner::ConsensusInner, replica::error::Error}; - use concurrency::ctx; use roles::validator; use tracing::instrument; @@ -8,7 +7,7 @@ use tracing::instrument; impl StateMachine { /// Processes a leader commit message. We can approve this leader message even if we /// don't have the block proposal stored. It is enough to see the justification. - #[instrument(level = "trace", ret)] + #[instrument(level = "trace", err)] pub(crate) fn process_leader_commit( &mut self, ctx: &ctx::Ctx, @@ -66,7 +65,7 @@ impl StateMachine { // Start a new view. But first we skip to the view of this message. self.view = view; - self.start_new_view(ctx, consensus); + self.start_new_view(ctx, consensus)?; Ok(()) } diff --git a/node/actors/consensus/src/replica/leader_prepare.rs b/node/actors/consensus/src/replica/leader_prepare.rs index df479c3f..4ddfa33f 100644 --- a/node/actors/consensus/src/replica/leader_prepare.rs +++ b/node/actors/consensus/src/replica/leader_prepare.rs @@ -206,7 +206,7 @@ impl StateMachine { } // Backup our state. - self.backup_state(); + self.backup_state(ctx).map_err(Error::ReplicaStateSave)?; // Send the replica message to the leader. let output_message = ConsensusInputMessage { diff --git a/node/actors/consensus/src/replica/new_view.rs b/node/actors/consensus/src/replica/new_view.rs index da8ef443..e0f84663 100644 --- a/node/actors/consensus/src/replica/new_view.rs +++ b/node/actors/consensus/src/replica/new_view.rs @@ -1,15 +1,19 @@ -use super::StateMachine; +use super::{error::Error, StateMachine}; use crate::ConsensusInner; use concurrency::ctx; use network::io::{ConsensusInputMessage, Target}; use roles::validator; -use tracing::{info, instrument}; +use tracing::instrument; impl StateMachine { - /// This method is used whenever we start a new view. - #[instrument(level = "trace", ret)] - pub(crate) fn start_new_view(&mut self, ctx: &ctx::Ctx, consensus: &ConsensusInner) { - info!("Starting view {}", self.view.next().0); + /// This blocking method is used whenever we start a new view. + #[instrument(level = "trace", err)] + pub(crate) fn start_new_view( + &mut self, + ctx: &ctx::Ctx, + consensus: &ConsensusInner, + ) -> Result<(), Error> { + tracing::info!("Starting view {}", self.view.next().0); // Update the state machine. let next_view = self.view.next(); @@ -22,7 +26,7 @@ impl StateMachine { .retain(|k, _| k > &self.high_qc.message.proposal_block_number); // Backup our state. 
- self.backup_state(); + self.backup_state(ctx).map_err(Error::ReplicaStateSave)?; // Send the replica message to the next leader. let output_message = ConsensusInputMessage { @@ -41,5 +45,6 @@ impl StateMachine { // Reset the timer. self.reset_timer(ctx); + Ok(()) } } diff --git a/node/actors/consensus/src/replica/state_machine.rs b/node/actors/consensus/src/replica/state_machine.rs index 3380a65d..c3d373c8 100644 --- a/node/actors/consensus/src/replica/state_machine.rs +++ b/node/actors/consensus/src/replica/state_machine.rs @@ -1,12 +1,12 @@ -use crate::{metrics, ConsensusInner}; -use concurrency::{ctx, metrics::LatencyHistogramExt as _, time}; +use crate::{metrics, replica::error::Error, ConsensusInner}; +use concurrency::{ctx, metrics::LatencyHistogramExt as _, scope, time}; use roles::validator; use std::{ collections::{BTreeMap, HashMap}, sync::Arc, }; -use storage::Storage; -use tracing::{instrument, warn}; +use storage::{ReplicaStateStore, StorageError}; +use tracing::instrument; /// The StateMachine struct contains the state of the replica. This is the most complex state machine and is responsible /// for validating and voting on blocks. When participating in consensus we are always a replica. @@ -26,14 +26,17 @@ pub(crate) struct StateMachine { /// The deadline to receive an input message. pub(crate) timeout_deadline: time::Deadline, /// A reference to the storage module. We use it to backup the replica state. - pub(crate) storage: Arc, + pub(crate) storage: Arc, } impl StateMachine { /// Creates a new StateMachine struct. We try to recover a past state from the storage module, /// otherwise we initialize the state machine with whatever head block we have. - pub(crate) fn new(storage: Arc) -> Self { - match storage.get_replica_state() { + pub(crate) async fn new( + ctx: &ctx::Ctx, + storage: Arc, + ) -> anyhow::Result { + Ok(match storage.replica_state(ctx).await? { Some(backup) => Self { view: backup.view, phase: backup.phase, @@ -44,8 +47,7 @@ impl StateMachine { storage, }, None => { - let head = storage.get_head_block(); - + let head = storage.head_block(ctx).await?; Self { view: head.justification.message.view, phase: validator::Phase::Prepare, @@ -56,18 +58,23 @@ impl StateMachine { storage, } } - } + }) } /// Starts the state machine. The replica state needs to be initialized before /// we are able to process inputs. If we are in the genesis block, then we start a new view, /// this will kick start the consensus algorithm. Otherwise, we just start the timer. #[instrument(level = "trace", ret)] - pub(crate) fn start(&mut self, ctx: &ctx::Ctx, consensus: &ConsensusInner) { + pub(crate) fn start( + &mut self, + ctx: &ctx::Ctx, + consensus: &ConsensusInner, + ) -> Result<(), Error> { if self.view == validator::ViewNumber(0) { self.start_new_view(ctx, consensus) } else { - self.reset_timer(ctx) + self.reset_timer(ctx); + Ok(()) } } @@ -80,12 +87,12 @@ impl StateMachine { ctx: &ctx::Ctx, consensus: &ConsensusInner, input: Option>, - ) { + ) -> anyhow::Result<()> { let Some(signed_msg) = input else { - warn!("We timed out before receiving a message."); + tracing::warn!("We timed out before receiving a message."); // Start new view. 
- self.start_new_view(ctx, consensus); - return; + self.start_new_view(ctx, consensus)?; + return Ok(()); }; let now = ctx.now(); @@ -102,14 +109,19 @@ impl StateMachine { }; metrics::METRICS.replica_processing_latency[&label.with_result(&result)] .observe_latency(ctx.now() - now); - // All errors from processing inputs are recoverable, so we just log them. - if let Err(e) = result { - warn!("{}", e); + match result { + Ok(()) => Ok(()), + Err(err @ Error::ReplicaStateSave(_)) => Err(err.into()), + Err(err) => { + // Other errors from processing inputs are recoverable, so we just log them. + tracing::warn!("{err}"); + Ok(()) + } } } /// Backups the replica state to disk. - pub(crate) fn backup_state(&self) { + pub(crate) fn backup_state(&self, ctx: &ctx::Ctx) -> anyhow::Result<()> { let backup = storage::ReplicaState { view: self.view, phase: self.phase, @@ -118,6 +130,16 @@ impl StateMachine { block_proposal_cache: self.block_proposal_cache.clone(), }; - self.storage.put_replica_state(&backup); + let store_result = scope::run_blocking!(ctx, |ctx, s| { + let backup_future = self.storage.put_replica_state(ctx, &backup); + s.spawn(backup_future).join(ctx).block()?; + Ok(()) + }); + match store_result { + Ok(()) => { /* Everything went fine */ } + Err(StorageError::Canceled(_)) => tracing::trace!("Storing replica state was canceled"), + Err(StorageError::Database(err)) => return Err(err), + } + Ok(()) } } diff --git a/node/actors/consensus/src/replica/tests.rs b/node/actors/consensus/src/replica/tests.rs index 829267dd..8789adb7 100644 --- a/node/actors/consensus/src/replica/tests.rs +++ b/node/actors/consensus/src/replica/tests.rs @@ -1,5 +1,5 @@ use crate::testonly; -use concurrency::{ctx, time}; +use concurrency::{ctx, scope, time}; use network::io::{ConsensusInputMessage, Target}; use rand::Rng; use roles::validator::{self, ViewNumber}; @@ -11,13 +11,26 @@ async fn start_new_view_not_leader() { let keys: Vec<_> = (0..4).map(|_| rng.gen()).collect(); let (genesis, val_set) = testonly::make_genesis(&keys, vec![]); - let (mut consensus, mut pipe) = testonly::make_consensus(ctx, &keys[0], &val_set, &genesis); + let (mut consensus, mut pipe) = + testonly::make_consensus(ctx, &keys[0], &val_set, &genesis).await; // TODO: this test assumes a specific implementation of the leader schedule. // Make it leader-schedule agnostic (use epoch to select a specific view). consensus.replica.view = ViewNumber(1); consensus.replica.high_qc = rng.gen(); consensus.replica.high_qc.message.view = ViewNumber(0); - consensus.replica.start_new_view(ctx, &consensus.inner); + + scope::run!(ctx, |ctx, s| { + s.spawn_blocking(|| { + consensus + .replica + .start_new_view(ctx, &consensus.inner) + .unwrap(); + Ok(()) + }) + .join(ctx) + }) + .await + .unwrap(); let test_new_view_msg = ConsensusInputMessage { message: consensus diff --git a/node/actors/consensus/src/testonly/make.rs b/node/actors/consensus/src/testonly/make.rs index f00b786e..00c187c0 100644 --- a/node/actors/consensus/src/testonly/make.rs +++ b/node/actors/consensus/src/testonly/make.rs @@ -7,12 +7,12 @@ use crate::{ use concurrency::ctx; use roles::validator; use std::sync::Arc; -use storage::Storage; +use storage::RocksdbStorage; use tempfile::tempdir; use utils::pipe::{self, DispatcherPipe}; /// This creates a mock Consensus struct for unit tests. 
-pub fn make_consensus( +pub async fn make_consensus( ctx: &ctx::Ctx, key: &validator::SecretKey, validator_set: &validator::ValidatorSet, @@ -20,12 +20,11 @@ pub fn make_consensus( ) -> (Consensus, DispatcherPipe) { // Create a temporary folder. let temp_dir = tempdir().unwrap(); - let temp_file = temp_dir.path().join("block_store"); - // Initialize the storage. - let storage = Storage::new(genesis_block, &temp_file); - + let storage = RocksdbStorage::new(ctx, genesis_block, &temp_file) + .await + .unwrap(); // Create the pipe. let (consensus_pipe, dispatcher_pipe) = pipe::new(); @@ -36,7 +35,9 @@ pub fn make_consensus( validator_set.clone(), Arc::new(storage), ); - + let consensus = consensus + .await + .expect("Initializing consensus actor failed"); (consensus, dispatcher_pipe) } diff --git a/node/actors/consensus/src/testonly/run.rs b/node/actors/consensus/src/testonly/run.rs index 37c6b975..5e146217 100644 --- a/node/actors/consensus/src/testonly/run.rs +++ b/node/actors/consensus/src/testonly/run.rs @@ -7,7 +7,7 @@ use std::{ collections::{HashMap, HashSet}, sync::Arc, }; -use storage::Storage; +use storage::RocksdbStorage; use tracing::Instrument as _; use utils::pipe; @@ -100,16 +100,23 @@ async fn run_nodes( network_pipes.insert(validator_key.public(), network_actor_pipe); s.spawn( async { - let dir = tempfile::tempdir().unwrap(); + let dir = tempfile::tempdir().context("tempdir()")?; let storage = - Arc::new(Storage::new(&genesis_block, &dir.path().join("storage"))); + RocksdbStorage::new(ctx, &genesis_block, &dir.path().join("storage")) + .await + .context("RocksdbStorage")?; + let storage = Arc::new(storage); + let consensus = Consensus::new( ctx, consensus_actor_pipe, n.net.state().cfg().consensus.key.clone(), validator_set, storage, - ); + ) + .await + .context("consensus")?; + scope::run!(ctx, |ctx, s| async { network_ready.recv(ctx).await?; s.spawn_blocking(|| consensus.run(ctx).context("consensus.run()")); diff --git a/node/actors/executor/src/main.rs b/node/actors/executor/src/main.rs index 84a46e9e..9fbff851 100644 --- a/node/actors/executor/src/main.rs +++ b/node/actors/executor/src/main.rs @@ -6,8 +6,9 @@ use concurrency::{ctx, scope, time}; use consensus::Consensus; use executor::{configurator::Configs, io::Dispatcher}; use std::{fs, io::IsTerminal as _, path::Path, sync::Arc}; -use storage::Storage; -use tracing::{debug, info, metadata::LevelFilter}; +use storage::{BlockStore, RocksdbStorage}; +use sync_blocks::SyncBlocks; +use tracing::metadata::LevelFilter; use tracing_subscriber::{prelude::*, Registry}; use utils::{no_copy::NoCopy, pipe}; use vise_exporter::MetricsExporter; @@ -55,29 +56,27 @@ async fn main() -> anyhow::Result<()> { tracing::subscriber::set_global_default(subscriber).unwrap(); // Start the node. - info!("Starting node."); + tracing::info!("Starting node."); } // Load the config files. - debug!("Loading config files."); + tracing::debug!("Loading config files."); let configs = Configs::read(&args).context("configs.read()")?; if config_mode { - info!("Configuration verified."); + tracing::info!("Configuration verified."); return Ok(()); } // Initialize the storage. 
- debug!("Initializing storage."); + tracing::debug!("Initializing storage."); - let storage = Arc::new(Storage::new( - &configs.config.genesis_block, - Path::new("./database"), - )); + let storage = RocksdbStorage::new(ctx, &configs.config.genesis_block, Path::new("./database")); + let storage = Arc::new(storage.await.context("RocksdbStorage::new()")?); // Generate the communication pipes. We have one for each actor. let (consensus_actor_pipe, consensus_dispatcher_pipe) = pipe::new(); - let (_, sync_blocks_dispatcher_pipe) = pipe::new(); + let (sync_blocks_actor_pipe, sync_blocks_dispatcher_pipe) = pipe::new(); let (network_actor_pipe, network_dispatcher_pipe) = pipe::new(); // Create the IO dispatcher. @@ -95,10 +94,24 @@ async fn main() -> anyhow::Result<()> { configs.validator_key.clone(), validator_set.clone(), storage.clone(), - ); - // FIXME(slowli): Run `sync_blocks` actor once it's fully functional + ) + .await + .context("consensus")?; + + let sync_blocks_config = sync_blocks::Config::new( + validator_set.clone(), + consensus::misc::consensus_threshold(validator_set.len()), + )?; + let sync_blocks = SyncBlocks::new( + ctx, + sync_blocks_actor_pipe, + storage.clone(), + sync_blocks_config, + ) + .await + .context("sync_blocks")?; - debug!("Starting actors in separate threads."); + tracing::debug!("Starting actors in separate threads."); scope::run!(ctx, |ctx, s| async { if let Some(addr) = configs.config.metrics_server_addr { let addr = NoCopy::from(addr); @@ -123,15 +136,16 @@ async fn main() -> anyhow::Result<()> { }); s.spawn_blocking(|| consensus.run(ctx).context("Consensus stopped")); + s.spawn(async { sync_blocks.run(ctx).await.context("Syncing blocks stopped") }); // if we are in CI mode, we wait for the node to finalize 100 blocks and then we stop it if ci_mode { let storage = storage.clone(); - loop { - let block_finalized = storage.get_head_block().block.number.0; + let block_finalized = storage.head_block(ctx).await.context("head_block")?; + let block_finalized = block_finalized.block.number.0; - info!("current finalized block {}", block_finalized); + tracing::info!("current finalized block {}", block_finalized); if block_finalized > 100 { // we wait for 10 seconds to make sure that we send enough messages to other nodes // and other nodes have enough messages to finalize 100+ blocks @@ -141,7 +155,7 @@ async fn main() -> anyhow::Result<()> { ctx.sleep(time::Duration::seconds(1)).await?; } - info!("Cancel all tasks"); + tracing::info!("Cancel all tasks"); s.cancel(); } diff --git a/node/actors/sync_blocks/src/lib.rs b/node/actors/sync_blocks/src/lib.rs index eefd8fab..3c471782 100644 --- a/node/actors/sync_blocks/src/lib.rs +++ b/node/actors/sync_blocks/src/lib.rs @@ -13,7 +13,7 @@ use concurrency::{ }; use network::io::SyncState; use std::sync::Arc; -use storage::Storage; +use storage::WriteBlockStore; use tracing::instrument; use utils::pipe::ActorPipe; @@ -40,23 +40,24 @@ pub struct SyncBlocks { impl SyncBlocks { /// Creates a new actor. 
- pub fn new( + pub async fn new( + ctx: &ctx::Ctx, pipe: ActorPipe, - storage: Arc, + storage: Arc, config: Config, - ) -> Self { - let (state_sender, _) = watch::channel(Self::get_sync_state(&storage)); + ) -> anyhow::Result { + let (state_sender, _) = watch::channel(Self::get_sync_state(ctx, storage.as_ref()).await?); let (peer_states, peer_states_handle) = PeerStates::new(pipe.send, storage.clone(), config); let inner = SyncBlocksMessageHandler { message_receiver: pipe.recv, storage, peer_states_handle, }; - Self { + Ok(Self { message_handler: inner, peer_states, state_sender, - } + }) } /// Subscribes to `SyncState` updates emitted by the actor. @@ -65,28 +66,27 @@ impl SyncBlocks { } /// Runs the actor processing incoming requests until `ctx` is canceled. - /// - /// **This method is blocking and will run indefinitely.** #[instrument(level = "trace", skip_all, err)] - pub fn run(self, ctx: &ctx::Ctx) -> anyhow::Result<()> { + pub async fn run(self, ctx: &ctx::Ctx) -> anyhow::Result<()> { let storage = self.message_handler.storage.clone(); - scope::run_blocking!(ctx, |ctx, s| { + scope::run!(ctx, |ctx, s| async { s.spawn_bg(Self::emit_state_updates(ctx, storage, &self.state_sender)); s.spawn_bg(self.peer_states.run(ctx)); - self.message_handler.process_messages(ctx) + self.message_handler.process_messages(ctx).await }) + .await } #[instrument(level = "trace", skip_all, err)] async fn emit_state_updates( ctx: &ctx::Ctx, - storage: Arc, + storage: Arc, state_sender: &watch::Sender, ) -> anyhow::Result<()> { let mut storage_subscriber = storage.subscribe_to_block_writes(); loop { - let state = Self::get_sync_state(&storage); + let state = Self::get_sync_state(ctx, storage.as_ref()).await?; if state_sender.send(state).is_err() { tracing::info!("`SyncState` subscriber dropped; exiting"); return Ok(()); @@ -98,19 +98,21 @@ impl SyncBlocks { } /// Gets the current sync state of this node based on information from the storage. - /// - /// **This method is blocking.** #[instrument(level = "trace", skip_all)] - fn get_sync_state(storage: &Storage) -> SyncState { - let last_contiguous_block_number = storage.get_last_contiguous_block_number(); + async fn get_sync_state( + ctx: &ctx::Ctx, + storage: &dyn WriteBlockStore, + ) -> anyhow::Result { + let last_contiguous_block_number = storage.last_contiguous_block_number(ctx).await?; let last_contiguous_stored_block = storage - .get_block(last_contiguous_block_number) + .block(ctx, last_contiguous_block_number) + .await? .expect("`last_contiguous_stored_block` disappeared"); - SyncState { - first_stored_block: storage.get_first_block().justification, + Ok(SyncState { + first_stored_block: storage.first_block(ctx).await?.justification, last_contiguous_stored_block: last_contiguous_stored_block.justification, - last_stored_block: storage.get_head_block().justification, - } + last_stored_block: storage.head_block(ctx).await?.justification, + }) } } diff --git a/node/actors/sync_blocks/src/message_handler.rs b/node/actors/sync_blocks/src/message_handler.rs index 9c3aa959..e3779c54 100644 --- a/node/actors/sync_blocks/src/message_handler.rs +++ b/node/actors/sync_blocks/src/message_handler.rs @@ -5,7 +5,7 @@ use concurrency::ctx::{self, channel}; use network::io::{GetBlockError, GetBlockResponse, SyncBlocksRequest}; use roles::validator::BlockNumber; use std::sync::Arc; -use storage::Storage; +use storage::WriteBlockStore; use tracing::instrument; /// Inner details of `SyncBlocks` actor allowing to process messages. 
@@ -14,19 +14,17 @@ pub(crate) struct SyncBlocksMessageHandler { /// Pipe using which the actor sends / receives messages. pub(crate) message_receiver: channel::UnboundedReceiver, /// Persistent storage for blocks. - pub(crate) storage: Arc, + pub(crate) storage: Arc, /// Set of validators authoring blocks. pub(crate) peer_states_handle: PeerStatesHandle, } impl SyncBlocksMessageHandler { /// Implements the message processing loop. - /// - /// **This method is blocking and will run indefinitely.** #[instrument(level = "trace", skip_all, err)] - pub(crate) fn process_messages(mut self, ctx: &ctx::Ctx) -> anyhow::Result<()> { + pub(crate) async fn process_messages(mut self, ctx: &ctx::Ctx) -> anyhow::Result<()> { loop { - let input_message = self.message_receiver.recv(ctx).block()?; + let input_message = self.message_receiver.recv(ctx).await?; match input_message { InputMessage::Network(SyncBlocksRequest::UpdatePeerSyncState { peer, @@ -40,7 +38,7 @@ impl SyncBlocksMessageHandler { block_number, response, }) => { - response.send(self.get_block(block_number)).ok(); + response.send(self.get_block(ctx, block_number).await?).ok(); } } } @@ -49,10 +47,16 @@ impl SyncBlocksMessageHandler { /// Gets a block with the specified `number` from the storage. /// /// **This method is blocking.** - #[instrument(level = "trace", skip(self), err)] - fn get_block(&self, number: BlockNumber) -> GetBlockResponse { - self.storage - .get_block(number) - .ok_or(GetBlockError::NotSynced) + #[instrument(level = "trace", skip(self, ctx), err)] + async fn get_block( + &self, + ctx: &ctx::Ctx, + number: BlockNumber, + ) -> anyhow::Result { + Ok(self + .storage + .block(ctx, number) + .await? + .ok_or(GetBlockError::NotSynced)) } } diff --git a/node/actors/sync_blocks/src/peers/mod.rs b/node/actors/sync_blocks/src/peers/mod.rs index 102e2a08..44817d6c 100644 --- a/node/actors/sync_blocks/src/peers/mod.rs +++ b/node/actors/sync_blocks/src/peers/mod.rs @@ -14,7 +14,7 @@ use roles::{ validator::{BlockNumber, FinalBlock}, }; use std::{collections::HashMap, sync::Arc}; -use storage::Storage; +use storage::WriteBlockStore; use tracing::instrument; mod events; @@ -49,7 +49,7 @@ pub(crate) struct PeerStates { events_sender: Option>, peers: Mutex>, message_sender: channel::UnboundedSender, - storage: Arc, + storage: Arc, config: Config, } @@ -57,7 +57,7 @@ impl PeerStates { /// Creates a new instance together with a handle. pub(crate) fn new( message_sender: channel::UnboundedSender, - storage: Arc, + storage: Arc, config: Config, ) -> (Self, PeerStatesHandle) { let (updates_sender, updates_receiver) = channel::unbounded(); @@ -80,19 +80,16 @@ impl PeerStates { /// 3. Spawn a task to get each missing block. 
pub(crate) async fn run(mut self, ctx: &ctx::Ctx) -> anyhow::Result<()> { let updates_receiver = self.updates_receiver.take().unwrap(); - let storage = &self.storage; + let storage = self.storage.as_ref(); let get_block_semaphore = Semaphore::new(self.config.max_concurrent_blocks); let (new_blocks_sender, mut new_blocks_subscriber) = watch::channel(BlockNumber(0)); scope::run!(ctx, |ctx, s| async { - // `storage` uses blocking I/O, so we spawn it a blocking context for it - let blocks_task = s.spawn_blocking(|| { - let start_number = storage.get_last_contiguous_block_number(); - let end_number = storage.get_head_block().block.number; - let missing_blocks = storage.get_missing_block_numbers(start_number..end_number); - Ok((end_number, missing_blocks)) - }); - let (mut last_block_number, missing_blocks) = blocks_task.join(ctx).await?; + let start_number = storage.last_contiguous_block_number(ctx).await?; + let mut last_block_number = storage.head_block(ctx).await?.block.number; + let missing_blocks = storage + .missing_block_numbers(ctx, start_number..last_block_number) + .await?; new_blocks_sender.send_replace(last_block_number); s.spawn_bg(self.run_updates(ctx, updates_receiver, new_blocks_sender)); @@ -219,7 +216,7 @@ impl PeerStates { ctx: &ctx::Ctx, block_number: BlockNumber, get_block_permit: sync::SemaphorePermit<'_>, - storage: &Storage, + storage: &dyn WriteBlockStore, ) -> anyhow::Result<()> { let block = self.get_block(ctx, block_number).await?; drop(get_block_permit); @@ -227,18 +224,7 @@ impl PeerStates { if let Some(events_sender) = &self.events_sender { events_sender.send(PeerStateEvent::GotBlock(block_number)); } - - scope::run!(ctx, |ctx, s| async { - s.spawn_blocking(|| { - let block = block; - storage.put_block(&block); - Ok(()) - }) - .join(ctx) - .await - }) - .await?; - + storage.put_block(ctx, &block).await?; Ok(()) } diff --git a/node/actors/sync_blocks/src/peers/tests.rs b/node/actors/sync_blocks/src/peers/tests.rs index 3820ad8d..5175a5c6 100644 --- a/node/actors/sync_blocks/src/peers/tests.rs +++ b/node/actors/sync_blocks/src/peers/tests.rs @@ -5,6 +5,7 @@ use async_trait::async_trait; use concurrency::time; use rand::{rngs::StdRng, seq::IteratorRandom, Rng}; use std::{collections::HashSet, fmt}; +use storage::RocksdbStorage; use test_casing::{test_casing, Product}; const TEST_TIMEOUT: time::Duration = time::Duration::seconds(5); @@ -16,20 +17,25 @@ struct TestHandles { rng: StdRng, test_validators: TestValidators, peer_states_handle: PeerStatesHandle, - storage: Arc, + storage: Arc, message_receiver: channel::UnboundedReceiver, events_receiver: channel::UnboundedReceiver, } #[async_trait] -trait Test: fmt::Debug { +trait Test: fmt::Debug + Send + Sync { const BLOCK_COUNT: usize; fn tweak_config(&self, _config: &mut Config) { // Does nothing by default } - fn initialize_storage(&self, _storage: &Storage, _test_validators: &TestValidators) { + async fn initialize_storage( + &self, + _ctx: &ctx::Ctx, + _storage: &dyn WriteBlockStore, + _test_validators: &TestValidators, + ) { // Does nothing by default } @@ -39,16 +45,16 @@ trait Test: fmt::Debug { #[instrument(level = "trace", skip(ctx, storage), err)] async fn wait_for_stored_block( ctx: &ctx::Ctx, - storage: &Storage, + storage: &dyn WriteBlockStore, expected_block_number: BlockNumber, ) -> ctx::OrCanceled<()> { tracing::trace!("Started waiting for stored block"); let mut subscriber = storage.subscribe_to_block_writes(); - let mut got_block = storage.get_last_contiguous_block_number(); + let mut got_block = 
storage.last_contiguous_block_number(ctx).await.unwrap(); while got_block < expected_block_number { sync::changed(ctx, &mut subscriber).await?; - got_block = storage.get_last_contiguous_block_number() + got_block = storage.last_contiguous_block_number(ctx).await.unwrap(); } Ok(()) } @@ -64,11 +70,10 @@ async fn test_peer_states(test: T) { let ctx = &ctx::test_with_clock(ctx, &clock); let mut rng = ctx.rng(); let test_validators = TestValidators::new(4, T::BLOCK_COUNT, &mut rng); - let storage = Arc::new(Storage::new( - &test_validators.final_blocks[0], - storage_dir.path(), - )); - test.initialize_storage(&storage, &test_validators); + let storage = RocksdbStorage::new(ctx, &test_validators.final_blocks[0], storage_dir.path()); + let storage = Arc::new(storage.await.unwrap()); + test.initialize_storage(ctx, storage.as_ref(), &test_validators) + .await; let (message_sender, message_receiver) = channel::unbounded(); let (events_sender, events_receiver) = channel::unbounded(); @@ -90,7 +95,7 @@ async fn test_peer_states(test: T) { scope::run!(ctx, |ctx, s| async { s.spawn_bg(async { peer_states.run(ctx).await.or_else(|err| { - if err.is::() { + if err.root_cause().is::() { Ok(()) // Swallow cancellation errors after the test is finished } else { Err(err) @@ -226,7 +231,7 @@ impl Test for UpdatingPeerStateWithMultipleBlocks { } let expected_block_number = BlockNumber(Self::BLOCK_COUNT as u64 - 1); - wait_for_stored_block(ctx, &storage, expected_block_number).await?; + wait_for_stored_block(ctx, storage.as_ref(), expected_block_number).await?; Ok(()) } } @@ -265,9 +270,17 @@ impl Test for DownloadingBlocksInGaps { config.sleep_interval_for_get_block = BLOCK_SLEEP_INTERVAL; } - fn initialize_storage(&self, storage: &Storage, test_validators: &TestValidators) { + async fn initialize_storage( + &self, + ctx: &ctx::Ctx, + storage: &dyn WriteBlockStore, + test_validators: &TestValidators, + ) { for &block_number in &self.local_block_numbers { - storage.put_block(&test_validators.final_blocks[block_number]); + storage + .put_block(ctx, &test_validators.final_blocks[block_number]) + .await + .unwrap(); } } @@ -316,7 +329,7 @@ impl Test for DownloadingBlocksInGaps { assert_eq!(recipient, peer_key); assert_eq!(number.0 as usize, expected_number); test_validators.send_block(number, response); - wait_for_stored_block(ctx, &storage, number).await?; + wait_for_stored_block(ctx, storage.as_ref(), number).await?; clock.advance(BLOCK_SLEEP_INTERVAL); } Ok(()) @@ -488,7 +501,7 @@ impl Test for RequestingBlocksFromTwoPeers { clock.advance(BLOCK_SLEEP_INTERVAL); assert!(message_receiver.try_recv().is_none()); - wait_for_stored_block(ctx, &storage, BlockNumber(4)).await?; + wait_for_stored_block(ctx, storage.as_ref(), BlockNumber(4)).await?; Ok(()) } } @@ -650,7 +663,7 @@ impl Test for RequestingBlocksFromMultiplePeers { } } - wait_for_stored_block(ctx, &storage, BlockNumber(19)).await?; + wait_for_stored_block(ctx, storage.as_ref(), BlockNumber(19)).await?; Ok(()) }) .await @@ -762,7 +775,7 @@ impl Test for DisconnectingPeer { clock.advance(BLOCK_SLEEP_INTERVAL); assert!(message_receiver.try_recv().is_none()); - wait_for_stored_block(ctx, &storage, BlockNumber(2)).await?; + wait_for_stored_block(ctx, storage.as_ref(), BlockNumber(2)).await?; Ok(()) } } @@ -798,16 +811,14 @@ async fn requesting_blocks_with_unreliable_peers( test_peer_states(test).await; } -#[test] -fn processing_invalid_sync_states() { +#[tokio::test] +async fn processing_invalid_sync_states() { let storage_dir = tempfile::tempdir().unwrap(); let 
ctx = &ctx::test_root(&ctx::RealClock); let rng = &mut ctx.rng(); let test_validators = TestValidators::new(4, 3, rng); - let storage = Arc::new(Storage::new( - &test_validators.final_blocks[0], - storage_dir.path(), - )); + let storage = RocksdbStorage::new(ctx, &test_validators.final_blocks[0], storage_dir.path()); + let storage = Arc::new(storage.await.unwrap()); let (message_sender, _) = channel::unbounded(); let (peer_states, _) = PeerStates::new(message_sender, storage, test_validators.test_config()); @@ -859,16 +870,14 @@ fn processing_invalid_sync_states() { ); } -#[test] -fn processing_invalid_blocks() { +#[tokio::test] +async fn processing_invalid_blocks() { let storage_dir = tempfile::tempdir().unwrap(); let ctx = &ctx::test_root(&ctx::RealClock); let rng = &mut ctx.rng(); let test_validators = TestValidators::new(4, 3, rng); - let storage = Arc::new(Storage::new( - &test_validators.final_blocks[0], - storage_dir.path(), - )); + let storage = RocksdbStorage::new(ctx, &test_validators.final_blocks[0], storage_dir.path()); + let storage = Arc::new(storage.await.unwrap()); let (message_sender, _) = channel::unbounded(); let (peer_states, _) = PeerStates::new(message_sender, storage, test_validators.test_config()); diff --git a/node/actors/sync_blocks/src/tests/end_to_end.rs b/node/actors/sync_blocks/src/tests/end_to_end.rs index 699c4cd9..99b22fdc 100644 --- a/node/actors/sync_blocks/src/tests/end_to_end.rs +++ b/node/actors/sync_blocks/src/tests/end_to_end.rs @@ -8,6 +8,7 @@ use network::testonly::Instance as NetworkInstance; use rand::seq::SliceRandom; use roles::node; use std::{fmt, path::Path}; +use storage::RocksdbStorage; use test_casing::test_casing; use tracing::Instrument; @@ -108,11 +109,19 @@ impl Node { let key = self.key(); let (sync_blocks_actor_pipe, sync_blocks_dispatcher_pipe) = pipe::new(); let (network_actor_pipe, network_dispatcher_pipe) = pipe::new(); - let storage = Arc::new(Storage::new(&test_validators.final_blocks[0], storage_dir)); + let storage = RocksdbStorage::new(ctx, &test_validators.final_blocks[0], storage_dir); + let storage = Arc::new(storage.await.unwrap()); let sync_blocks_config = test_validators.test_config(); - let sync_blocks = - SyncBlocks::new(sync_blocks_actor_pipe, storage.clone(), sync_blocks_config); + let sync_blocks = SyncBlocks::new( + ctx, + sync_blocks_actor_pipe, + storage.clone(), + sync_blocks_config, + ) + .await + .expect("Failed"); + let sync_states_subscriber = sync_blocks.subscribe_to_state_updates(); self.network .set_sync_state_subscriber(sync_states_subscriber.clone()); @@ -122,7 +131,7 @@ impl Node { while let Ok(block_number) = self.create_block_receiver.recv(ctx).await { tracing::trace!(?key, %block_number, "Storing new block"); let block = &test_validators.final_blocks[block_number.0 as usize]; - storage.put_block(block); + storage.put_block(ctx, block).await.unwrap(); } Ok(()) }); @@ -149,7 +158,7 @@ impl Node { .await .with_context(|| format!("executor for {key:?}")) }); - s.spawn_bg_blocking(|| sync_blocks.run(ctx)); + s.spawn_bg(sync_blocks.run(ctx)); tracing::trace!("Node is fully started"); self.switch_off_receiver @@ -245,7 +254,7 @@ async fn test_sync_blocks(test: T) { .unwrap_err(); tracing::trace!(?key, "Node task completed"); - if err.is::() { + if err.root_cause().is::() { Ok(()) // Test has successfully completed } else { Err(err) diff --git a/node/actors/sync_blocks/src/tests/mod.rs b/node/actors/sync_blocks/src/tests/mod.rs index 2d6fa3db..35061374 100644 --- a/node/actors/sync_blocks/src/tests/mod.rs 
+++ b/node/actors/sync_blocks/src/tests/mod.rs @@ -9,6 +9,7 @@ use rand::{ }; use roles::validator::{self, Block, BlockNumber, CommitQC, FinalBlock, ValidatorSet}; use std::iter; +use storage::RocksdbStorage; use utils::pipe; mod end_to_end; @@ -111,16 +112,18 @@ async fn subscribing_to_state_updates() { let block_2 = create_block(&block_1, rng); let block_3 = create_block(&block_2, rng); - let storage = Storage::new(&genesis_block, storage_dir.path()); - let storage = &Arc::new(storage); + let storage = RocksdbStorage::new(ctx, &genesis_block, storage_dir.path()).await; + let storage = &Arc::new(storage.unwrap()); let (actor_pipe, _dispatcher_pipe) = pipe::new(); - let actor = SyncBlocks::new(actor_pipe, storage.clone(), rng.gen()); + let actor = SyncBlocks::new(ctx, actor_pipe, storage.clone(), rng.gen()) + .await + .unwrap(); let mut state_subscriber = actor.subscribe_to_state_updates(); scope::run!(ctx, |ctx, s| async { - s.spawn_bg_blocking(|| { - actor.run(ctx).or_else(|err| { - if err.is::() { + s.spawn_bg(async { + actor.run(ctx).await.or_else(|err| { + if err.root_cause().is::() { Ok(()) // Swallow cancellation errors after the test is finished } else { Err(err) @@ -144,7 +147,7 @@ async fn subscribing_to_state_updates() { assert_eq!(initial_state.last_stored_block, genesis_block.justification); drop(initial_state); - storage.put_block(&block_1); + storage.put_block(ctx, &block_1).await.unwrap(); let new_state = sync::changed(ctx, &mut state_subscriber).await?; assert_eq!(new_state.first_stored_block, genesis_block.justification); @@ -155,7 +158,7 @@ async fn subscribing_to_state_updates() { assert_eq!(new_state.last_stored_block, block_1.justification); drop(new_state); - storage.put_block(&block_3); + storage.put_block(ctx, &block_3).await.unwrap(); let new_state = sync::changed(ctx, &mut state_subscriber).await?; assert_eq!(new_state.first_stored_block, genesis_block.justification); @@ -200,23 +203,25 @@ async fn getting_blocks() { let rng = &mut ctx.rng(); let genesis_block = create_block_from_base(Block::genesis(vec![]), rng); - let storage = Storage::new(&genesis_block, storage_dir.path()); - let storage = Arc::new(storage); + let storage = RocksdbStorage::new(ctx, &genesis_block, storage_dir.path()); + let storage = Arc::new(storage.await.unwrap()); let blocks = iter::successors(Some(genesis_block), |parent| { Some(create_block(parent, rng)) }); let blocks: Vec<_> = blocks.take(5).collect(); for block in &blocks { - storage.put_block(block); + storage.put_block(ctx, block).await.unwrap(); } let (actor_pipe, dispatcher_pipe) = pipe::new(); - let actor = SyncBlocks::new(actor_pipe, storage.clone(), rng.gen()); + let actor = SyncBlocks::new(ctx, actor_pipe, storage.clone(), rng.gen()) + .await + .unwrap(); scope::run!(ctx, |ctx, s| async { - s.spawn_bg_blocking(|| { - actor.run(ctx).or_else(|err| { - if err.is::() { + s.spawn_bg(async { + actor.run(ctx).await.or_else(|err| { + if err.root_cause().is::() { Ok(()) // Swallow cancellation errors after the test is finished } else { Err(err) diff --git a/node/libs/concurrency/src/scope/mod.rs b/node/libs/concurrency/src/scope/mod.rs index c883e211..312e2bb7 100644 --- a/node/libs/concurrency/src/scope/mod.rs +++ b/node/libs/concurrency/src/scope/mod.rs @@ -310,3 +310,18 @@ impl<'env, E: 'static + Send> Scope<'env, E> { } } } + +/// Spawns the provided blocking closure `f` and waits until it completes or the context gets canceled. 
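+///
+/// A minimal usage sketch (`compute_blocking` is a hypothetical blocking helper returning
+/// `anyhow::Result<_>`; the error type is assumed to satisfy the `From<ctx::Canceled>` bound):
+///
+/// ```ignore
+/// let value = scope::wait_blocking(ctx, || compute_blocking()).await?;
+/// ```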
+pub async fn wait_blocking<'a, R, E>( + ctx: &'a ctx::Ctx, + f: impl FnOnce() -> Result + Send + 'a, +) -> Result +where + R: 'static + Send, + E: 'static + From + Send, +{ + run!(ctx, |ctx, s| async { + Ok(s.spawn_blocking(f).join(ctx).await?) + }) + .await +} diff --git a/node/libs/storage/Cargo.toml b/node/libs/storage/Cargo.toml index 7e1faca3..52edcaa0 100644 --- a/node/libs/storage/Cargo.toml +++ b/node/libs/storage/Cargo.toml @@ -8,8 +8,10 @@ license.workspace = true [dependencies] anyhow.workspace = true +async-trait.workspace = true rand.workspace = true rocksdb.workspace = true +thiserror.workspace = true tracing.workspace = true concurrency = { path = "../concurrency" } @@ -17,4 +19,7 @@ roles = { path = "../roles" } schema = { path = "../schema" } [dev-dependencies] +assert_matches.workspace = true tempfile.workspace = true +test-casing.workspace = true +tokio.workspace = true diff --git a/node/libs/storage/src/block_store.rs b/node/libs/storage/src/block_store.rs deleted file mode 100644 index b0179d6d..00000000 --- a/node/libs/storage/src/block_store.rs +++ /dev/null @@ -1,178 +0,0 @@ -//! This module contains the methods to handle an append-only database of finalized blocks. Since we only store finalized blocks, this forms a -//! chain of blocks, not a tree (assuming we have all blocks and not have any gap). It allows for basic functionality like inserting a block, -//! getting a block, checking if a block is contained in the DB. We also store the head of the chain. Storing it explicitly allows us to fetch -//! the current head quickly. - -use crate::{types::DatabaseKey, Storage}; -use rocksdb::{IteratorMode, ReadOptions}; -use roles::validator::{BlockNumber, FinalBlock}; -use std::{iter, ops, sync::atomic::Ordering}; - -impl Storage { - // ---------------- Read methods ---------------- - - /// Gets the head block. - pub fn get_head_block(&self) -> FinalBlock { - let db = self.read(); - - let mut options = ReadOptions::default(); - options.set_iterate_range(DatabaseKey::BLOCKS_START_KEY..); - let mut iter = db.iterator_opt(DatabaseKey::BLOCK_HEAD_ITERATOR, options); - let (_, head_block) = iter - .next() - .expect("Head block not found") - .expect("RocksDB error reading head block"); - schema::decode(&head_block).expect("Failed decoding head block bytes") - } - - /// Returns a block with the least number stored in this database. - pub fn get_first_block(&self) -> FinalBlock { - let db = self.read(); - - let mut options = ReadOptions::default(); - options.set_iterate_range(DatabaseKey::BLOCKS_START_KEY..); - let mut iter = db.iterator_opt(IteratorMode::Start, options); - let (_, first_block) = iter - .next() - .expect("First stored block not found") - .expect("RocksDB error reading first stored block"); - schema::decode(&first_block).expect("Failed decoding first stored block bytes") - } - - /// Returns the number of the last block in the first contiguous range of blocks stored in this DB. - /// If there are no missing blocks, this is equal to the number of [`Self::get_head_block()`], - /// if there *are* missing blocks, the returned number will be lower. 
- pub fn get_last_contiguous_block_number(&self) -> BlockNumber { - let last_contiguous_block_number = self - .cached_last_contiguous_block_number - .load(Ordering::Relaxed); - let last_contiguous_block_number = BlockNumber(last_contiguous_block_number); - - let last_contiguous_block_number = - self.last_contiguous_block_number_impl(last_contiguous_block_number); - - // The cached value may have been updated by the other thread. Fortunately, we have a simple - // protection against such "edit conflicts": the greater cached value is always valid and - // should win. - self.cached_last_contiguous_block_number - .fetch_max(last_contiguous_block_number.0, Ordering::Relaxed); - last_contiguous_block_number - } - - // Implementation that is not aware of caching specifics. The only requirement for the method correctness - // is for the `cached_last_contiguous_block_number` to be present in the database. - fn last_contiguous_block_number_impl( - &self, - cached_last_contiguous_block_number: BlockNumber, - ) -> BlockNumber { - let db = self.read(); - - let mut options = ReadOptions::default(); - let start_key = DatabaseKey::Block(cached_last_contiguous_block_number).encode_key(); - options.set_iterate_range(start_key..); - let iter = db.iterator_opt(IteratorMode::Start, options); - let iter = iter - .map(|bytes| { - let (key, _) = bytes.expect("RocksDB error iterating over block numbers"); - DatabaseKey::parse_block_key(&key) - }) - .fuse(); - - let mut prev_block_number = cached_last_contiguous_block_number; - for block_number in iter { - if block_number > prev_block_number.next() { - return prev_block_number; - } - prev_block_number = block_number; - } - prev_block_number - } - - /// Gets a block by its number. - pub fn get_block(&self, number: BlockNumber) -> Option { - let db = self.read(); - - let raw_block = db - .get(DatabaseKey::Block(number).encode_key()) - .unwrap_or_else(|err| panic!("RocksDB error reading block #{number}: {err}"))?; - Some(schema::decode(&raw_block).unwrap_or_else(|err| { - panic!("Failed decoding block #{number}: {err}"); - })) - } - - /// Iterates over block numbers in the specified `range` that the DB *does not* have. - // TODO(slowli): We might want to limit the length of the vec returned - pub fn get_missing_block_numbers(&self, range: ops::Range) -> Vec { - let db = self.read(); - - let mut options = ReadOptions::default(); - let start_key = DatabaseKey::Block(range.start).encode_key(); - let end_key = DatabaseKey::Block(range.end).encode_key(); - options.set_iterate_range(start_key..end_key); - - let iter = db.iterator_opt(IteratorMode::Start, options); - let iter = iter - .map(|bytes| { - let (key, _) = bytes.expect("RocksDB error iterating over block numbers"); - DatabaseKey::parse_block_key(&key) - }) - .fuse(); - - MissingBlockNumbers { - range, - existing_numbers: iter.peekable(), - } - .collect() - } - - // ---------------- Write methods ---------------- - - /// Insert a new block into the database. - pub fn put_block(&self, finalized_block: &FinalBlock) { - let db = self.write(); - - let block_number = finalized_block.block.number; - tracing::debug!("Inserting new block #{block_number} into the database."); - - let mut write_batch = rocksdb::WriteBatch::default(); - write_batch.put( - DatabaseKey::Block(block_number).encode_key(), - schema::encode(finalized_block), - ); - - // Commit the transaction. 
- db.write(write_batch).unwrap(); - drop(db); - - self.block_writes_sender.send_replace(block_number); - } -} - -struct MissingBlockNumbers { - range: ops::Range, - existing_numbers: iter::Peekable, -} - -impl Iterator for MissingBlockNumbers -where - I: Iterator, -{ - type Item = BlockNumber; - - fn next(&mut self) -> Option { - // Loop while existing numbers match the starting numbers from the range. The check - // that the range is non-empty is redundant given how `existing_numbers` are constructed - // (they are guaranteed to be lesser than the upper range bound); we add it just to be safe. - while !self.range.is_empty() && self.existing_numbers.peek() == Some(&self.range.start) { - self.range.start = self.range.start.next(); - self.existing_numbers.next(); // Advance to the next number - } - - if self.range.is_empty() { - return None; - } - let next_number = self.range.start; - self.range.start = self.range.start.next(); - Some(next_number) - } -} diff --git a/node/libs/storage/src/lib.rs b/node/libs/storage/src/lib.rs index e3637e81..cfcb4247 100644 --- a/node/libs/storage/src/lib.rs +++ b/node/libs/storage/src/lib.rs @@ -1,95 +1,15 @@ //! This module is responsible for persistent data storage, it provides schema-aware type-safe database access. Currently we use RocksDB, //! but this crate only exposes an abstraction of a database, so we can easily switch to a different storage engine in the future. -use concurrency::sync::watch; -use roles::validator::{self, BlockNumber}; -use std::{ - fmt, ops, - path::Path, - sync::{atomic::AtomicU64, RwLock}, -}; - -mod block_store; -mod replica; +mod rocksdb; mod testonly; #[cfg(test)] mod tests; +mod traits; mod types; -pub use types::ReplicaState; - -/// Main struct for the Storage module, it just contains the database. Provides a set of high-level -/// atomic operations on the database. It "contains" the following data: -/// -/// - An append-only database of finalized blocks. -/// - A backup of the consensus replica state. -pub struct Storage { - /// Wrapped RocksDB instance. We don't need `RwLock` for synchronization *per se*, just to ensure - /// that writes to the DB are linearized. - inner: RwLock, - /// In-memory cache for the last contiguous block number stored in the DB. The cache is used - /// and updated by `Self::get_last_contiguous_block_number()`. Caching is based on the assumption - /// that blocks are never removed from the DB. - cached_last_contiguous_block_number: AtomicU64, - /// Sender of numbers of written blocks. - block_writes_sender: watch::Sender, -} - -impl Storage { - /// Create a new Storage. It first tries to open an existing database, and if that fails it just creates a - /// a new one. We need the genesis block of the chain as input. - // TODO(bruno): we want to eventually start pruning old blocks, so having the genesis - // block might be unnecessary. 
- pub fn new(genesis_block: &validator::FinalBlock, path: &Path) -> Self { - let mut options = rocksdb::Options::default(); - options.create_missing_column_families(true); - options.create_if_missing(true); - - let db = rocksdb::DB::open(&options, path).expect("Failed opening RocksDB"); - let this = Self { - inner: RwLock::new(db), - cached_last_contiguous_block_number: AtomicU64::new(0), - block_writes_sender: watch::channel(genesis_block.block.number).0, - }; - if let Some(stored_genesis_block) = this.get_block(genesis_block.block.number) { - assert_eq!( - stored_genesis_block.block, genesis_block.block, - "Mismatch between stored and expected genesis block" - ); - } else { - tracing::debug!( - "Genesis block not present in RocksDB at `{path}`; saving {genesis_block:?}", - path = path.display() - ); - this.put_block(genesis_block); - } - - this - } - - /// Subscribes to block write operations performed using this `Storage`. Note that since - /// updates are passed using a `watch` channel, only the latest written [`BlockNumber`] - /// will be available; intermediate updates may be dropped. - /// - /// If no blocks were written during the `Storage` lifetime, the channel contains the number - /// of the genesis block. - pub fn subscribe_to_block_writes(&self) -> watch::Receiver { - self.block_writes_sender.subscribe() - } - - /// Acquires a read lock on the underlying DB. - fn read(&self) -> impl ops::Deref + '_ { - self.inner.read().expect("DB lock is poisoned") - } - - /// Acquires a write lock on the underlying DB. - fn write(&self) -> impl ops::Deref + '_ { - self.inner.write().expect("DB lock is poisoned") - } -} - -impl fmt::Debug for Storage { - fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result { - formatter.write_str("Storage") - } -} +pub use crate::{ + rocksdb::RocksdbStorage, + traits::{BlockStore, ReplicaStateStore, WriteBlockStore}, + types::{ReplicaState, StorageError, StorageResult}, +}; diff --git a/node/libs/storage/src/replica.rs b/node/libs/storage/src/replica.rs deleted file mode 100644 index 8ca7edd5..00000000 --- a/node/libs/storage/src/replica.rs +++ /dev/null @@ -1,30 +0,0 @@ -//! This is module is used to store the replica state. The main purpose of this is to act as a backup in case the node crashes. - -use crate::{ - types::{DatabaseKey, ReplicaState}, - Storage, -}; - -impl Storage { - // ---------------- Read methods ---------------- - - /// Gets the replica state, if it is contained in the database. - pub fn get_replica_state(&self) -> Option { - self.read() - .get(DatabaseKey::ReplicaState.encode_key()) - .unwrap() - .map(|b| schema::decode(&b).expect("Failed to decode replica state!")) - } - - // ---------------- Write methods ---------------- - - /// Store the given replica state into the database. - pub fn put_replica_state(&self, replica_state: &ReplicaState) { - self.write() - .put( - DatabaseKey::ReplicaState.encode_key(), - schema::encode(replica_state), - ) - .unwrap(); - } -} diff --git a/node/libs/storage/src/rocksdb.rs b/node/libs/storage/src/rocksdb.rs new file mode 100644 index 00000000..cb62418f --- /dev/null +++ b/node/libs/storage/src/rocksdb.rs @@ -0,0 +1,340 @@ +//! This module contains the methods to handle an append-only database of finalized blocks. Since we only store finalized blocks, this forms a +//! chain of blocks, not a tree (assuming we have all blocks and not have any gap). It allows for basic functionality like inserting a block, +//! getting a block, checking if a block is contained in the DB. 
We also store the head of the chain. Storing it explicitly allows us to fetch +//! the current head quickly. + +use crate::{ + traits::{BlockStore, ReplicaStateStore, WriteBlockStore}, + types::{DatabaseKey, MissingBlockNumbers, ReplicaState}, + StorageError, StorageResult, +}; +use anyhow::Context as _; +use async_trait::async_trait; +use concurrency::{ctx, scope, sync::watch}; +use rocksdb::{IteratorMode, ReadOptions}; +use roles::validator::{BlockNumber, FinalBlock}; +use std::{ + fmt, ops, + path::Path, + sync::{ + atomic::{AtomicU64, Ordering}, + RwLock, + }, +}; + +/// Main struct for the Storage module, it just contains the database. Provides a set of high-level +/// atomic operations on the database. It "contains" the following data: +/// +/// - An append-only database of finalized blocks. +/// - A backup of the consensus replica state. +pub struct RocksdbStorage { + /// Wrapped RocksDB instance. We don't need `RwLock` for synchronization *per se*, just to ensure + /// that writes to the DB are linearized. + inner: RwLock, + /// In-memory cache for the last contiguous block number stored in the DB. The cache is used + /// and updated by `Self::get_last_contiguous_block_number()`. Caching is based on the assumption + /// that blocks are never removed from the DB. + cached_last_contiguous_block_number: AtomicU64, + /// Sender of numbers of written blocks. + block_writes_sender: watch::Sender, +} + +impl RocksdbStorage { + /// Create a new Storage. It first tries to open an existing database, and if that fails it just creates a + /// a new one. We need the genesis block of the chain as input. + // TODO(bruno): we want to eventually start pruning old blocks, so having the genesis + // block might be unnecessary. + pub async fn new( + ctx: &ctx::Ctx, + genesis_block: &FinalBlock, + path: &Path, + ) -> StorageResult { + let mut options = rocksdb::Options::default(); + options.create_missing_column_families(true); + options.create_if_missing(true); + + let db = scope::wait_blocking(ctx, || { + rocksdb::DB::open(&options, path) + .context("Failed opening RocksDB") + .map_err(StorageError::Database) + }) + .await?; + + let this = Self { + inner: RwLock::new(db), + cached_last_contiguous_block_number: AtomicU64::new(0), + block_writes_sender: watch::channel(genesis_block.block.number).0, + }; + if let Some(stored_genesis_block) = this.block(ctx, genesis_block.block.number).await? { + if stored_genesis_block.block != genesis_block.block { + let err = anyhow::anyhow!("Mismatch between stored and expected genesis block"); + return Err(StorageError::Database(err)); + } + } else { + tracing::debug!( + "Genesis block not present in RocksDB at `{path}`; saving {genesis_block:?}", + path = path.display() + ); + this.put_block(ctx, genesis_block).await?; + } + Ok(this) + } + + /// Acquires a read lock on the underlying DB. + fn read(&self) -> impl ops::Deref + '_ { + self.inner.read().expect("DB lock is poisoned") + } + + /// Acquires a write lock on the underlying DB. + fn write(&self) -> impl ops::Deref + '_ { + self.inner.write().expect("DB lock is poisoned") + } + + fn head_block_blocking(&self) -> anyhow::Result { + let db = self.read(); + + let mut options = ReadOptions::default(); + options.set_iterate_range(DatabaseKey::BLOCKS_START_KEY..); + let mut iter = db.iterator_opt(DatabaseKey::BLOCK_HEAD_ITERATOR, options); + let (_, head_block) = iter + .next() + .context("Head block not found")? 
+ .context("RocksDB error reading head block")?; + schema::decode(&head_block).context("Failed decoding head block bytes") + } + + /// Returns a block with the least number stored in this database. + fn first_block_blocking(&self) -> anyhow::Result { + let db = self.read(); + + let mut options = ReadOptions::default(); + options.set_iterate_range(DatabaseKey::BLOCKS_START_KEY..); + let mut iter = db.iterator_opt(IteratorMode::Start, options); + let (_, first_block) = iter + .next() + .context("First stored block not found")? + .context("RocksDB error reading first stored block")?; + schema::decode(&first_block).context("Failed decoding first stored block bytes") + } + + fn last_contiguous_block_number_blocking(&self) -> anyhow::Result { + let last_contiguous_block_number = self + .cached_last_contiguous_block_number + .load(Ordering::Relaxed); + let last_contiguous_block_number = BlockNumber(last_contiguous_block_number); + + let last_contiguous_block_number = + self.last_contiguous_block_number_impl(last_contiguous_block_number)?; + + // The cached value may have been updated by the other thread. Fortunately, we have a simple + // protection against such "edit conflicts": the greater cached value is always valid and + // should win. + self.cached_last_contiguous_block_number + .fetch_max(last_contiguous_block_number.0, Ordering::Relaxed); + Ok(last_contiguous_block_number) + } + + // Implementation that is not aware of caching specifics. The only requirement for the method correctness + // is for the `cached_last_contiguous_block_number` to be present in the database. + fn last_contiguous_block_number_impl( + &self, + cached_last_contiguous_block_number: BlockNumber, + ) -> anyhow::Result { + let db = self.read(); + + let mut options = ReadOptions::default(); + let start_key = DatabaseKey::Block(cached_last_contiguous_block_number).encode_key(); + options.set_iterate_range(start_key..); + let iter = db.iterator_opt(IteratorMode::Start, options); + let iter = iter + .map(|bytes| { + let (key, _) = bytes.context("RocksDB error iterating over block numbers")?; + DatabaseKey::parse_block_key(&key) + }) + .fuse(); + + let mut prev_block_number = cached_last_contiguous_block_number; + for block_number in iter { + let block_number = block_number?; + if block_number > prev_block_number.next() { + return Ok(prev_block_number); + } + prev_block_number = block_number; + } + Ok(prev_block_number) + } + + /// Gets a block by its number. + fn block_blocking(&self, number: BlockNumber) -> anyhow::Result> { + let db = self.read(); + + let Some(raw_block) = db + .get(DatabaseKey::Block(number).encode_key()) + .with_context(|| format!("RocksDB error reading block #{number}"))? + else { + return Ok(None); + }; + let block = schema::decode(&raw_block) + .with_context(|| format!("Failed decoding block #{number}"))?; + Ok(Some(block)) + } + + /// Iterates over block numbers in the specified `range` that the DB *does not* have. 
+ fn missing_block_numbers_blocking( + &self, + range: ops::Range, + ) -> anyhow::Result> { + let db = self.read(); + + let mut options = ReadOptions::default(); + let start_key = DatabaseKey::Block(range.start).encode_key(); + let end_key = DatabaseKey::Block(range.end).encode_key(); + options.set_iterate_range(start_key..end_key); + + let iter = db.iterator_opt(IteratorMode::Start, options); + let iter = iter + .map(|bytes| { + let (key, _) = bytes.context("RocksDB error iterating over block numbers")?; + DatabaseKey::parse_block_key(&key) + }) + .fuse(); + + MissingBlockNumbers::new(range, iter).collect() + } + + // ---------------- Write methods ---------------- + + /// Insert a new block into the database. + fn put_block_blocking(&self, finalized_block: &FinalBlock) -> anyhow::Result<()> { + let db = self.write(); + let block_number = finalized_block.block.number; + tracing::debug!("Inserting new block #{block_number} into the database."); + + let mut write_batch = rocksdb::WriteBatch::default(); + write_batch.put( + DatabaseKey::Block(block_number).encode_key(), + schema::encode(finalized_block), + ); + // Commit the transaction. + db.write(write_batch) + .context("Failed writing block to database")?; + drop(db); + + self.block_writes_sender.send_replace(block_number); + Ok(()) + } + + fn replica_state_blocking(&self) -> anyhow::Result> { + let Some(raw_state) = self + .read() + .get(DatabaseKey::ReplicaState.encode_key()) + .context("Failed to get ReplicaState from RocksDB")? + else { + return Ok(None); + }; + schema::decode(&raw_state) + .map(Some) + .context("Failed to decode replica state!") + } + + fn put_replica_state_blocking(&self, replica_state: &ReplicaState) -> anyhow::Result<()> { + self.write() + .put( + DatabaseKey::ReplicaState.encode_key(), + schema::encode(replica_state), + ) + .context("Failed putting ReplicaState to RocksDB") + } +} + +impl fmt::Debug for RocksdbStorage { + fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result { + formatter.write_str("RocksdbStorage") + } +} + +#[async_trait] +impl BlockStore for RocksdbStorage { + async fn head_block(&self, ctx: &ctx::Ctx) -> StorageResult { + scope::wait_blocking(ctx, || { + self.head_block_blocking().map_err(StorageError::Database) + }) + .await + } + + async fn first_block(&self, ctx: &ctx::Ctx) -> StorageResult { + scope::wait_blocking(ctx, || { + self.first_block_blocking().map_err(StorageError::Database) + }) + .await + } + + async fn last_contiguous_block_number(&self, ctx: &ctx::Ctx) -> StorageResult { + scope::wait_blocking(ctx, || { + self.last_contiguous_block_number_blocking() + .map_err(StorageError::Database) + }) + .await + } + + async fn block( + &self, + ctx: &ctx::Ctx, + number: BlockNumber, + ) -> StorageResult> { + scope::wait_blocking(ctx, || { + self.block_blocking(number).map_err(StorageError::Database) + }) + .await + } + + async fn missing_block_numbers( + &self, + ctx: &ctx::Ctx, + range: ops::Range, + ) -> StorageResult> { + scope::wait_blocking(ctx, || { + self.missing_block_numbers_blocking(range) + .map_err(StorageError::Database) + }) + .await + } + + fn subscribe_to_block_writes(&self) -> watch::Receiver { + self.block_writes_sender.subscribe() + } +} + +#[async_trait] +impl WriteBlockStore for RocksdbStorage { + async fn put_block(&self, ctx: &ctx::Ctx, block: &FinalBlock) -> StorageResult<()> { + scope::wait_blocking(ctx, || { + self.put_block_blocking(block) + .map_err(StorageError::Database) + }) + .await + } +} + +#[async_trait] +impl ReplicaStateStore for 
RocksdbStorage { + async fn replica_state(&self, ctx: &ctx::Ctx) -> StorageResult> { + scope::wait_blocking(ctx, || { + self.replica_state_blocking() + .map_err(StorageError::Database) + }) + .await + } + + async fn put_replica_state( + &self, + ctx: &ctx::Ctx, + replica_state: &ReplicaState, + ) -> StorageResult<()> { + scope::wait_blocking(ctx, || { + self.put_replica_state_blocking(replica_state) + .map_err(StorageError::Database) + }) + .await + } +} diff --git a/node/libs/storage/src/tests.rs b/node/libs/storage/src/tests.rs index d4b8125f..b9248331 100644 --- a/node/libs/storage/src/tests.rs +++ b/node/libs/storage/src/tests.rs @@ -6,24 +6,24 @@ use roles::validator::{Block, BlockNumber, FinalBlock}; use std::iter; use tempfile::TempDir; -fn init_store(rng: &mut R) -> (FinalBlock, Storage, TempDir) { +async fn init_store(ctx: &ctx::Ctx, rng: &mut R) -> (FinalBlock, RocksdbStorage, TempDir) { let genesis_block = FinalBlock { block: Block::genesis(vec![]), justification: rng.gen(), }; - let temp_dir = TempDir::new().unwrap(); - let block_store = Storage::new(&genesis_block, temp_dir.path()); - + let block_store = RocksdbStorage::new(ctx, &genesis_block, temp_dir.path()) + .await + .unwrap(); (genesis_block, block_store, temp_dir) } -#[test] -fn init_store_twice() { +#[tokio::test] +async fn init_store_twice() { let ctx = ctx::test_root(&ctx::RealClock); let rng = &mut ctx.rng(); - let (genesis_block, block_store, temp_dir) = init_store(rng); + let (genesis_block, block_store, temp_dir) = init_store(&ctx, rng).await; let block_1 = FinalBlock { block: Block { parent: genesis_block.block.hash(), @@ -32,26 +32,28 @@ fn init_store_twice() { }, justification: rng.gen(), }; - block_store.put_block(&block_1); + block_store.put_block(&ctx, &block_1).await.unwrap(); - assert_eq!(block_store.get_first_block(), genesis_block); - assert_eq!(block_store.get_head_block(), block_1); + assert_eq!(block_store.first_block(&ctx).await.unwrap(), genesis_block); + assert_eq!(block_store.head_block(&ctx).await.unwrap(), block_1); drop(block_store); - let block_store = Storage::new(&genesis_block, temp_dir.path()); + let block_store = RocksdbStorage::new(&ctx, &genesis_block, temp_dir.path()) + .await + .unwrap(); - assert_eq!(block_store.get_first_block(), genesis_block); - assert_eq!(block_store.get_head_block(), block_1); + assert_eq!(block_store.first_block(&ctx).await.unwrap(), genesis_block); + assert_eq!(block_store.head_block(&ctx).await.unwrap(), block_1); } -#[test] -fn test_put_block() { +#[tokio::test] +async fn test_put_block() { let ctx = ctx::test_root(&ctx::RealClock); let rng = &mut ctx.rng(); - let (genesis_block, block_store, _temp_dir) = init_store(rng); + let (genesis_block, block_store, _temp_dir) = init_store(&ctx, rng).await; - assert_eq!(block_store.get_first_block(), genesis_block); - assert_eq!(block_store.get_head_block(), genesis_block); + assert_eq!(block_store.first_block(&ctx).await.unwrap(), genesis_block); + assert_eq!(block_store.head_block(&ctx).await.unwrap(), genesis_block); let mut block_subscriber = block_store.subscribe_to_block_writes(); assert_eq!(*block_subscriber.borrow_and_update(), BlockNumber(0)); @@ -65,10 +67,10 @@ fn test_put_block() { }, justification: rng.gen(), }; - block_store.put_block(&block_1); + block_store.put_block(&ctx, &block_1).await.unwrap(); - assert_eq!(block_store.get_first_block(), genesis_block); - assert_eq!(block_store.get_head_block(), block_1); + assert_eq!(block_store.first_block(&ctx).await.unwrap(), genesis_block); + 
assert_eq!(block_store.head_block(&ctx).await.unwrap(), block_1); assert_eq!(*block_subscriber.borrow_and_update(), block_1.block.number); // Test inserting a block with a valid parent that is not the genesis. @@ -80,19 +82,14 @@ fn test_put_block() { }, justification: rng.gen(), }; - block_store.put_block(&block_2); + block_store.put_block(&ctx, &block_2).await.unwrap(); - assert_eq!(block_store.get_first_block(), genesis_block); - assert_eq!(block_store.get_head_block(), block_2); + assert_eq!(block_store.first_block(&ctx).await.unwrap(), genesis_block); + assert_eq!(block_store.head_block(&ctx).await.unwrap(), block_2); assert_eq!(*block_subscriber.borrow_and_update(), block_2.block.number); } -#[test] -fn test_get_missing_block_numbers() { - let ctx = ctx::test_root(&ctx::RealClock); - let rng = &mut ctx.rng(); - let (genesis_block, block_store, _temp_dir) = init_store(rng); - +fn gen_blocks(rng: &mut impl Rng, genesis_block: FinalBlock, count: usize) -> Vec { let blocks = iter::successors(Some(genesis_block), |parent| { let block = Block { parent: parent.block.hash(), @@ -104,20 +101,35 @@ fn test_get_missing_block_numbers() { justification: rng.gen(), }) }); - let mut blocks: Vec<_> = blocks.skip(1).take(100).collect(); + blocks.skip(1).take(count).collect() +} + +#[tokio::test] +async fn test_get_missing_block_numbers() { + let ctx = ctx::test_root(&ctx::RealClock); + let rng = &mut ctx.rng(); + let (genesis_block, block_store, _temp_dir) = init_store(&ctx, rng).await; + let mut blocks = gen_blocks(rng, genesis_block, 100); blocks.shuffle(rng); assert!(block_store - .get_missing_block_numbers(BlockNumber(0)..BlockNumber(101)) + .missing_block_numbers(&ctx, BlockNumber(0)..BlockNumber(101)) + .await + .unwrap() .into_iter() .map(|number| number.0) .eq(1..101)); for (i, block) in blocks.iter().enumerate() { - block_store.put_block(block); - let missing_block_numbers = - block_store.get_missing_block_numbers(BlockNumber(0)..BlockNumber(101)); - let last_contiguous_block_number = block_store.get_last_contiguous_block_number(); + block_store.put_block(&ctx, block).await.unwrap(); + let missing_block_numbers = block_store + .missing_block_numbers(&ctx, BlockNumber(0)..BlockNumber(101)) + .await + .unwrap(); + let last_contiguous_block_number = block_store + .last_contiguous_block_number(&ctx) + .await + .unwrap(); let mut expected_block_numbers: Vec<_> = blocks[(i + 1)..].iter().map(|b| b.block.number).collect(); @@ -143,3 +155,10 @@ fn test_schema_encode_decode() { let replica = rng.gen::(); assert_eq!(replica, schema::decode(&schema::encode(&replica)).unwrap()); } + +#[test] +fn cancellation_is_detected_in_storage_errors() { + let err = StorageError::from(ctx::Canceled); + let err = anyhow::Error::from(err); + assert!(err.root_cause().is::()); +} diff --git a/node/libs/storage/src/traits.rs b/node/libs/storage/src/traits.rs new file mode 100644 index 00000000..e87f62ef --- /dev/null +++ b/node/libs/storage/src/traits.rs @@ -0,0 +1,126 @@ +//! Traits for storage. + +use crate::{types::ReplicaState, StorageResult}; +use async_trait::async_trait; +use concurrency::{ctx, sync::watch}; +use roles::validator::{BlockNumber, FinalBlock}; +use std::{fmt, ops, sync::Arc}; + +/// Storage of L2 blocks. +/// +/// Implementations **must** propagate context cancellation using [`StorageError::Canceled`]. +#[async_trait] +pub trait BlockStore: fmt::Debug + Send + Sync { + /// Gets the head block. 
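+    /// I.e., the stored block with the greatest number.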
+ async fn head_block(&self, ctx: &ctx::Ctx) -> StorageResult; + + /// Returns a block with the least number stored in this database. + async fn first_block(&self, ctx: &ctx::Ctx) -> StorageResult; + + /// Returns the number of the last block in the first contiguous range of blocks stored in this DB. + /// If there are no missing blocks, this is equal to the number of [`Self::get_head_block()`], + /// if there *are* missing blocks, the returned number will be lower. + async fn last_contiguous_block_number(&self, ctx: &ctx::Ctx) -> StorageResult; + + /// Gets a block by its number. + async fn block(&self, ctx: &ctx::Ctx, number: BlockNumber) + -> StorageResult>; + + /// Iterates over block numbers in the specified `range` that the DB *does not* have. + // TODO(slowli): We might want to limit the length of the vec returned + async fn missing_block_numbers( + &self, + ctx: &ctx::Ctx, + range: ops::Range, + ) -> StorageResult>; + + /// Subscribes to block write operations performed using this `Storage`. Note that since + /// updates are passed using a `watch` channel, only the latest written [`BlockNumber`] + /// will be available; intermediate updates may be dropped. + /// + /// If no blocks were written during the `Storage` lifetime, the channel contains the number + /// of the genesis block. + fn subscribe_to_block_writes(&self) -> watch::Receiver; +} + +#[async_trait] +impl BlockStore for Arc { + async fn head_block(&self, ctx: &ctx::Ctx) -> StorageResult { + (**self).head_block(ctx).await + } + + async fn first_block(&self, ctx: &ctx::Ctx) -> StorageResult { + (**self).first_block(ctx).await + } + + async fn last_contiguous_block_number(&self, ctx: &ctx::Ctx) -> StorageResult { + (**self).last_contiguous_block_number(ctx).await + } + + async fn block( + &self, + ctx: &ctx::Ctx, + number: BlockNumber, + ) -> StorageResult> { + (**self).block(ctx, number).await + } + + async fn missing_block_numbers( + &self, + ctx: &ctx::Ctx, + range: ops::Range, + ) -> StorageResult> { + (**self).missing_block_numbers(ctx, range).await + } + + fn subscribe_to_block_writes(&self) -> watch::Receiver { + (**self).subscribe_to_block_writes() + } +} + +/// Mutable storage of L2 blocks. +/// +/// Implementations **must** propagate context cancellation using [`StorageError::Canceled`]. +#[async_trait] +pub trait WriteBlockStore: BlockStore { + /// Puts a block into this storage. + async fn put_block(&self, ctx: &ctx::Ctx, block: &FinalBlock) -> StorageResult<()>; +} + +#[async_trait] +impl WriteBlockStore for Arc { + async fn put_block(&self, ctx: &ctx::Ctx, block: &FinalBlock) -> StorageResult<()> { + (**self).put_block(ctx, block).await + } +} + +/// Storage for [`ReplicaState`]. +/// +/// Implementations **must** propagate context cancellation using [`StorageError::Canceled`]. +#[async_trait] +pub trait ReplicaStateStore: BlockStore { + /// Gets the replica state, if it is contained in the database. + async fn replica_state(&self, ctx: &ctx::Ctx) -> StorageResult>; + + /// Store the given replica state into the database. 
+ async fn put_replica_state( + &self, + ctx: &ctx::Ctx, + replica_state: &ReplicaState, + ) -> StorageResult<()>; +} + +#[async_trait] +impl ReplicaStateStore for Arc { + async fn replica_state(&self, ctx: &ctx::Ctx) -> StorageResult> { + (**self).replica_state(ctx).await + } + + async fn put_replica_state( + &self, + ctx: &ctx::Ctx, + replica_state: &ReplicaState, + ) -> StorageResult<()> { + (**self).put_replica_state(ctx, replica_state).await + } +} diff --git a/node/libs/storage/src/types.rs b/node/libs/storage/src/types.rs index 33b19675..de6dc99f 100644 --- a/node/libs/storage/src/types.rs +++ b/node/libs/storage/src/types.rs @@ -1,10 +1,15 @@ //! Defines the schema of the database. use anyhow::Context as _; +use concurrency::ctx; use rocksdb::{Direction, IteratorMode}; -use roles::validator; +use roles::validator::{self, BlockNumber}; use schema::{proto::storage as proto, read_required, ProtoFmt}; -use std::collections::{BTreeMap, HashMap}; +use std::{ + collections::{BTreeMap, HashMap}, + iter, ops, +}; +use thiserror::Error; /// Enum used to represent a key in the database. It also acts as a separator between different stores. #[derive(Clone, Debug, PartialEq, Eq)] @@ -14,7 +19,7 @@ pub(crate) enum DatabaseKey { ReplicaState, /// Key used to store the finalized blocks. /// Block(BlockNumber) -> FinalBlock - Block(validator::BlockNumber), + Block(BlockNumber), } impl DatabaseKey { @@ -43,9 +48,11 @@ impl DatabaseKey { } /// Parses the specified bytes as a `Self::Block(_)` key. - pub(crate) fn parse_block_key(raw_key: &[u8]) -> validator::BlockNumber { - let raw_key = raw_key.try_into().expect("Invalid encoding for block key"); - validator::BlockNumber(u64::from_be_bytes(raw_key)) + pub(crate) fn parse_block_key(raw_key: &[u8]) -> anyhow::Result { + let raw_key = raw_key + .try_into() + .context("Invalid encoding for block key")?; + Ok(BlockNumber(u64::from_be_bytes(raw_key))) } } @@ -62,7 +69,7 @@ pub struct ReplicaState { pub high_qc: validator::CommitQC, /// A cache of the received block proposals. pub block_proposal_cache: - BTreeMap>, + BTreeMap>, } impl ProtoFmt for ReplicaState { @@ -113,3 +120,68 @@ impl ProtoFmt for ReplicaState { } } } + +/// Iterator over missing block numbers. +pub(crate) struct MissingBlockNumbers { + range: ops::Range, + existing_numbers: iter::Peekable, +} + +impl MissingBlockNumbers +where + I: Iterator>, +{ + /// Creates a new iterator based on the provided params. + pub(crate) fn new(range: ops::Range, existing_numbers: I) -> Self { + Self { + range, + existing_numbers: existing_numbers.peekable(), + } + } +} + +impl Iterator for MissingBlockNumbers +where + I: Iterator>, +{ + type Item = anyhow::Result; + + fn next(&mut self) -> Option { + // Loop while existing numbers match the starting numbers from the range. The check + // that the range is non-empty is redundant given how `existing_numbers` are constructed + // (they are guaranteed to be lesser than the upper range bound); we add it just to be safe. + while !self.range.is_empty() + && matches!(self.existing_numbers.peek(), Some(&Ok(num)) if num == self.range.start) + { + self.range.start = self.range.start.next(); + self.existing_numbers.next(); // Advance to the next number + } + + if matches!(self.existing_numbers.peek(), Some(&Err(_))) { + let err = self.existing_numbers.next().unwrap().unwrap_err(); + // ^ Both unwraps are safe due to the check above. 
+ return Some(Err(err)); + } + + if self.range.is_empty() { + return None; + } + let next_number = self.range.start; + self.range.start = self.range.start.next(); + Some(Ok(next_number)) + } +} + +/// Storage errors. +#[derive(Debug, Error)] +pub enum StorageError { + /// Operation was canceled by structured concurrency framework. + #[error("operation was canceled by structured concurrency framework")] + Canceled(#[from] ctx::Canceled), + /// Database operation failed. + #[error("database operation failed")] + Database(#[source] anyhow::Error), +} + +/// [`Result`] for fallible storage operations. +pub type StorageResult = Result;
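
For reference, below is a minimal usage sketch of the new storage API (an illustration, not
part of the patch). It mirrors the fixtures in `node/libs/storage/src/tests.rs`: the test
context, random justification and temporary directory are illustrative stand-ins, not how a
production node would obtain a context or a genesis block.

    use concurrency::ctx;
    use rand::Rng as _;
    use roles::validator::{Block, BlockNumber, FinalBlock};
    use storage::{BlockStore as _, RocksdbStorage, StorageError};

    async fn storage_demo() -> anyhow::Result<()> {
        let ctx = ctx::test_root(&ctx::RealClock);
        let rng = &mut ctx.rng();
        let genesis_block = FinalBlock {
            block: Block::genesis(vec![]),
            justification: rng.gen(),
        };
        let temp_dir = tempfile::TempDir::new()?;

        // All methods are now async, take a `&ctx::Ctx` and report cancellation as
        // `StorageError::Canceled` instead of blocking or panicking.
        let storage = RocksdbStorage::new(&ctx, &genesis_block, temp_dir.path()).await?;

        // The block-writes subscription starts out at the genesis block number.
        let mut block_writes = storage.subscribe_to_block_writes();
        assert_eq!(*block_writes.borrow_and_update(), BlockNumber(0));

        match storage.head_block(&ctx).await {
            Ok(head) => assert_eq!(head, genesis_block),
            Err(StorageError::Canceled(_)) => { /* shut down gracefully */ }
            Err(StorageError::Database(err)) => return Err(err),
        }
        Ok(())
    }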