From 19a0bd0e1630be6263c2f227de5a11fcb79e2e3f Mon Sep 17 00:00:00 2001
From: ljedrz
Date: Wed, 27 Dec 2023 15:47:27 +0100
Subject: [PATCH 01/10] feat: introduce an override for atomic batch operations

Signed-off-by: ljedrz
---
 ledger/store/src/block/mod.rs                 | 10 +++++++
 .../store/src/helpers/memory/internal/map.rs  | 12 +++++++++
 .../store/src/helpers/rocksdb/internal/map.rs | 25 ++++++++++++++---
 .../store/src/helpers/rocksdb/internal/mod.rs | 27 ++++++++++++++++++-
 .../helpers/rocksdb/internal/nested_map.rs    | 13 ++++++---
 ledger/store/src/helpers/traits/map.rs        | 10 +++++++
 synthesizer/src/vm/mod.rs                     | 17 +++++++++++-
 7 files changed, 104 insertions(+), 10 deletions(-)

diff --git a/ledger/store/src/block/mod.rs b/ledger/store/src/block/mod.rs
index 4ffb292fec..465ea52813 100644
--- a/ledger/store/src/block/mod.rs
+++ b/ledger/store/src/block/mod.rs
@@ -392,6 +392,11 @@ pub trait BlockStorage<N: Network>: 'static + Clone + Send + Sync {
         self.transaction_store().finish_atomic()
     }
 
+    /// Either enables or disables the atomic override.
+    fn flip_atomic_override(&self) -> Result<bool> {
+        self.state_root_map().flip_atomic_override()
+    }
+
     /// Stores the given `(state root, block)` pair into storage.
     fn insert(&self, state_root: N::StateRoot, block: &Block<N>) -> Result<()> {
         // Prepare the confirmed transactions.
@@ -1147,6 +1152,11 @@ impl<N: Network, B: BlockStorage<N>> BlockStore<N, B> {
     pub fn dev(&self) -> Option<u16> {
         self.storage.dev()
     }
+
+    /// Either enables or disables the atomic override.
+    pub fn flip_atomic_override(&self) -> Result<bool> {
+        self.storage.flip_atomic_override()
+    }
 }
 
 impl<N: Network, B: BlockStorage<N>> BlockStore<N, B> {
diff --git a/ledger/store/src/helpers/memory/internal/map.rs b/ledger/store/src/helpers/memory/internal/map.rs
index 6c0e838db0..e8c88aac87 100644
--- a/ledger/store/src/helpers/memory/internal/map.rs
+++ b/ledger/store/src/helpers/memory/internal/map.rs
@@ -228,6 +228,18 @@ impl<
 
         Ok(())
     }
+
+    ///
+    /// The atomic override can be used to merge disjoint atomic write batches.
+    /// When enabled, subsequent atomic write batches no longer automatically
+    /// perform a write at the end of their scope; instead, they only extend the
+    /// pending write batch until `flip_atomic_override` is called again.
+    /// The returned boolean indicates the new state of the override (`true`
+    /// means it is now enabled, `false` that it is now disabled).
+    ///
+    fn flip_atomic_override(&self) -> Result<bool> {
+        Ok(false)
+    }
 }
 
 impl<
diff --git a/ledger/store/src/helpers/rocksdb/internal/map.rs b/ledger/store/src/helpers/rocksdb/internal/map.rs
index 2dc677ad07..bb62bcc0e7 100644
--- a/ledger/store/src/helpers/rocksdb/internal/map.rs
+++ b/ledger/store/src/helpers/rocksdb/internal/map.rs
@@ -107,8 +107,11 @@ impl<
 
         // Ensure that the atomic batch is empty.
         assert!(self.atomic_batch.lock().is_empty());
-        // Ensure that the database atomic batch is empty.
-        assert!(self.database.atomic_batch.lock().is_empty());
+        // Ensure that the database atomic batch is empty; skip this check if the atomic
+        // override is enabled, as there may be pending storage operations.
+        if !self.database.atomic_override.load(Ordering::SeqCst) {
+            assert!(self.database.atomic_batch.lock().is_empty());
+        }
     }
 
     ///
@@ -217,8 +220,10 @@ impl<
         assert!(previous_atomic_depth != 0);
 
         // If we're at depth 0, it is the final call to `finish_atomic` and the
-        // atomic write batch can be physically executed.
-        if previous_atomic_depth == 1 {
+        // atomic write batch can be physically executed. This is skipped if the
+        // atomic override is in force, as the pending operations are executed
+        // when it is disabled.
+        if previous_atomic_depth == 1 && !self.database.atomic_override.load(Ordering::SeqCst) {
             // Empty the collection of pending operations.
             let batch = mem::take(&mut *self.database.atomic_batch.lock());
             // Execute all the operations atomically.
@@ -229,6 +234,18 @@ impl<
 
         Ok(())
     }
+
+    ///
+    /// The atomic override can be used to merge disjoint atomic write batches.
+    /// When enabled, subsequent atomic write batches no longer automatically
+    /// perform a write at the end of their scope; instead, they only extend the
+    /// pending write batch until `flip_atomic_override` is called again.
+    /// The returned boolean indicates the new state of the override (`true`
+    /// means it is now enabled, `false` that it is now disabled).
+    ///
+    fn flip_atomic_override(&self) -> Result<bool> {
+        self.database.flip_atomic_override()
+    }
 }
 
 impl<
diff --git a/ledger/store/src/helpers/rocksdb/internal/mod.rs b/ledger/store/src/helpers/rocksdb/internal/mod.rs
index 80dcfdec06..51ad713659 100644
--- a/ledger/store/src/helpers/rocksdb/internal/mod.rs
+++ b/ledger/store/src/helpers/rocksdb/internal/mod.rs
@@ -31,9 +31,10 @@ use serde::{de::DeserializeOwned, Serialize};
 use std::{
     borrow::Borrow,
     marker::PhantomData,
+    mem,
     ops::Deref,
     sync::{
-        atomic::{AtomicBool, AtomicUsize},
+        atomic::{AtomicBool, AtomicUsize, Ordering},
         Arc,
     },
 };
@@ -81,6 +82,8 @@ pub struct RocksDB {
     /// The depth of the current atomic write batch; it gets incremented with every call
     /// to `start_atomic` and decremented with each call to `finish_atomic`.
     pub(super) atomic_depth: Arc<AtomicUsize>,
+    /// A flag indicating whether the atomic override is currently in effect.
+    pub(super) atomic_override: Arc<AtomicBool>,
 }
 
 impl Deref for RocksDB {
@@ -125,6 +128,7 @@ impl Database for RocksDB {
                     dev,
                     atomic_batch: Default::default(),
                     atomic_depth: Default::default(),
+                    atomic_override: Default::default(),
                 })
             })?
             .clone();
@@ -189,6 +193,26 @@ impl Database for RocksDB {
 }
 
 impl RocksDB {
+    /// Toggles the atomic override; if it becomes disabled, the pending
+    /// operations get executed.
+    fn flip_atomic_override(&self) -> Result<bool> {
+        // https://github.com/rust-lang/rust/issues/98485
+        let previous_value = self.atomic_override.load(Ordering::SeqCst);
+
+        // A flip from enabled to disabled executes all pending operations
+        // as a single atomic batch.
+        if previous_value {
+            let batch = mem::take(&mut *self.atomic_batch.lock());
+            self.rocksdb.write(batch)?;
+        }
+
+        // Flip the flag.
+        self.atomic_override.store(!previous_value, Ordering::SeqCst);
+
+        // Return the new value of the flag.
+        Ok(!previous_value)
+    }
+
     /// Opens the test database.
     #[cfg(any(test, feature = "test"))]
     pub fn open_testing(temp_dir: std::path::PathBuf, dev: Option<u16>) -> Result<Self> {
@@ -238,6 +262,7 @@ impl RocksDB {
                 dev,
                 atomic_batch: Default::default(),
                 atomic_depth: Default::default(),
+                atomic_override: Default::default(),
             })
         }?;
 
diff --git a/ledger/store/src/helpers/rocksdb/internal/nested_map.rs b/ledger/store/src/helpers/rocksdb/internal/nested_map.rs
index 0634e95a7b..37fcdd7334 100644
--- a/ledger/store/src/helpers/rocksdb/internal/nested_map.rs
+++ b/ledger/store/src/helpers/rocksdb/internal/nested_map.rs
@@ -198,8 +198,11 @@ impl<
 
         // Ensure that the atomic batch is empty.
         assert!(self.atomic_batch.lock().is_empty());
-        // Ensure that the database atomic batch is empty.
-        assert!(self.database.atomic_batch.lock().is_empty());
+        // Ensure that the database atomic batch is empty; skip this check if the atomic
+        // override is enabled, as there may be pending storage operations.
+        if !self.database.atomic_override.load(Ordering::SeqCst) {
+            assert!(self.database.atomic_batch.lock().is_empty());
+        }
     }
 
     ///
@@ -320,8 +323,10 @@ impl<
         assert!(previous_atomic_depth != 0);
 
         // If we're at depth 0, it is the final call to `finish_atomic` and the
-        // atomic write batch can be physically executed.
-        if previous_atomic_depth == 1 {
+        // atomic write batch can be physically executed. This is skipped if the
+        // atomic override is in force, as the pending operations are executed
+        // when it is disabled.
+        if previous_atomic_depth == 1 && !self.database.atomic_override.load(Ordering::SeqCst) {
             // Empty the collection of pending operations.
             let batch = mem::take(&mut *self.database.atomic_batch.lock());
             // Execute all the operations atomically.
diff --git a/ledger/store/src/helpers/traits/map.rs b/ledger/store/src/helpers/traits/map.rs
index 3f3c15d298..a1bbd1876c 100644
--- a/ledger/store/src/helpers/traits/map.rs
+++ b/ledger/store/src/helpers/traits/map.rs
@@ -73,6 +73,16 @@ pub trait Map<
     /// Finishes an atomic operation, performing all the queued writes.
     ///
     fn finish_atomic(&self) -> Result<()>;
+
+    ///
+    /// The atomic override can be used to merge disjoint atomic write batches.
+    /// When enabled, subsequent atomic write batches no longer automatically
+    /// perform a write at the end of their scope; instead, they only extend the
+    /// pending write batch until `flip_atomic_override` is called again.
+    /// The returned boolean indicates the new state of the override (`true`
+    /// means it is now enabled, `false` that it is now disabled).
+    ///
+    fn flip_atomic_override(&self) -> Result<bool>;
 }
 
 /// A trait representing map-like storage operations with read-only capabilities.
diff --git a/synthesizer/src/vm/mod.rs b/synthesizer/src/vm/mod.rs
index a9e29d1594..1a97d9cd0a 100644
--- a/synthesizer/src/vm/mod.rs
+++ b/synthesizer/src/vm/mod.rs
@@ -335,12 +335,27 @@ impl<N: Network, C: ConsensusStorage<N>> VM<N, C> {
         // Attention: The following order is crucial because if 'finalize' fails, we can rollback the block.
         // If one first calls 'finalize', then calls 'insert(block)' and it fails, there is no way to rollback 'finalize'.
 
+        // Enable the atomic batch override, so that both the insertion and finalization belong to a single batch.
+        #[cfg(feature = "rocks")]
+        assert!(self.block_store().flip_atomic_override()?);
+
         // First, insert the block.
         self.block_store().insert(block)?;
 
         // Next, finalize the transactions.
         match self.finalize(state, block.ratifications(), block.solutions(), block.transactions()) {
-            Ok(_ratified_finalize_operations) => Ok(()),
+            Ok(_ratified_finalize_operations) => {
+                // Disable the atomic batch override, executing the merged batch.
+                #[cfg(feature = "rocks")]
+                assert!(!self.block_store().flip_atomic_override()?);
+                Ok(())
+            }
             Err(finalize_error) => {
+                // Rewind the pending operations related to block insertion.
+                #[cfg(feature = "rocks")]
+                self.block_store().atomic_rewind();
+                // Disable the atomic batch override.
+                #[cfg(feature = "rocks")]
+                assert!(!self.block_store().flip_atomic_override()?);
                 // Rollback the block.
                 self.block_store().remove_last_n(1).map_err(|removal_error| {
                     // Log the finalize error.
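
To make the mechanism in PATCH 01 concrete, here is a minimal, self-contained Rust model of the override. It is a sketch only: `ToyStore`, `queue`, `pending`, and `committed` are illustrative stand-ins, not the snarkVM types — the real logic lives in `RocksDB::flip_atomic_override` above.

```rust
use std::{
    mem,
    sync::{
        atomic::{AtomicBool, Ordering},
        Mutex,
    },
};

// Toy model: batches normally flush at the end of their scope, but while the
// override is enabled they only extend one shared pending batch.
#[derive(Default)]
struct ToyStore {
    pending: Mutex<Vec<String>>,   // stands in for the RocksDB write batch
    committed: Mutex<Vec<String>>, // stands in for the on-disk state
    atomic_override: AtomicBool,
}

impl ToyStore {
    fn queue(&self, op: &str) {
        self.pending.lock().unwrap().push(op.to_string());
    }

    // Normally flushes at the end of an atomic scope; a no-op while the
    // override is enabled, mirroring the `finish_atomic` change above.
    fn finish_atomic(&self) {
        if !self.atomic_override.load(Ordering::SeqCst) {
            let batch = mem::take(&mut *self.pending.lock().unwrap());
            self.committed.lock().unwrap().extend(batch);
        }
    }

    // Toggles the override; disabling it flushes everything queued since it
    // was enabled as a single batch, mirroring `flip_atomic_override`.
    fn flip_atomic_override(&self) -> bool {
        let previous = self.atomic_override.load(Ordering::SeqCst);
        if previous {
            let batch = mem::take(&mut *self.pending.lock().unwrap());
            self.committed.lock().unwrap().extend(batch);
        }
        self.atomic_override.store(!previous, Ordering::SeqCst);
        !previous
    }
}

fn main() {
    let store = ToyStore::default();
    assert!(store.flip_atomic_override()); // enable: batches are now merged
    store.queue("insert block");
    store.finish_atomic(); // queued, not committed
    store.queue("finalize transactions");
    store.finish_atomic(); // still queued
    assert!(store.committed.lock().unwrap().is_empty());
    assert!(!store.flip_atomic_override()); // disable: one merged flush
    assert_eq!(store.committed.lock().unwrap().len(), 2);
}
```

This is exactly the shape the VM change relies on: two formerly disjoint batches (block insertion and finalization) commit together or not at all.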
From adf3aada8a1359d1d3adcd1a4daa66d027785d9a Mon Sep 17 00:00:00 2001
From: ljedrz
Date: Fri, 19 Jan 2024 10:17:03 +0100
Subject: [PATCH 02/10] tweak: disallow atomic override flips in the middle of
 batches

Signed-off-by: ljedrz
---
 ledger/store/src/helpers/rocksdb/internal/mod.rs | 4 ++++
 1 file changed, 4 insertions(+)

diff --git a/ledger/store/src/helpers/rocksdb/internal/mod.rs b/ledger/store/src/helpers/rocksdb/internal/mod.rs
index 51ad713659..ff215bf5d2 100644
--- a/ledger/store/src/helpers/rocksdb/internal/mod.rs
+++ b/ledger/store/src/helpers/rocksdb/internal/mod.rs
@@ -196,6 +196,10 @@ impl RocksDB {
     /// Toggles the atomic override; if it becomes disabled, the pending
     /// operations get executed.
     fn flip_atomic_override(&self) -> Result<bool> {
+        // The override is only intended to be toggled before or after
+        // atomic batches - never in the middle of them.
+        assert_eq!(self.atomic_depth.load(Ordering::SeqCst), 0);
+
         // https://github.com/rust-lang/rust/issues/98485
         let previous_value = self.atomic_override.load(Ordering::SeqCst);
 

From ff761aeb0402e6d4c01f15cbbf9f16f4c1af3acc Mon Sep 17 00:00:00 2001
From: ljedrz
Date: Sat, 20 Jan 2024 11:11:13 +0100
Subject: [PATCH 03/10] docs: improve atomic batch override notes

Signed-off-by: ljedrz
---
 ledger/store/src/block/mod.rs                    | 4 ++++
 ledger/store/src/helpers/rocksdb/internal/mod.rs | 8 +++++++-
 2 files changed, 11 insertions(+), 1 deletion(-)

diff --git a/ledger/store/src/block/mod.rs b/ledger/store/src/block/mod.rs
index 465ea52813..e97bd93073 100644
--- a/ledger/store/src/block/mod.rs
+++ b/ledger/store/src/block/mod.rs
@@ -394,6 +394,8 @@ pub trait BlockStorage<N: Network>: 'static + Clone + Send + Sync {
 
     /// Either enables or disables the atomic override.
     fn flip_atomic_override(&self) -> Result<bool> {
+        // Since the atomic override applies to the entire storage, any map
+        // can be used to toggle it.
         self.state_root_map().flip_atomic_override()
     }
 
@@ -1155,6 +1157,8 @@ impl<N: Network, B: BlockStorage<N>> BlockStore<N, B> {
 
     /// Either enables or disables the atomic override.
     pub fn flip_atomic_override(&self) -> Result<bool> {
+        // Since the atomic override applies to the entire storage, any store
+        // can be used to toggle it.
         self.storage.flip_atomic_override()
     }
 }
diff --git a/ledger/store/src/helpers/rocksdb/internal/mod.rs b/ledger/store/src/helpers/rocksdb/internal/mod.rs
index ff215bf5d2..24b8572b64 100644
--- a/ledger/store/src/helpers/rocksdb/internal/mod.rs
+++ b/ledger/store/src/helpers/rocksdb/internal/mod.rs
@@ -204,9 +204,15 @@ impl RocksDB {
         let previous_value = self.atomic_override.load(Ordering::SeqCst);
 
         // A flip from enabled to disabled executes all pending operations
-        // as a single atomic batch.
+        // and makes the storage function in the usual fashion again.
         if previous_value {
             let batch = mem::take(&mut *self.atomic_batch.lock());
+            // In order to ensure that all the operations that are intended
+            // to be atomic via the usual macro approach are still performed
+            // atomically (just as a part of a larger batch), every atomic
+            // storage operation that has accumulated from the moment the
+            // override was enabled is executed as a single atomic batch
self.rocksdb.write(batch)?; } From 6bfa9a32fbee623f4c0f8dd450bde5742bced60b Mon Sep 17 00:00:00 2001 From: ljedrz Date: Sat, 20 Jan 2024 11:18:59 +0100 Subject: [PATCH 04/10] fix: adjust the storage logic in case of finalization error Signed-off-by: ljedrz --- synthesizer/src/vm/mod.rs | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/synthesizer/src/vm/mod.rs b/synthesizer/src/vm/mod.rs index 1a97d9cd0a..2f5b0e4d61 100644 --- a/synthesizer/src/vm/mod.rs +++ b/synthesizer/src/vm/mod.rs @@ -350,13 +350,19 @@ impl> VM { Ok(()) } Err(finalize_error) => { - // Rewind the pending operations related to block insertion. + // Clear all the pending atomic operations so that disabling the atomic batch override + // doesn't execute any storage operation that has accumulated since it was enabled. + // This only applies to persistent storage - the in-memory one needs to roll back the + // latest block. #[cfg(feature = "rocks")] - self.block_store().atomic_rewind(); + self.block_store().atomic_abort(); + #[cfg(feature = "rocks")] + self.finalize_store().atomic_abort(); // Disable the atomic batch override. #[cfg(feature = "rocks")] assert!(!self.block_store().flip_atomic_override()?); - // Rollback the block. + // Rollback the block (only applies to in-memory storage). + #[cfg(not(feature = "rocks"))] self.block_store().remove_last_n(1).map_err(|removal_error| { // Log the finalize error. error!("Failed to finalize block {} - {finalize_error}", block.height()); From 26318822dc88eb3387c524749d87cb95037badbf Mon Sep 17 00:00:00 2001 From: ljedrz Date: Mon, 12 Feb 2024 11:38:42 +0100 Subject: [PATCH 05/10] fix: call remove_last_n on finalization failure with persistent storage Signed-off-by: ljedrz --- synthesizer/src/vm/mod.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/synthesizer/src/vm/mod.rs b/synthesizer/src/vm/mod.rs index 2f5b0e4d61..ab3aa824c7 100644 --- a/synthesizer/src/vm/mod.rs +++ b/synthesizer/src/vm/mod.rs @@ -361,8 +361,7 @@ impl> VM { // Disable the atomic batch override. #[cfg(feature = "rocks")] assert!(!self.block_store().flip_atomic_override()?); - // Rollback the block (only applies to in-memory storage). - #[cfg(not(feature = "rocks"))] + // Rollback the block. self.block_store().remove_last_n(1).map_err(|removal_error| { // Log the finalize error. error!("Failed to finalize block {} - {finalize_error}", block.height()); From e2e207d37a84f5a203faacf5f9d7591824792fcb Mon Sep 17 00:00:00 2001 From: Howard Wu <9260812+howardwu@users.noreply.github.com> Date: Sun, 18 Feb 2024 14:14:05 -0800 Subject: [PATCH 06/10] nit: rm deprecated comment; add readability scope --- synthesizer/src/vm/mod.rs | 21 ++++++++------------- 1 file changed, 8 insertions(+), 13 deletions(-) diff --git a/synthesizer/src/vm/mod.rs b/synthesizer/src/vm/mod.rs index ab3aa824c7..a0c148561c 100644 --- a/synthesizer/src/vm/mod.rs +++ b/synthesizer/src/vm/mod.rs @@ -332,9 +332,6 @@ impl> VM { block.previous_hash(), )?; - // Attention: The following order is crucial because if 'finalize' fails, we can rollback the block. - // If one first calls 'finalize', then calls 'insert(block)' and it fails, there is no way to rollback 'finalize'. - // Enable the atomic batch override, so that both the insertion and finalization belong to a single batch. 
         #[cfg(feature = "rocks")]
         assert!(self.block_store().flip_atomic_override()?);
 
@@ -350,17 +347,15 @@ impl<N: Network, C: ConsensusStorage<N>> VM<N, C> {
                 Ok(())
             }
             Err(finalize_error) => {
-                // Clear all the pending atomic operations so that disabling the atomic batch override
-                // doesn't execute any storage operation that has accumulated since it was enabled.
-                // This only applies to persistent storage - the in-memory one needs to roll back the
-                // latest block.
-                #[cfg(feature = "rocks")]
-                self.block_store().abort_atomic();
-                #[cfg(feature = "rocks")]
-                self.finalize_store().abort_atomic();
-                // Disable the atomic batch override.
                 #[cfg(feature = "rocks")]
-                assert!(!self.block_store().flip_atomic_override()?);
+                {
+                    // Clear all pending atomic operations so that disabling the atomic batch override
+                    // doesn't execute any storage operation that accumulated after it was enabled.
+                    self.block_store().abort_atomic();
+                    self.finalize_store().abort_atomic();
+                    // Disable the atomic batch override.
+                    assert!(!self.block_store().flip_atomic_override()?);
+                }
                 // Rollback the block.
                 self.block_store().remove_last_n(1).map_err(|removal_error| {
                     // Log the finalize error.

From 14a113609d221751f2081afc8465eeca944d6d31 Mon Sep 17 00:00:00 2001
From: ljedrz
Date: Mon, 19 Feb 2024 11:00:38 +0100
Subject: [PATCH 07/10] refactor: isolate a method for the atomic override
 check

Signed-off-by: ljedrz
---
 ledger/store/src/helpers/rocksdb/internal/map.rs        | 4 ++--
 ledger/store/src/helpers/rocksdb/internal/mod.rs        | 5 +++++
 ledger/store/src/helpers/rocksdb/internal/nested_map.rs | 4 ++--
 3 files changed, 9 insertions(+), 4 deletions(-)

diff --git a/ledger/store/src/helpers/rocksdb/internal/map.rs b/ledger/store/src/helpers/rocksdb/internal/map.rs
index 5c6f029508..61dc6bc9d3 100644
--- a/ledger/store/src/helpers/rocksdb/internal/map.rs
+++ b/ledger/store/src/helpers/rocksdb/internal/map.rs
@@ -109,7 +109,7 @@ impl<
         assert!(self.atomic_batch.lock().is_empty());
         // Ensure that the database atomic batch is empty; skip this check if the atomic
         // override is enabled, as there may be pending storage operations.
-        if !self.database.atomic_override.load(Ordering::SeqCst) {
+        if !self.database.is_atomic_override_active() {
             assert!(self.database.atomic_batch.lock().is_empty());
         }
     }
@@ -223,7 +223,7 @@ impl<
         // atomic write batch can be physically executed. This is skipped if the
         // atomic override is in force, as the pending operations are executed
         // when it is disabled.
-        if previous_atomic_depth == 1 && !self.database.atomic_override.load(Ordering::SeqCst) {
+        if previous_atomic_depth == 1 && !self.database.is_atomic_override_active() {
             // Empty the collection of pending operations.
             let batch = mem::take(&mut *self.database.atomic_batch.lock());
             // Execute all the operations atomically.
diff --git a/ledger/store/src/helpers/rocksdb/internal/mod.rs b/ledger/store/src/helpers/rocksdb/internal/mod.rs
index 79da1fd92d..56d1a50a2b 100644
--- a/ledger/store/src/helpers/rocksdb/internal/mod.rs
+++ b/ledger/store/src/helpers/rocksdb/internal/mod.rs
@@ -236,6 +236,11 @@ impl RocksDB {
         Ok(!previous_value)
     }
 
+    /// Checks whether the atomic override is currently in force.
+    fn is_atomic_override_active(&self) -> bool {
+        self.atomic_override.load(Ordering::SeqCst)
+    }
+
     /// Opens the test database.
     #[cfg(any(test, feature = "test"))]
     pub fn open_testing(temp_dir: std::path::PathBuf, dev: Option<u16>) -> Result<Self> {
diff --git a/ledger/store/src/helpers/rocksdb/internal/nested_map.rs b/ledger/store/src/helpers/rocksdb/internal/nested_map.rs
index e6d27781d4..770c568b01 100644
--- a/ledger/store/src/helpers/rocksdb/internal/nested_map.rs
+++ b/ledger/store/src/helpers/rocksdb/internal/nested_map.rs
@@ -203,7 +203,7 @@ impl<
         assert!(self.atomic_batch.lock().is_empty());
         // Ensure that the database atomic batch is empty; skip this check if the atomic
         // override is enabled, as there may be pending storage operations.
-        if !self.database.atomic_override.load(Ordering::SeqCst) {
+        if !self.database.is_atomic_override_active() {
             assert!(self.database.atomic_batch.lock().is_empty());
         }
     }
@@ -329,7 +329,7 @@ impl<
         // atomic write batch can be physically executed. This is skipped if the
         // atomic override is in force, as the pending operations are executed
         // when it is disabled.
-        if previous_atomic_depth == 1 && !self.database.atomic_override.load(Ordering::SeqCst) {
+        if previous_atomic_depth == 1 && !self.database.is_atomic_override_active() {
             // Empty the collection of pending operations.
             let batch = mem::take(&mut *self.database.atomic_batch.lock());
             // Execute all the operations atomically.

From 1a27305b7e0acb2c23874bdf1d11ce2b527746e1 Mon Sep 17 00:00:00 2001
From: ljedrz
Date: Mon, 19 Feb 2024 11:56:59 +0100
Subject: [PATCH 08/10] fix: don't remove the last block from rocksdb on
 finalization failure

Signed-off-by: ljedrz
---
 ledger/store/src/block/mod.rs | 38 ++++++++++++++++++++++-------------
 synthesizer/src/vm/mod.rs     | 10 ++++++++-
 2 files changed, 33 insertions(+), 15 deletions(-)

diff --git a/ledger/store/src/block/mod.rs b/ledger/store/src/block/mod.rs
index 30f49e469c..2dea8cae73 100644
--- a/ledger/store/src/block/mod.rs
+++ b/ledger/store/src/block/mod.rs
@@ -1056,14 +1056,20 @@ impl<N: Network, B: BlockStorage<N>> BlockStore<N, B> {
         Ok(())
     }
 
-    /// Removes the last 'n' blocks from storage.
-    pub fn remove_last_n(&self, n: u32) -> Result<()> {
-        // Ensure 'n' is non-zero.
-        ensure!(n > 0, "Cannot remove zero blocks");
-
+    /// Reverts the Merkle tree to its shape before the insertion of the last 'n' blocks.
+    pub fn remove_last_n_from_tree(&self, n: u32) -> Result<()> {
         // Acquire the write lock on the block tree.
         let mut tree = self.tree.write();
+        // Prepare an updated Merkle tree removing the last 'n' block hashes.
+        let updated_tree = tree.prepare_remove_last_n(usize::try_from(n)?)?;
+        // Update the block tree.
+        *tree = updated_tree;
+        // Return success.
+        Ok(())
+    }
 
+    /// Reverts the block store to its shape before the insertion of the last 'n' blocks.
+    pub fn remove_last_n_from_store(&self, n: u32) -> Result<()> {
         // Determine the block heights to remove.
         let heights = match self.storage.id_map().keys_confirmed().max() {
             Some(height) => {
@@ -1074,7 +1080,7 @@ impl<N: Network, B: BlockStorage<N>> BlockStore<N, B> {
                     .checked_sub(n - 1)
                     .ok_or_else(|| anyhow!("Failed to remove last '{n}' blocks: block height underflow"))?;
                 // Ensure the block height matches the number of leaves in the Merkle tree.
-                ensure!(end_height == u32::try_from(tree.number_of_leaves())? - 1, "Block height mismatch");
+                ensure!(end_height == u32::try_from(self.tree.read().number_of_leaves())? - 1, "Block height mismatch");
                 // Output the block heights.
                 start_height..=end_height
             }
             None => bail!("Failed to remove last '{n}' blocks: no blocks in storage"),
         };
         // Fetch the block hashes to remove.
         let hashes = cfg_into_iter!(heights)
             .map(|height| match self.storage.get_block_hash(height)? {
                 Some(block_hash) => Ok(block_hash),
                 None => bail!("Failed to remove last '{n}' blocks: missing block {height}"),
             })
             .collect::<Result<Vec<_>>>()?;
 
-        // Prepare an updated Merkle tree removing the last 'n' block hashes.
-        let updated_tree = tree.prepare_remove_last_n(usize::try_from(n)?)?;
-
         atomic_batch_scope!(self, {
             // Remove the blocks, in descending order.
             for block_hash in hashes.iter().rev() {
                 self.storage.remove(block_hash)?;
             }
             Ok(())
-        })?;
+        })
+    }
 
-        // Update the block tree.
-        *tree = updated_tree;
-        // Return success.
-        Ok(())
+    /// Removes the last 'n' blocks from storage.
+    pub fn remove_last_n(&self, n: u32) -> Result<()> {
+        // Ensure 'n' is non-zero.
+        ensure!(n > 0, "Cannot remove zero blocks");
+
+        // Update the block store.
+        self.remove_last_n_from_store(n)?;
+
+        // Update the Merkle tree.
+        self.remove_last_n_from_tree(n)
     }
 
     /// Returns the transaction store.
diff --git a/synthesizer/src/vm/mod.rs b/synthesizer/src/vm/mod.rs
index 4e6d1d23e1..93b27ae3b2 100644
--- a/synthesizer/src/vm/mod.rs
+++ b/synthesizer/src/vm/mod.rs
@@ -369,7 +369,15 @@ impl<N: Network, C: ConsensusStorage<N>> VM<N, C> {
                     assert!(!self.block_store().flip_atomic_override()?);
                 }
                 // Rollback the block.
-                self.block_store().remove_last_n(1).map_err(|removal_error| {
+                #[cfg(not(feature = "rocks"))]
+                self.block_store().remove_last_n_from_store(1).map_err(|removal_error| {
+                    // Log the finalize error.
+                    error!("Failed to finalize block {} - {finalize_error}", block.height());
+                    // Return the removal error.
+                    removal_error
+                })?;
+                // Rollback the Merkle tree.
+                self.block_store().remove_last_n_from_tree(1).map_err(|removal_error| {
                     // Log the finalize error.
                     error!("Failed to finalize block {} - {finalize_error}", block.height());
                     // Return the removal error.
                     removal_error

From f656d9f26206391869d4775c23d9ef43e7ef3870 Mon Sep 17 00:00:00 2001
From: ljedrz
Date: Wed, 21 Feb 2024 12:23:54 +0100
Subject: [PATCH 09/10] refactor: split flip_atomic_override into
 (un)pause_atomic_writes

Signed-off-by: ljedrz
---
 Cargo.toml                                       |  2 +-
 ledger/store/src/block/mod.rs                    | 30 +++++---
 .../store/src/helpers/memory/internal/map.rs     | 25 ++++---
 .../store/src/helpers/rocksdb/internal/map.rs    | 30 ++++----
 .../store/src/helpers/rocksdb/internal/mod.rs    | 72 +++++++++++--------
 .../helpers/rocksdb/internal/nested_map.rs       |  9 ++-
 ledger/store/src/helpers/traits/map.rs           | 19 ++---
 synthesizer/Cargo.toml                           |  1 +
 synthesizer/src/vm/mod.rs                        | 18 ++---
 9 files changed, 121 insertions(+), 85 deletions(-)

diff --git a/Cargo.toml b/Cargo.toml
index 4726092fa2..adf37e8061 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -132,7 +132,7 @@ async = [ "snarkvm-ledger/async", "snarkvm-synthesizer/async" ]
 cuda = [ "snarkvm-algorithms/cuda" ]
 parameters_no_std_out = [ "snarkvm-parameters/no_std_out" ]
 noconfig = [ ]
-rocks = [ "snarkvm-ledger/rocks" ]
+rocks = [ "snarkvm-ledger/rocks", "snarkvm-synthesizer/rocks" ]
 test = [ "snarkvm-ledger/test" ]
 test-helpers = [ "snarkvm-ledger/test-helpers" ]
 timer = [ "snarkvm-ledger/timer" ]
diff --git a/ledger/store/src/block/mod.rs b/ledger/store/src/block/mod.rs
index 2dea8cae73..0fc27e455c 100644
--- a/ledger/store/src/block/mod.rs
+++ b/ledger/store/src/block/mod.rs
@@ -393,11 +393,18 @@ pub trait BlockStorage<N: Network>: 'static + Clone + Send + Sync {
         self.transaction_store().finish_atomic()
     }
 
-    /// Either enables or disables the atomic override.
-    fn flip_atomic_override(&self) -> Result<bool> {
-        // Since the atomic override applies to the entire storage, any map
-        // can be used to toggle it.
-        self.state_root_map().flip_atomic_override()
+    /// Pauses atomic writes.
+    fn pause_atomic_writes(&self) -> Result<()> {
+        // Since this applies to the entire storage, any map can be used; this
+        // one is just the first one in the list.
+        self.state_root_map().pause_atomic_writes()
+    }
+
+    /// Unpauses atomic writes.
+    fn unpause_atomic_writes(&self) -> Result<()> {
+        // Since this applies to the entire storage, any map can be used; this
+        // one is just the first one in the list.
+        self.state_root_map().unpause_atomic_writes()
     }
 
     /// Stores the given `(state root, block)` pair into storage.
@@ -1166,11 +1173,14 @@ impl<N: Network, B: BlockStorage<N>> BlockStore<N, B> {
         self.storage.storage_mode()
     }
 
-    /// Either enables or disables the atomic override.
-    pub fn flip_atomic_override(&self) -> Result<bool> {
-        // Since the atomic override applies to the entire storage, any store
-        // can be used to toggle it.
-        self.storage.flip_atomic_override()
+    /// Pauses atomic writes.
+    pub fn pause_atomic_writes(&self) -> Result<()> {
+        self.storage.pause_atomic_writes()
+    }
+
+    /// Unpauses atomic writes.
+    pub fn unpause_atomic_writes(&self) -> Result<()> {
+        self.storage.unpause_atomic_writes()
     }
 }
 
diff --git a/ledger/store/src/helpers/memory/internal/map.rs b/ledger/store/src/helpers/memory/internal/map.rs
index 48adfd73f2..ebed4b84f8 100644
--- a/ledger/store/src/helpers/memory/internal/map.rs
+++ b/ledger/store/src/helpers/memory/internal/map.rs
@@ -230,15 +230,22 @@ impl<
     }
 
     ///
-    /// The atomic override can be used to merge disjoint atomic write batches.
-    /// When enabled, subsequent atomic write batches no longer automatically
-    /// perform a write at the end of their scope; instead, they only extend the
-    /// pending write batch until `flip_atomic_override` is called again.
-    /// The returned boolean indicates the new state of the override (`true`
-    /// means it is now enabled, `false` that it is now disabled).
+    /// Once called, subsequent atomic write batches will be queued instead of being executed
+    /// at the end of their scope. `unpause_atomic_writes` needs to be called in order to
+    /// restore the usual behavior.
     ///
-    fn flip_atomic_override(&self) -> Result<bool> {
-        Ok(false)
+    fn pause_atomic_writes(&self) -> Result<()> {
+        // No effect.
+        Ok(())
+    }
+
+    ///
+    /// Executes all of the queued writes as a single atomic operation and restores the usual
+    /// behavior of atomic write batches that was altered by calling `pause_atomic_writes`.
+    ///
+    fn unpause_atomic_writes(&self) -> Result<()> {
+        // No effect.
+        Ok(())
     }
 }
 
diff --git a/ledger/store/src/helpers/rocksdb/internal/map.rs b/ledger/store/src/helpers/rocksdb/internal/map.rs
index 61dc6bc9d3..613d677316 100644
--- a/ledger/store/src/helpers/rocksdb/internal/map.rs
+++ b/ledger/store/src/helpers/rocksdb/internal/map.rs
@@ -108,8 +108,8 @@ impl<
         // Ensure that the atomic batch is empty.
         assert!(self.atomic_batch.lock().is_empty());
         // Ensure that the database atomic batch is empty; skip this check if the atomic
-        // override is enabled, as there may be pending storage operations.
-        if !self.database.is_atomic_override_active() {
+        // writes are paused, as there may be pending operations.
+        if !self.database.are_atomic_writes_paused() {
             assert!(self.database.atomic_batch.lock().is_empty());
         }
     }
@@ -221,9 +221,8 @@ impl<
 
         // If we're at depth 0, it is the final call to `finish_atomic` and the
         // atomic write batch can be physically executed. This is skipped if the
-        // atomic override is in force, as the pending operations are executed
-        // when it is disabled.
-        if previous_atomic_depth == 1 && !self.database.is_atomic_override_active() {
+        // atomic writes are paused.
+        if previous_atomic_depth == 1 && !self.database.are_atomic_writes_paused() {
             // Empty the collection of pending operations.
             let batch = mem::take(&mut *self.database.atomic_batch.lock());
             // Execute all the operations atomically.
@@ -236,15 +235,20 @@ impl<
     }
 
     ///
-    /// The atomic override can be used to merge disjoint atomic write batches.
-    /// When enabled, subsequent atomic write batches no longer automatically
-    /// perform a write at the end of their scope; instead, they only extend the
-    /// pending write batch until `flip_atomic_override` is called again.
-    /// The returned boolean indicates the new state of the override (`true`
-    /// means it is now enabled, `false` that it is now disabled).
+    /// Once called, subsequent atomic write batches will be queued instead of being executed
+    /// at the end of their scope. `unpause_atomic_writes` needs to be called in order to
+    /// restore the usual behavior.
     ///
-    fn flip_atomic_override(&self) -> Result<bool> {
-        self.database.flip_atomic_override()
+    fn pause_atomic_writes(&self) -> Result<()> {
+        self.database.pause_atomic_writes()
+    }
+
+    ///
+    /// Executes all of the queued writes as a single atomic operation and restores the usual
+    /// behavior of atomic write batches that was altered by calling `pause_atomic_writes`.
+    ///
+    fn unpause_atomic_writes(&self) -> Result<()> {
+        self.database.unpause_atomic_writes()
     }
 }
 
diff --git a/ledger/store/src/helpers/rocksdb/internal/mod.rs b/ledger/store/src/helpers/rocksdb/internal/mod.rs
index 56d1a50a2b..c33fb0ec9f 100644
--- a/ledger/store/src/helpers/rocksdb/internal/mod.rs
+++ b/ledger/store/src/helpers/rocksdb/internal/mod.rs
@@ -89,8 +89,8 @@ pub struct RocksDB {
     /// The depth of the current atomic write batch; it gets incremented with every call
     /// to `start_atomic` and decremented with each call to `finish_atomic`.
     pub(super) atomic_depth: Arc<AtomicUsize>,
-    /// A flag indicating whether the atomic override is currently in effect.
-    pub(super) atomic_override: Arc<AtomicBool>,
+    /// A flag indicating whether the atomic writes are currently paused.
+    pub(super) atomic_writes_paused: Arc<AtomicBool>,
 }
 
 impl Deref for RocksDB {
@@ -135,7 +135,7 @@ impl Database for RocksDB {
                     storage_mode: storage.clone().into(),
                     atomic_batch: Default::default(),
                     atomic_depth: Default::default(),
-                    atomic_override: Default::default(),
+                    atomic_writes_paused: Default::default(),
                 })
             })?
             .clone();
@@ -206,39 +206,51 @@ impl Database for RocksDB {
 }
 
 impl RocksDB {
-    /// Toggles the atomic override; if it becomes disabled, the pending
-    /// operations get executed.
-    fn flip_atomic_override(&self) -> Result<bool> {
-        // The override is only intended to be toggled before or after
+    /// Pause the execution of atomic writes for the entire database.
+    fn pause_atomic_writes(&self) -> Result<()> {
+        // This operation is only intended to be performed before or after
         // atomic batches - never in the middle of them.
         assert_eq!(self.atomic_depth.load(Ordering::SeqCst), 0);
 
-        // https://github.com/rust-lang/rust/issues/98485
-        let previous_value = self.atomic_override.load(Ordering::SeqCst);
-
-        // A flip from enabled to disabled executes all pending operations
-        // and makes the storage function in the usual fashion again.
-        if previous_value {
-            let batch = mem::take(&mut *self.atomic_batch.lock());
-            // In order to ensure that all the operations that are intended
-            // to be atomic via the usual macro approach are still performed
-            // atomically (just as a part of a larger batch), every atomic
-            // storage operation that has accumulated from the moment the
-            // override was enabled is executed as a single atomic batch
-            // when the override is disabled (i.e. `previous_value == true`).
-            self.rocksdb.write(batch)?;
-        }
-
-        // Flip the flag.
-        self.atomic_override.store(!previous_value, Ordering::SeqCst);
-
-        // Return the new value of the flag.
-        Ok(!previous_value)
+        // Set the flag indicating that the pause is in effect.
+        let already_paused = self.atomic_writes_paused.swap(true, Ordering::SeqCst);
+        // Make sure that we haven't already paused atomic writes (which would
+        // indicate a logic bug).
+        assert!(!already_paused);
+
+        Ok(())
+    }
+
+    /// Unpause the execution of atomic writes for the entire database; this
+    /// executes all the writes that have been queued since they were paused.
+    fn unpause_atomic_writes(&self) -> Result<()> {
+        // This operation is only intended to be performed before or after
+        // atomic batches - never in the middle of them.
+        assert_eq!(self.atomic_depth.load(Ordering::SeqCst), 0);
+
+        // https://github.com/rust-lang/rust/issues/98485
+        let currently_paused = self.atomic_writes_paused.load(Ordering::SeqCst);
+        // Make sure we are currently paused (otherwise there is likely some
+        // logic bug involved).
+        assert!(currently_paused);
+
+        // In order to ensure that all the operations that are intended
+        // to be atomic via the usual macro approach are still performed
+        // atomically (just as a part of a larger batch), every atomic
+        // storage operation that has accumulated from the moment the
+        // writes have been paused is executed as a single atomic batch.
+        let batch = mem::take(&mut *self.atomic_batch.lock());
+        self.rocksdb.write(batch)?;
+
+        // Unset the flag indicating that the pause is in effect.
+        self.atomic_writes_paused.store(false, Ordering::SeqCst);
+
+        Ok(())
     }
 
-    /// Checks whether the atomic override is currently in force.
-    fn is_atomic_override_active(&self) -> bool {
-        self.atomic_override.load(Ordering::SeqCst)
+    /// Checks whether the atomic writes are currently paused.
+    fn are_atomic_writes_paused(&self) -> bool {
+        self.atomic_writes_paused.load(Ordering::SeqCst)
     }
 
     /// Opens the test database.
@@ -305,7 +317,7 @@ impl RocksDB {
                 storage_mode: storage_mode.clone(),
                 atomic_batch: Default::default(),
                 atomic_depth: Default::default(),
-                atomic_override: Default::default(),
+                atomic_writes_paused: Default::default(),
             })
         }?;
 
diff --git a/ledger/store/src/helpers/rocksdb/internal/nested_map.rs b/ledger/store/src/helpers/rocksdb/internal/nested_map.rs
index 770c568b01..86366628e9 100644
--- a/ledger/store/src/helpers/rocksdb/internal/nested_map.rs
+++ b/ledger/store/src/helpers/rocksdb/internal/nested_map.rs
@@ -202,8 +202,8 @@ impl<
         // Ensure that the atomic batch is empty.
         assert!(self.atomic_batch.lock().is_empty());
         // Ensure that the database atomic batch is empty; skip this check if the atomic
-        // override is enabled, as there may be pending storage operations.
-        if !self.database.is_atomic_override_active() {
+        // writes are paused, as there may be pending operations.
+        if !self.database.are_atomic_writes_paused() {
             assert!(self.database.atomic_batch.lock().is_empty());
         }
     }
@@ -327,9 +327,8 @@ impl<
 
         // If we're at depth 0, it is the final call to `finish_atomic` and the
         // atomic write batch can be physically executed. This is skipped if the
-        // atomic override is in force, as the pending operations are executed
-        // when it is disabled.
-        if previous_atomic_depth == 1 && !self.database.is_atomic_override_active() {
+        // atomic writes are paused.
+        if previous_atomic_depth == 1 && !self.database.are_atomic_writes_paused() {
             // Empty the collection of pending operations.
             let batch = mem::take(&mut *self.database.atomic_batch.lock());
             // Execute all the operations atomically.
diff --git a/ledger/store/src/helpers/traits/map.rs b/ledger/store/src/helpers/traits/map.rs
index a1bbd1876c..1e08c0c830 100644
--- a/ledger/store/src/helpers/traits/map.rs
+++ b/ledger/store/src/helpers/traits/map.rs
@@ -75,14 +75,17 @@ pub trait Map<
     fn finish_atomic(&self) -> Result<()>;
 
     ///
-    /// The atomic override can be used to merge disjoint atomic write batches.
-    /// When enabled, subsequent atomic write batches no longer automatically
-    /// perform a write at the end of their scope; instead, they only extend the
-    /// pending write batch until `flip_atomic_override` is called again.
-    /// The returned boolean indicates the new state of the override (`true`
-    /// means it is now enabled, `false` that it is now disabled).
-    ///
-    fn flip_atomic_override(&self) -> Result<bool>;
+    /// Once called, subsequent atomic write batches will be queued instead of being executed
+    /// at the end of their scope. `unpause_atomic_writes` needs to be called in order to
+    /// restore the usual behavior.
+    ///
+    fn pause_atomic_writes(&self) -> Result<()>;
+
+    ///
+    /// Executes all of the queued writes as a single atomic operation and restores the usual
+    /// behavior of atomic write batches that was altered by calling `pause_atomic_writes`.
+    ///
+    fn unpause_atomic_writes(&self) -> Result<()>;
 }
 
 /// A trait representing map-like storage operations with read-only capabilities.
diff --git a/synthesizer/Cargo.toml b/synthesizer/Cargo.toml
index 40f9fcc0d2..7e9b95e809 100644
--- a/synthesizer/Cargo.toml
+++ b/synthesizer/Cargo.toml
@@ -31,6 +31,7 @@ snark = [ "synthesizer-snark" ]
 aleo-cli = [ ]
 async = [ "ledger-query/async", "synthesizer-process/async" ]
 cuda = [ "algorithms/cuda" ]
+rocks = [ "ledger-store/rocks" ]
 serial = [
   "console/serial",
   "ledger-block/serial",
diff --git a/synthesizer/src/vm/mod.rs b/synthesizer/src/vm/mod.rs
index 93b27ae3b2..410a4f2a84 100644
--- a/synthesizer/src/vm/mod.rs
+++ b/synthesizer/src/vm/mod.rs
@@ -344,29 +344,29 @@ impl<N: Network, C: ConsensusStorage<N>> VM<N, C> {
             block.previous_hash(),
         )?;
 
-        // Enable the atomic batch override, so that both the insertion and finalization belong to a single batch.
+        // Pause the atomic writes, so that both the insertion and finalization belong to a single batch.
         #[cfg(feature = "rocks")]
-        assert!(self.block_store().flip_atomic_override()?);
+        self.block_store().pause_atomic_writes()?;
 
         // First, insert the block.
         self.block_store().insert(block)?;
 
         // Next, finalize the transactions.
         match self.finalize(state, block.ratifications(), block.solutions(), block.transactions()) {
             Ok(_ratified_finalize_operations) => {
-                // Disable the atomic batch override, executing the merged batch.
+                // Unpause the atomic writes, executing the ones queued from block insertion and finalization.
                 #[cfg(feature = "rocks")]
-                assert!(!self.block_store().flip_atomic_override()?);
+                self.block_store().unpause_atomic_writes()?;
                 Ok(())
             }
             Err(finalize_error) => {
                 #[cfg(feature = "rocks")]
                 {
-                    // Clear all pending atomic operations so that disabling the atomic batch override
+                    // Clear all pending atomic operations so that unpausing the atomic writes
                     // doesn't execute any of the queued storage operations.
                     self.block_store().abort_atomic();
                     self.finalize_store().abort_atomic();
                     // Disable the atomic batch override.
-                    assert!(!self.block_store().flip_atomic_override()?);
+                    self.block_store().unpause_atomic_writes()?;
                 }
                 // Rollback the block.
                 #[cfg(not(feature = "rocks"))]
                 self.block_store().remove_last_n_from_store(1).map_err(|removal_error| {
                     // Log the finalize error.

From dd685acb6bfc22319ffe5e7d56b2c8e6f7666f43 Mon Sep 17 00:00:00 2001
From: Howard Wu <9260812+howardwu@users.noreply.github.com>
Date: Wed, 21 Feb 2024 15:14:16 -0800
Subject: [PATCH 10/10] Adds safety on unpause_atomic_writes, reduces tree
 locking on removals

---
 ledger/store/src/block/mod.rs                    | 45 ++++++++++---------
 .../store/src/helpers/memory/internal/map.rs     |  2 +-
 .../store/src/helpers/rocksdb/internal/map.rs    |  4 +--
 .../store/src/helpers/rocksdb/internal/mod.rs    | 21 +++++----
 ledger/store/src/helpers/traits/map.rs           |  2 +-
 synthesizer/src/vm/mod.rs                        | 38 ++++++++--------
 6 files changed, 59 insertions(+), 53 deletions(-)

diff --git a/ledger/store/src/block/mod.rs b/ledger/store/src/block/mod.rs
index 0fc27e455c..5ee1c5e467 100644
--- a/ledger/store/src/block/mod.rs
+++ b/ledger/store/src/block/mod.rs
@@ -401,10 +401,10 @@ pub trait BlockStorage<N: Network>: 'static + Clone + Send + Sync {
     }
 
     /// Unpauses atomic writes.
-    fn unpause_atomic_writes(&self) -> Result<()> {
+    fn unpause_atomic_writes<const DISCARD_BATCH: bool>(&self) -> Result<()> {
         // Since this applies to the entire storage, any map can be used; this
         // one is just the first one in the list.
-        self.state_root_map().unpause_atomic_writes()
+        self.state_root_map().unpause_atomic_writes::<DISCARD_BATCH>()
     }
 
     /// Stores the given `(state root, block)` pair into storage.
@@ -1064,7 +1064,9 @@ impl<N: Network, B: BlockStorage<N>> BlockStore<N, B> {
     }
 
     /// Reverts the Merkle tree to its shape before the insertion of the last 'n' blocks.
-    pub fn remove_last_n_from_tree(&self, n: u32) -> Result<()> {
+    pub fn remove_last_n_from_tree_only(&self, n: u32) -> Result<()> {
+        // Ensure 'n' is non-zero.
+        ensure!(n > 0, "Cannot remove zero blocks");
         // Acquire the write lock on the block tree.
         let mut tree = self.tree.write();
         // Prepare an updated Merkle tree removing the last 'n' block hashes.
@@ -1075,8 +1077,14 @@ impl<N: Network, B: BlockStorage<N>> BlockStore<N, B> {
         Ok(())
     }
 
-    /// Reverts the block store to its shape before the insertion of the last 'n' blocks.
-    pub fn remove_last_n_from_store(&self, n: u32) -> Result<()> {
+    /// Removes the last 'n' blocks from storage.
+    pub fn remove_last_n(&self, n: u32) -> Result<()> {
+        // Ensure 'n' is non-zero.
+        ensure!(n > 0, "Cannot remove zero blocks");
+
+        // Acquire the write lock on the block tree.
+        let mut tree = self.tree.write();
+
         // Determine the block heights to remove.
         let heights = match self.storage.id_map().keys_confirmed().max() {
             Some(height) => {
@@ -1087,13 +1095,12 @@ impl<N: Network, B: BlockStorage<N>> BlockStore<N, B> {
                     .checked_sub(n - 1)
                     .ok_or_else(|| anyhow!("Failed to remove last '{n}' blocks: block height underflow"))?;
                 // Ensure the block height matches the number of leaves in the Merkle tree.
-                ensure!(end_height == u32::try_from(self.tree.read().number_of_leaves())? - 1, "Block height mismatch");
+                ensure!(end_height == u32::try_from(tree.number_of_leaves())? - 1, "Block height mismatch");
                 // Output the block heights.
                 start_height..=end_height
             }
             None => bail!("Failed to remove last '{n}' blocks: no blocks in storage"),
         };
-
         // Fetch the block hashes to remove.
         let hashes = cfg_into_iter!(heights)
             .map(|height| match self.storage.get_block_hash(height)? {
                 Some(block_hash) => Ok(block_hash),
                 None => bail!("Failed to remove last '{n}' blocks: missing block {height}"),
             })
             .collect::<Result<Vec<_>>>()?;
 
+        // Prepare an updated Merkle tree removing the last 'n' block hashes.
+        let updated_tree = tree.prepare_remove_last_n(usize::try_from(n)?)?;
+
         atomic_batch_scope!(self, {
             // Remove the blocks, in descending order.
             for block_hash in hashes.iter().rev() {
                 self.storage.remove(block_hash)?;
             }
             Ok(())
-        })
-    }
+        })?;
 
-    /// Removes the last 'n' blocks from storage.
-    pub fn remove_last_n(&self, n: u32) -> Result<()> {
-        // Ensure 'n' is non-zero.
-        ensure!(n > 0, "Cannot remove zero blocks");
-
-        // Update the block store.
-        self.remove_last_n_from_store(n)?;
-
-        // Update the Merkle tree.
-        self.remove_last_n_from_tree(n)
+        // Update the block tree.
+        *tree = updated_tree;
+        // Return success.
+        Ok(())
     }
 
     /// Returns the transaction store.
@@ -1179,8 +1185,8 @@ impl<N: Network, B: BlockStorage<N>> BlockStore<N, B> {
     }
 
     /// Unpauses atomic writes.
-    pub fn unpause_atomic_writes(&self) -> Result<()> {
-        self.storage.unpause_atomic_writes()
+    pub fn unpause_atomic_writes<const DISCARD_BATCH: bool>(&self) -> Result<()> {
+        self.storage.unpause_atomic_writes::<DISCARD_BATCH>()
     }
 }
 
diff --git a/ledger/store/src/helpers/memory/internal/map.rs b/ledger/store/src/helpers/memory/internal/map.rs
index ebed4b84f8..514e4d551c 100644
--- a/ledger/store/src/helpers/memory/internal/map.rs
+++ b/ledger/store/src/helpers/memory/internal/map.rs
@@ -243,7 +243,7 @@ impl<
     /// Executes all of the queued writes as a single atomic operation and restores the usual
     /// behavior of atomic write batches that was altered by calling `pause_atomic_writes`.
     ///
-    fn unpause_atomic_writes(&self) -> Result<()> {
+    fn unpause_atomic_writes<const DISCARD_BATCH: bool>(&self) -> Result<()> {
         // No effect.
         Ok(())
     }
diff --git a/ledger/store/src/helpers/rocksdb/internal/map.rs b/ledger/store/src/helpers/rocksdb/internal/map.rs
index 613d677316..bd868d4bc6 100644
--- a/ledger/store/src/helpers/rocksdb/internal/map.rs
+++ b/ledger/store/src/helpers/rocksdb/internal/map.rs
@@ -247,8 +247,8 @@ impl<
     /// Executes all of the queued writes as a single atomic operation and restores the usual
     /// behavior of atomic write batches that was altered by calling `pause_atomic_writes`.
     ///
-    fn unpause_atomic_writes(&self) -> Result<()> {
-        self.database.unpause_atomic_writes()
+    fn unpause_atomic_writes<const DISCARD_BATCH: bool>(&self) -> Result<()> {
+        self.database.unpause_atomic_writes::<DISCARD_BATCH>()
     }
 }
 
diff --git a/ledger/store/src/helpers/rocksdb/internal/mod.rs b/ledger/store/src/helpers/rocksdb/internal/mod.rs
index c33fb0ec9f..20222de928 100644
--- a/ledger/store/src/helpers/rocksdb/internal/mod.rs
+++ b/ledger/store/src/helpers/rocksdb/internal/mod.rs
@@ -25,7 +25,7 @@ pub use nested_map::*;
 mod tests;
 
 use aleo_std_storage::StorageMode;
-use anyhow::{bail, Result};
+use anyhow::{bail, ensure, Result};
 use once_cell::sync::OnceCell;
 use parking_lot::Mutex;
 use serde::{de::DeserializeOwned, Serialize};
@@ -223,16 +223,17 @@ impl RocksDB {
 
     /// Unpause the execution of atomic writes for the entire database; this
     /// executes all the writes that have been queued since they were paused.
-    fn unpause_atomic_writes(&self) -> Result<()> {
-        // This operation is only intended to be performed before or after
-        // atomic batches - never in the middle of them.
-        assert_eq!(self.atomic_depth.load(Ordering::SeqCst), 0);
+    fn unpause_atomic_writes<const DISCARD_BATCH: bool>(&self) -> Result<()> {
+        // Ensure the call to unpause is only performed before or after an atomic batch scope
+        // - and never in the middle of one (otherwise there is a fundamental logic bug).
+        // Note: In production, this `ensure` is a safety-critical invariant that never fails.
+        ensure!(self.atomic_depth.load(Ordering::SeqCst) == 0, "Atomic depth must be 0 to unpause atomic writes");
 
         // https://github.com/rust-lang/rust/issues/98485
         let currently_paused = self.atomic_writes_paused.load(Ordering::SeqCst);
-        // Make sure we are currently paused (otherwise there is likely some
-        // logic bug involved).
-        assert!(currently_paused);
+        // Ensure the database is paused (otherwise there is a fundamental logic bug).
+        // Note: In production, this `ensure` is a safety-critical invariant that never fails.
+        ensure!(currently_paused, "Atomic writes must be paused to unpause them");
 
         // In order to ensure that all the operations that are intended
         // to be atomic via the usual macro approach are still performed
         // atomically (just as a part of a larger batch), every atomic
         // storage operation that has accumulated from the moment the
         // writes have been paused is executed as a single atomic batch.
         let batch = mem::take(&mut *self.atomic_batch.lock());
-        self.rocksdb.write(batch)?;
+        if !DISCARD_BATCH {
+            self.rocksdb.write(batch)?;
+        }
 
         // Unset the flag indicating that the pause is in effect.
         self.atomic_writes_paused.store(false, Ordering::SeqCst);
 
         Ok(())
     }
diff --git a/ledger/store/src/helpers/traits/map.rs b/ledger/store/src/helpers/traits/map.rs
index 1e08c0c830..b0eef6a4ac 100644
--- a/ledger/store/src/helpers/traits/map.rs
+++ b/ledger/store/src/helpers/traits/map.rs
@@ -85,7 +85,7 @@ pub trait Map<
     /// Executes all of the queued writes as a single atomic operation and restores the usual
     /// behavior of atomic write batches that was altered by calling `pause_atomic_writes`.
     ///
-    fn unpause_atomic_writes(&self) -> Result<()>;
+    fn unpause_atomic_writes<const DISCARD_BATCH: bool>(&self) -> Result<()>;
 }
 
 /// A trait representing map-like storage operations with read-only capabilities.
diff --git a/synthesizer/src/vm/mod.rs b/synthesizer/src/vm/mod.rs
index 410a4f2a84..d7f7912c9a 100644
--- a/synthesizer/src/vm/mod.rs
+++ b/synthesizer/src/vm/mod.rs
@@ -355,34 +355,34 @@ impl<N: Network, C: ConsensusStorage<N>> VM<N, C> {
             Ok(_ratified_finalize_operations) => {
                 // Unpause the atomic writes, executing the ones queued from block insertion and finalization.
                 #[cfg(feature = "rocks")]
-                self.block_store().unpause_atomic_writes()?;
+                self.block_store().unpause_atomic_writes::<false>()?;
                 Ok(())
             }
             Err(finalize_error) => {
-                #[cfg(feature = "rocks")]
-                {
+                if cfg!(feature = "rocks") {
                     // Clear all pending atomic operations so that unpausing the atomic writes
                     // doesn't execute any of the queued storage operations.
                     self.block_store().abort_atomic();
                     self.finalize_store().abort_atomic();
                     // Disable the atomic batch override.
-                    self.block_store().unpause_atomic_writes()?;
+                    // Note: This call is guaranteed to succeed (without error), because `DISCARD_BATCH == true`.
+                    self.block_store().unpause_atomic_writes::<true>()?;
+                    // Rollback the Merkle tree.
+                    self.block_store().remove_last_n_from_tree_only(1).map_err(|removal_error| {
+                        // Log the finalize error.
- error!("Failed to finalize block {} - {finalize_error}", block.height()); - // Return the removal error. - removal_error - })?; - // Rollback the Merkle tree. - self.block_store().remove_last_n_from_tree(1).map_err(|removal_error| { - // Log the finalize error. - error!("Failed to finalize block {} - {finalize_error}", block.height()); - // Return the removal error. - removal_error - })?; // Return the finalize error. Err(finalize_error) }