Skip to content

Commit

Permalink
Merge pull request #2296 from ljedrz/feat/atomic_batch_override
Browse files Browse the repository at this point in the history
[HackerOne-2239704] Introduce an override for atomic batch operations
  • Loading branch information
howardwu authored Feb 21, 2024
2 parents 9d4bdea + dd685ac commit d23daa8
Show file tree
Hide file tree
Showing 9 changed files with 195 additions and 22 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ async = [ "snarkvm-ledger/async", "snarkvm-synthesizer/async" ]
cuda = [ "snarkvm-algorithms/cuda" ]
parameters_no_std_out = [ "snarkvm-parameters/no_std_out" ]
noconfig = [ ]
rocks = [ "snarkvm-ledger/rocks" ]
rocks = [ "snarkvm-ledger/rocks", "snarkvm-synthesizer/rocks" ]
test = [ "snarkvm-ledger/test" ]
test-helpers = [ "snarkvm-ledger/test-helpers" ]
timer = [ "snarkvm-ledger/timer" ]
Expand Down
39 changes: 38 additions & 1 deletion ledger/store/src/block/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -393,6 +393,20 @@ pub trait BlockStorage<N: Network>: 'static + Clone + Send + Sync {
self.transaction_store().finish_atomic()
}

/// Pauses atomic writes: subsequent atomic write batches are queued instead of
/// being executed at the end of their scope, until `unpause_atomic_writes` is called.
fn pause_atomic_writes(&self) -> Result<()> {
// Since this applies to the entire storage, any map can be used; this
// one is just the first one in the list.
self.state_root_map().pause_atomic_writes()
}

/// Unpauses atomic writes, restoring the usual behavior; the writes queued since
/// the pause are committed as a single atomic batch, or discarded when
/// `DISCARD_BATCH` is set.
fn unpause_atomic_writes<const DISCARD_BATCH: bool>(&self) -> Result<()> {
// Since this applies to the entire storage, any map can be used; this
// one is just the first one in the list.
self.state_root_map().unpause_atomic_writes::<DISCARD_BATCH>()
}

/// Stores the given `(state root, block)` pair into storage.
fn insert(&self, state_root: N::StateRoot, block: &Block<N>) -> Result<()> {
// Prepare the confirmed transactions.
Expand Down Expand Up @@ -1049,6 +1063,20 @@ impl<N: Network, B: BlockStorage<N>> BlockStore<N, B> {
Ok(())
}

/// Reverts the Merkle tree to its shape before the insertion of the last 'n' blocks.
pub fn remove_last_n_from_tree_only(&self, n: u32) -> Result<()> {
    // Removing zero blocks is a no-op and almost certainly a caller bug.
    ensure!(n > 0, "Cannot remove zero blocks");
    // Convert the count before taking the lock, so a failed conversion
    // does not hold the write lock at all.
    let num_blocks = usize::try_from(n)?;
    // Acquire the write lock on the block tree for the swap below.
    let mut tree = self.tree.write();
    // Build a Merkle tree with the last `num_blocks` block hashes removed.
    let rewound_tree = tree.prepare_remove_last_n(num_blocks)?;
    // Swap the rewound tree into place.
    *tree = rewound_tree;
    Ok(())
}

/// Removes the last 'n' blocks from storage.
pub fn remove_last_n(&self, n: u32) -> Result<()> {
// Ensure 'n' is non-zero.
Expand All @@ -1073,7 +1101,6 @@ impl<N: Network, B: BlockStorage<N>> BlockStore<N, B> {
}
None => bail!("Failed to remove last '{n}' blocks: no blocks in storage"),
};

// Fetch the block hashes to remove.
let hashes = cfg_into_iter!(heights)
.map(|height| match self.storage.get_block_hash(height)? {
Expand Down Expand Up @@ -1148,6 +1175,16 @@ impl<N: Network, B: BlockStorage<N>> BlockStore<N, B> {
pub fn storage_mode(&self) -> &StorageMode {
self.storage.storage_mode()
}

/// Pauses atomic writes for the underlying storage; delegates to
/// `BlockStorage::pause_atomic_writes`.
pub fn pause_atomic_writes(&self) -> Result<()> {
self.storage.pause_atomic_writes()
}

/// Unpauses atomic writes for the underlying storage; delegates to
/// `BlockStorage::unpause_atomic_writes`. When `DISCARD_BATCH` is set,
/// the queued writes are discarded instead of executed.
pub fn unpause_atomic_writes<const DISCARD_BATCH: bool>(&self) -> Result<()> {
self.storage.unpause_atomic_writes::<DISCARD_BATCH>()
}
}

impl<N: Network, B: BlockStorage<N>> BlockStore<N, B> {
Expand Down
19 changes: 19 additions & 0 deletions ledger/store/src/helpers/memory/internal/map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,25 @@ impl<

Ok(())
}

///
/// Once called, the subsequent atomic write batches will be queued instead of being executed
/// at the end of their scope. `unpause_atomic_writes` needs to be called in order to
/// restore the usual behavior.
///
/// For this in-memory backend there is no pause mechanism: the call is a no-op
/// and atomic batches continue to execute at the end of their scope.
///
fn pause_atomic_writes(&self) -> Result<()> {
// No effect.
Ok(())
}

///
/// Executes all of the queued writes as a single atomic operation and restores the usual
/// behavior of atomic write batches that was altered by calling `pause_atomic_writes`.
///
/// For this in-memory backend pausing is a no-op, so there is never anything
/// queued to execute (or discard); the call is a no-op as well.
///
fn unpause_atomic_writes<const DISCARD_BATCH: bool>(&self) -> Result<()> {
// No effect.
Ok(())
}
}

impl<
Expand Down
29 changes: 25 additions & 4 deletions ledger/store/src/helpers/rocksdb/internal/map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,8 +107,11 @@ impl<

// Ensure that the atomic batch is empty.
assert!(self.atomic_batch.lock().is_empty());
// Ensure that the database atomic batch is empty.
assert!(self.database.atomic_batch.lock().is_empty());
// Ensure that the database atomic batch is empty; skip this check if the atomic
// writes are paused, as there may be pending operations.
if !self.database.are_atomic_writes_paused() {
assert!(self.database.atomic_batch.lock().is_empty());
}
}

///
Expand Down Expand Up @@ -217,8 +220,9 @@ impl<
assert!(previous_atomic_depth != 0);

// If we're at depth 0, it is the final call to `finish_atomic` and the
// atomic write batch can be physically executed.
if previous_atomic_depth == 1 {
// atomic write batch can be physically executed. This is skipped if the
// atomic writes are paused.
if previous_atomic_depth == 1 && !self.database.are_atomic_writes_paused() {
// Empty the collection of pending operations.
let batch = mem::take(&mut *self.database.atomic_batch.lock());
// Execute all the operations atomically.
Expand All @@ -229,6 +233,23 @@ impl<

Ok(())
}

///
/// Once called, the subsequent atomic write batches will be queued instead of being executed
/// at the end of their scope. `unpause_atomic_writes` needs to be called in order to
/// restore the usual behavior.
///
/// The pause applies to the entire database, so this delegates to the shared
/// `RocksDB` handle rather than acting on this map alone.
///
fn pause_atomic_writes(&self) -> Result<()> {
self.database.pause_atomic_writes()
}

///
/// Executes all of the queued writes as a single atomic operation and restores the usual
/// behavior of atomic write batches that was altered by calling `pause_atomic_writes`.
///
/// The pause applies to the entire database, so this delegates to the shared
/// `RocksDB` handle rather than acting on this map alone.
///
fn unpause_atomic_writes<const DISCARD_BATCH: bool>(&self) -> Result<()> {
self.database.unpause_atomic_writes::<DISCARD_BATCH>()
}
}

impl<
Expand Down
59 changes: 57 additions & 2 deletions ledger/store/src/helpers/rocksdb/internal/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,16 +25,17 @@ pub use nested_map::*;
mod tests;

use aleo_std_storage::StorageMode;
use anyhow::{bail, Result};
use anyhow::{bail, ensure, Result};
use once_cell::sync::OnceCell;
use parking_lot::Mutex;
use serde::{de::DeserializeOwned, Serialize};
use std::{
borrow::Borrow,
marker::PhantomData,
mem,
ops::Deref,
sync::{
atomic::{AtomicBool, AtomicUsize},
atomic::{AtomicBool, AtomicUsize, Ordering},
Arc,
},
};
Expand Down Expand Up @@ -88,6 +89,8 @@ pub struct RocksDB {
/// The depth of the current atomic write batch; it gets incremented with every call
/// to `start_atomic` and decremented with each call to `finish_atomic`.
pub(super) atomic_depth: Arc<AtomicUsize>,
/// A flag indicating whether the atomic writes are currently paused.
pub(super) atomic_writes_paused: Arc<AtomicBool>,
}

impl Deref for RocksDB {
Expand Down Expand Up @@ -132,6 +135,7 @@ impl Database for RocksDB {
storage_mode: storage.clone().into(),
atomic_batch: Default::default(),
atomic_depth: Default::default(),
atomic_writes_paused: Default::default(),
})
})?
.clone();
Expand Down Expand Up @@ -202,6 +206,56 @@ impl Database for RocksDB {
}

impl RocksDB {
/// Pause the execution of atomic writes for the entire database.
fn pause_atomic_writes(&self) -> Result<()> {
// This operation is only intended to be performed before or after
// atomic batches - never in the middle of them.
assert_eq!(self.atomic_depth.load(Ordering::SeqCst), 0);

// Set the flag indicating that the pause is in effect.
let already_paused = self.atomic_writes_paused.swap(true, Ordering::SeqCst);
// Make sure that we haven't already paused atomic writes (which would
// indicate a logic bug).
assert!(!already_paused);

Ok(())
}

/// Unpause the execution of atomic writes for the entire database; this
/// executes all the writes that have been queued since they were paused,
/// unless `DISCARD_BATCH` is set, in which case they are dropped.
fn unpause_atomic_writes<const DISCARD_BATCH: bool>(&self) -> Result<()> {
    // Unpausing is only valid before or after an atomic batch scope - never in
    // the middle of one (otherwise there is a fundamental logic bug).
    // Note: In production, this `ensure` is a safety-critical invariant that never fails.
    ensure!(self.atomic_depth.load(Ordering::SeqCst) == 0, "Atomic depth must be 0 to unpause atomic writes");

    // https://github.com/rust-lang/rust/issues/98485
    let is_paused = self.atomic_writes_paused.load(Ordering::SeqCst);
    // The writes must actually be paused (otherwise there is a fundamental logic bug).
    // Note: In production, this `ensure` is a safety-critical invariant that never fails.
    ensure!(is_paused, "Atomic writes must be paused to unpause them");

    // Drain every operation queued up since the pause. Executing them as one
    // batch keeps each macro-based atomic scope that accumulated during the
    // pause atomic - it just becomes part of a single, larger batch.
    let queued_writes = mem::take(&mut *self.atomic_batch.lock());
    match DISCARD_BATCH {
        // Discard the queued operations without touching the database.
        true => drop(queued_writes),
        // Commit all the queued operations as a single atomic batch.
        false => self.rocksdb.write(queued_writes)?,
    }

    // Clear the pause flag, restoring the usual atomic-write behavior.
    self.atomic_writes_paused.store(false, Ordering::SeqCst);

    Ok(())
}

/// Checks whether the atomic writes are currently paused, i.e. whether
/// `pause_atomic_writes` has been called without a matching unpause.
fn are_atomic_writes_paused(&self) -> bool {
self.atomic_writes_paused.load(Ordering::SeqCst)
}

/// Opens the test database.
#[cfg(any(test, feature = "test"))]
pub fn open_testing(temp_dir: std::path::PathBuf, dev: Option<u16>) -> Result<Self> {
Expand Down Expand Up @@ -254,6 +308,7 @@ impl RocksDB {
storage_mode: storage_mode.clone(),
atomic_batch: Default::default(),
atomic_depth: Default::default(),
atomic_writes_paused: Default::default(),
})
}?;

Expand Down
12 changes: 8 additions & 4 deletions ledger/store/src/helpers/rocksdb/internal/nested_map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -201,8 +201,11 @@ impl<

// Ensure that the atomic batch is empty.
assert!(self.atomic_batch.lock().is_empty());
// Ensure that the database atomic batch is empty.
assert!(self.database.atomic_batch.lock().is_empty());
// Ensure that the database atomic batch is empty; skip this check if the atomic
// writes are paused, as there may be pending operations.
if !self.database.are_atomic_writes_paused() {
assert!(self.database.atomic_batch.lock().is_empty());
}
}

///
Expand Down Expand Up @@ -323,8 +326,9 @@ impl<
assert!(previous_atomic_depth != 0);

// If we're at depth 0, it is the final call to `finish_atomic` and the
// atomic write batch can be physically executed.
if previous_atomic_depth == 1 {
// atomic write batch can be physically executed. This is skipped if the
// atomic writes are paused.
if previous_atomic_depth == 1 && !self.database.are_atomic_writes_paused() {
// Empty the collection of pending operations.
let batch = mem::take(&mut *self.database.atomic_batch.lock());
// Execute all the operations atomically.
Expand Down
13 changes: 13 additions & 0 deletions ledger/store/src/helpers/traits/map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,19 @@ pub trait Map<
/// Finishes an atomic operation, performing all the queued writes.
///
fn finish_atomic(&self) -> Result<()>;

///
/// Once called, the subsequent atomic write batches will be queued instead of being executed
/// at the end of their scope. `unpause_atomic_writes` needs to be called in order to
/// restore the usual behavior.
///
/// Note: backends without batch queueing may implement this as a no-op.
///
fn pause_atomic_writes(&self) -> Result<()>;

///
/// Executes all of the queued writes as a single atomic operation and restores the usual
/// behavior of atomic write batches that was altered by calling `pause_atomic_writes`.
/// When `DISCARD_BATCH` is set, the queued writes are discarded instead of executed.
///
fn unpause_atomic_writes<const DISCARD_BATCH: bool>(&self) -> Result<()>;
}

/// A trait representing map-like storage operations with read-only capabilities.
Expand Down
1 change: 1 addition & 0 deletions synthesizer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ snark = [ "synthesizer-snark" ]
aleo-cli = [ ]
async = [ "ledger-query/async", "synthesizer-process/async" ]
cuda = [ "algorithms/cuda" ]
rocks = [ "ledger-store/rocks" ]
serial = [
"console/serial",
"ledger-block/serial",
Expand Down
43 changes: 33 additions & 10 deletions synthesizer/src/vm/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -344,22 +344,45 @@ impl<N: Network, C: ConsensusStorage<N>> VM<N, C> {
block.previous_hash(),
)?;

// Attention: The following order is crucial because if 'finalize' fails, we can rollback the block.
// If one first calls 'finalize', then calls 'insert(block)' and it fails, there is no way to rollback 'finalize'.
// Pause the atomic writes, so that both the insertion and finalization belong to a single batch.
#[cfg(feature = "rocks")]
self.block_store().pause_atomic_writes()?;

// First, insert the block.
self.block_store().insert(block)?;
// Next, finalize the transactions.
match self.finalize(state, block.ratifications(), block.solutions(), block.transactions()) {
Ok(_ratified_finalize_operations) => Ok(()),
Ok(_ratified_finalize_operations) => {
// Unpause the atomic writes, executing the ones queued from block insertion and finalization.
#[cfg(feature = "rocks")]
self.block_store().unpause_atomic_writes::<false>()?;
Ok(())
}
Err(finalize_error) => {
// Rollback the block.
self.block_store().remove_last_n(1).map_err(|removal_error| {
// Log the finalize error.
error!("Failed to finalize block {} - {finalize_error}", block.height());
// Return the removal error.
removal_error
})?;
if cfg!(feature = "rocks") {
// Clear all pending atomic operations so that unpausing the atomic writes
// doesn't execute any of the queued storage operations.
self.block_store().abort_atomic();
self.finalize_store().abort_atomic();
// Disable the atomic batch override.
// Note: This call is guaranteed to succeed (without error), because `DISCARD_BATCH == true`.
self.block_store().unpause_atomic_writes::<true>()?;
// Rollback the Merkle tree.
self.block_store().remove_last_n_from_tree_only(1).map_err(|removal_error| {
// Log the finalize error.
error!("Failed to finalize block {} - {finalize_error}", block.height());
// Return the removal error.
removal_error
})?;
} else {
// Rollback the block.
self.block_store().remove_last_n(1).map_err(|removal_error| {
// Log the finalize error.
error!("Failed to finalize block {} - {finalize_error}", block.height());
// Return the removal error.
removal_error
})?;
}
// Return the finalize error.
Err(finalize_error)
}
Expand Down

0 comments on commit d23daa8

Please sign in to comment.