Skip to content

Commit

Permalink
only open narwhal store when on committee and garbage collect old epo…
Browse files Browse the repository at this point in the history
…ch dbs
  • Loading branch information
daltoncoder committed May 16, 2024
1 parent 6310c91 commit 0d663c0
Showing 1 changed file with 84 additions and 11 deletions.
95 changes: 84 additions & 11 deletions core/consensus/src/consensus.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
use std::collections::BTreeMap;
use std::fs;
use std::path::PathBuf;
use std::sync::Arc;
use std::time::{Duration, SystemTime};

Expand Down Expand Up @@ -124,18 +126,11 @@ impl<Q: SyncQueryRunnerInterface, P: PubSub<PubSubMsg> + 'static, NE: Emitter>
// Get current epoch information
let (committee, worker_cache, epoch, epoch_end) = self.get_epoch_info();

// Make or open store specific to current epoch
let mut store_path = self.store_path.to_path_buf();
store_path.push(format!("{epoch}"));
// TODO(dalton): This store takes an optional cache metrics struct that can give us metrics
// on hits/miss
let store = NodeStorage::reopen(store_path, None);

if committee
.authority_by_key(self.narwhal_args.primary_keypair.public())
.is_some()
{
self.run_narwhal(store, epoch_end, epoch, committee, worker_cache)
self.run_narwhal(epoch_end, epoch, committee, worker_cache)
.await
}
}
Expand Down Expand Up @@ -249,15 +244,14 @@ impl<Q: SyncQueryRunnerInterface, P: PubSub<PubSubMsg> + 'static, NE: Emitter>

async fn run_narwhal(
&mut self,
store: NodeStorage,
epoch_end: u64,
epoch: u64,
committee: Committee,
worker_cache: WorkerCache,
) {
info!("Node is on current committee, starting narwhal.");
// If you are on the committee start the timer to signal when your node thinks its ready
// to change epochs.

let store = self.get_narwhal_store_and_garbage_collect(epoch);

// Create the narwhal service
let service = NarwhalService::new(
Expand All @@ -271,6 +265,7 @@ impl<Q: SyncQueryRunnerInterface, P: PubSub<PubSubMsg> + 'static, NE: Emitter>

service.start(self.execution_state.clone()).await;

// Start the timer that signals when this node thinks it's ready to change epochs
let now = SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.unwrap()
Expand All @@ -290,6 +285,20 @@ impl<Q: SyncQueryRunnerInterface, P: PubSub<PubSubMsg> + 'static, NE: Emitter>
/// Forwards the event sender `tx` to the execution state by calling its `set_event_tx`.
fn set_event_tx(&mut self, tx: tokio::sync::mpsc::Sender<Vec<Event>>) {
self.execution_state.set_event_tx(tx);
}

/// Creates or reopens the narwhal store that belongs to `current_epoch`.
///
/// Before the store is opened, epoch stores that are more than 2 epochs old are garbage
/// collected from the store directory.
fn get_narwhal_store_and_garbage_collect(&self, current_epoch: u64) -> NodeStorage {
    let store_root = self.store_path.to_path_buf();

    // Prune stale epoch directories before touching the current one
    garbage_collect_old_stores(&current_epoch, &store_root);

    // Each epoch gets its own sub-directory named after the epoch number
    let epoch_store_path = store_root.join(current_epoch.to_string());

    // TODO(dalton): This store takes an optional cache metrics struct that can give us metrics
    // on hits/miss
    NodeStorage::reopen(epoch_store_path, None)
}
}

impl<C: Collection> Consensus<C> {
Expand Down Expand Up @@ -429,6 +438,31 @@ impl<C: Collection> Consensus<C> {
}
}

/// Deletes every epoch directory under `store_location` that is more than 2 epochs older than
/// `current_epoch`.
///
/// Every narwhal db is stored in this directory with the number of the epoch as its name, so
/// entries whose name does not parse as a `u64` are left untouched. We don't want to panic if
/// this fails, so failures are only logged as errors.
fn garbage_collect_old_stores(current_epoch: &u64, store_location: &PathBuf) {
    // Saturate instead of subtracting directly: for epochs 0 and 1 a plain `current_epoch - 2`
    // underflows, which panics when overflow checks are enabled and otherwise wraps around to a
    // huge cutoff that would delete every epoch directory, including the current one.
    let oldest_epoch_to_keep = current_epoch.saturating_sub(2);

    if let Ok(files) = fs::read_dir(store_location) {
        // Entries that cannot be read are skipped by `flatten`
        for file in files.flatten() {
            let parsed_epoch = file
                .file_name()
                .into_string()
                .ok()
                .and_then(|s| s.parse::<u64>().ok());

            if let Some(epoch_num) = parsed_epoch {
                if epoch_num < oldest_epoch_to_keep {
                    if let Err(e) = fs::remove_dir_all(file.path()) {
                        error!("Unable to remove garbage collected Narwhal epoch: {e}");
                    }
                }
            }
        }
    } else {
        error!("Unable to read narwhal store directory to garbage collect old databases");
    }
}

#[derive(Debug, Serialize, Deserialize, Clone, IsVariant, From, TryInto)]
pub enum PubSubMsg {
Transactions(AuthenticStampedParcel),
Expand All @@ -437,3 +471,42 @@ pub enum PubSubMsg {
}

impl AutoImplSerde for PubSubMsg {}

/// Makes sure `garbage_collect_old_stores` deletes and keeps the right directories: with the
/// current epoch at 10, only the directories for epochs 8, 9 and 10 may remain.
#[test]
fn test_garbage_collecting_epochs() {
    // Build the fake store under the OS temp directory (suffixed with the process id) so the
    // test never writes into the working directory.
    let store_path = std::env::temp_dir().join(format!(
        "TEST_GARBAGE_COLLECTION_{}",
        std::process::id()
    ));
    // Start from a clean slate in case an earlier aborted run left files behind; it is fine if
    // there is nothing to remove.
    let _ = fs::remove_dir_all(&store_path);

    // Simulate the epoch directories 0 through 10, each holding a nested directory and a few
    // files so that recursive removal is exercised.
    for i in 0..=10u64 {
        let epoch_dir = store_path.join(format!("{i}"));
        fs::create_dir_all(epoch_dir.join("directory1")).unwrap();
        for file_name in ["file1.txt", "file2.txt", "file3.txt"].iter() {
            fs::File::create(epoch_dir.join(file_name)).unwrap();
        }
    }

    // garbage collect the directory
    garbage_collect_old_stores(&10, &store_path);

    // Record which epoch directories survived before cleaning up, so that the fixture is removed
    // even when the assertion below fails.
    let surviving_epochs: Vec<u64> = (0..=10u64)
        .filter(|i| store_path.join(format!("{i}")).exists())
        .collect();

    // cleanup
    fs::remove_dir_all(&store_path).expect("Failed to cleanup test_garbage_collection_epochs");

    // Only 3 directories should be left: one for the current epoch and one for each of the
    // previous 2 epochs.
    assert_eq!(surviving_epochs, vec![8, 9, 10]);
}

0 comments on commit 0d663c0

Please sign in to comment.