Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cache transmissions for faster processing #3395

Open
wants to merge 6 commits into
base: mainnet-staging
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions node/bft/storage-service/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@ workspace = true
version = "2.1"
features = [ "serde", "rayon" ]

[dependencies.lru]
version = "0.12.1"
Copy link
Collaborator

@niklaslong niklaslong Sep 11, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No need to change it but just as a note to reviewers, the latest version (which will be used by cargo when building) is 0.12.4.


[dependencies.parking_lot]
version = "0.12"
optional = true
Expand Down
110 changes: 70 additions & 40 deletions node/bft/storage-service/src/persistent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
use crate::StorageService;
use snarkvm::{
ledger::{
committee::Committee,
narwhal::{BatchHeader, Transmission, TransmissionID},
store::{
cow_to_cloned,
Expand All @@ -33,9 +34,12 @@ use snarkvm::{

use aleo_std::StorageMode;
use indexmap::{indexset, IndexSet};
use lru::LruCache;
use parking_lot::Mutex;
use std::{
borrow::Cow,
collections::{HashMap, HashSet},
num::NonZeroUsize,
};
use tracing::error;

Expand All @@ -46,24 +50,40 @@ pub struct BFTPersistentStorage<N: Network> {
transmissions: DataMap<TransmissionID<N>, (Transmission<N>, IndexSet<Field<N>>)>,
/// The map of `aborted transmission ID` to `certificate IDs` entries.
aborted_transmission_ids: DataMap<TransmissionID<N>, IndexSet<Field<N>>>,
/// The LRU cache for `transmission ID` to `(transmission, certificate IDs)` entries that are part of the persistent storage.
cache_transmissions: Mutex<LruCache<TransmissionID<N>, (Transmission<N>, IndexSet<Field<N>>)>>,
/// The LRU cache for `aborted transmission ID` to `certificate IDs` entries that are part of the persistent storage.
cache_aborted_transmission_ids: Mutex<LruCache<TransmissionID<N>, IndexSet<Field<N>>>>,
}

impl<N: Network> BFTPersistentStorage<N> {
/// Initializes a new BFT persistent storage service.
pub fn open(storage_mode: StorageMode) -> Result<Self> {
let capacity = NonZeroUsize::new(
(Committee::<N>::MAX_COMMITTEE_SIZE as usize) * (BatchHeader::<N>::MAX_TRANSMISSIONS_PER_BATCH) * 2,
)
.unwrap();

Ok(Self {
transmissions: internal::RocksDB::open_map(N::ID, storage_mode.clone(), MapID::BFT(BFTMap::Transmissions))?,
aborted_transmission_ids: internal::RocksDB::open_map(
N::ID,
storage_mode,
MapID::BFT(BFTMap::AbortedTransmissionIDs),
)?,
cache_transmissions: Mutex::new(LruCache::new(capacity)),
cache_aborted_transmission_ids: Mutex::new(LruCache::new(capacity)),
})
}

/// Initializes a new BFT persistent storage service.
/// Initializes a new BFT persistent storage service for testing.
#[cfg(any(test, feature = "test"))]
pub fn open_testing(temp_dir: std::path::PathBuf, dev: Option<u16>) -> Result<Self> {
let capacity = NonZeroUsize::new(
(Committee::<N>::MAX_COMMITTEE_SIZE as usize) * (BatchHeader::<N>::MAX_TRANSMISSIONS_PER_BATCH) * 2,
)
.unwrap();

Ok(Self {
transmissions: internal::RocksDB::open_map_testing(
temp_dir.clone(),
Expand All @@ -75,6 +95,8 @@ impl<N: Network> BFTPersistentStorage<N> {
dev,
MapID::BFT(BFTMap::AbortedTransmissionIDs),
)?,
cache_transmissions: Mutex::new(LruCache::new(capacity)),
cache_aborted_transmission_ids: Mutex::new(LruCache::new(capacity)),
})
}
}
Expand All @@ -101,7 +123,12 @@ impl<N: Network> StorageService<N> for BFTPersistentStorage<N> {
/// Returns the transmission for the given `transmission ID`.
/// If the transmission ID does not exist in storage, `None` is returned.
fn get_transmission(&self, transmission_id: TransmissionID<N>) -> Option<Transmission<N>> {
// Get the transmission.
// Try to get the transmission from the cache first.
if let Some((transmission, _)) = self.cache_transmissions.lock().get_mut(&transmission_id) {
return Some(transmission.clone());
}

// If not found in cache, check persistent storage.
match self.transmissions.get_confirmed(&transmission_id) {
Ok(Some(Cow::Owned((transmission, _)))) => Some(transmission),
Ok(Some(Cow::Borrowed((transmission, _)))) => Some(transmission.clone()),
Expand Down Expand Up @@ -152,73 +179,76 @@ impl<N: Network> StorageService<N> for BFTPersistentStorage<N> {
aborted_transmission_ids: HashSet<TransmissionID<N>>,
mut missing_transmissions: HashMap<TransmissionID<N>, Transmission<N>>,
) {
// Inserts the following:
// - Inserts **only the missing** transmissions from storage.
// - Inserts the certificate ID into the corresponding set for **all** transmissions.
'outer: for transmission_id in transmission_ids {
// Retrieve the transmission entry.
// First, handle the non-aborted transmissions.
for transmission_id in transmission_ids {
// Try to fetch from the persistent storage.
match self.transmissions.get_confirmed(&transmission_id) {
Ok(Some(entry)) => {
// The transmission exists in storage; update its certificate IDs.
let (transmission, mut certificate_ids) = cow_to_cloned!(entry);
// Insert the certificate ID into the set.
certificate_ids.insert(certificate_id);
// Update the transmission entry.
if let Err(e) = self.transmissions.insert(transmission_id, (transmission, certificate_ids)) {

// Update the persistent storage.
if let Err(e) =
self.transmissions.insert(transmission_id, (transmission.clone(), certificate_ids.clone()))
{
error!("Failed to insert transmission {transmission_id} into storage - {e}");
continue 'outer;
}

// Also, update the cache.
self.cache_transmissions.lock().put(transmission_id, (transmission, certificate_ids));
}
Ok(None) => {
// Retrieve the missing transmission.
let Some(transmission) = missing_transmissions.remove(&transmission_id) else {
if !aborted_transmission_ids.contains(&transmission_id)
&& !self.contains_transmission(transmission_id)
vicsn marked this conversation as resolved.
Show resolved Hide resolved
// The transmission is missing from persistent storage.
// Check if it exists in the `missing_transmissions` map provided.
if let Some(transmission) = missing_transmissions.remove(&transmission_id) {
let certificate_ids = indexset! { certificate_id };

// Insert into persistent storage.
if let Err(e) =
self.transmissions.insert(transmission_id, (transmission.clone(), certificate_ids.clone()))
{
error!("Failed to provide a missing transmission {transmission_id}");
error!("Failed to insert transmission {transmission_id} into storage - {e}");
}
continue 'outer;
};
// Prepare the set of certificate IDs.
let certificate_ids = indexset! { certificate_id };
// Insert the transmission and a new set with the certificate ID.
if let Err(e) = self.transmissions.insert(transmission_id, (transmission, certificate_ids)) {
error!("Failed to insert transmission {transmission_id} into storage - {e}");
continue 'outer;

// Also, insert into the cache.
self.cache_transmissions.lock().put(transmission_id, (transmission, certificate_ids));
} else if !aborted_transmission_ids.contains(&transmission_id) {
// If the transmission is not found in either storage or the missing map,
// and it's not an aborted transmission, log an error.
error!("Failed to provide a missing transmission {transmission_id}");
}
}
Err(e) => {
// Handle any errors during the retrieval.
error!("Failed to process the 'insert' for transmission {transmission_id} into storage - {e}");
continue 'outer;
}
}
}
// Inserts the aborted transmission IDs.

// Next, handle the aborted transmission IDs.
for aborted_transmission_id in aborted_transmission_ids {
// Retrieve the transmission entry.
match self.aborted_transmission_ids.get_confirmed(&aborted_transmission_id) {
let certificate_ids = match self.aborted_transmission_ids.get_confirmed(&aborted_transmission_id) {
Ok(Some(entry)) => {
let mut certificate_ids = cow_to_cloned!(entry);
// Insert the certificate ID into the set.
certificate_ids.insert(certificate_id);
// Update the transmission entry.
if let Err(e) = self.aborted_transmission_ids.insert(aborted_transmission_id, certificate_ids) {
error!("Failed to insert aborted transmission ID {aborted_transmission_id} into storage - {e}");
}
}
Ok(None) => {
// Prepare the set of certificate IDs.
let certificate_ids = indexset! { certificate_id };
// Insert the transmission and a new set with the certificate ID.
if let Err(e) = self.aborted_transmission_ids.insert(aborted_transmission_id, certificate_ids) {
error!("Failed to insert aborted transmission ID {aborted_transmission_id} into storage - {e}");
}
certificate_ids
}
Ok(None) => indexset! { certificate_id },
Err(e) => {
error!(
"Failed to process the 'insert' for aborted transmission ID {aborted_transmission_id} into storage - {e}"
);
continue;
}
};
// Insert the certificate IDs into the persistent storage.
if let Err(e) = self.aborted_transmission_ids.insert(aborted_transmission_id, certificate_ids.clone()) {
error!("Failed to insert aborted transmission ID {aborted_transmission_id} into storage - {e}");
}
// Insert the certificate IDs into the cache.
self.cache_aborted_transmission_ids.lock().put(aborted_transmission_id, certificate_ids);
}
}

Expand Down