From 20d6110d7557ac2c021f5726390ced374424c9d7 Mon Sep 17 00:00:00 2001 From: Ron Kuris Date: Thu, 26 Oct 2023 11:38:51 -0700 Subject: [PATCH] Make Db::new async, force use of new API (#323) Signed-off-by: Ron Kuris Signed-off-by: Ron Kuris Co-authored-by: Richard Pringle --- firewood/Cargo.toml | 2 +- firewood/benches/hashops.rs | 70 +++++---- firewood/examples/insert.rs | 14 +- firewood/src/db.rs | 56 ++++--- firewood/src/file.rs | 9 +- firewood/src/lib.rs | 2 +- firewood/src/storage/buffer.rs | 23 +-- firewood/src/v2/api.rs | 2 +- firewood/src/v2/db.rs | 1 + firewood/tests/db.rs | 275 +++++++++++++++++---------------- firewood/tests/v2api.rs | 14 +- fwdctl/Cargo.toml | 1 + fwdctl/src/create.rs | 10 +- fwdctl/src/delete.rs | 16 +- fwdctl/src/dump.rs | 16 +- fwdctl/src/get.rs | 31 ++-- fwdctl/src/insert.rs | 19 +-- fwdctl/src/main.rs | 17 +- fwdctl/src/root.rs | 15 +- 19 files changed, 307 insertions(+), 286 deletions(-) diff --git a/firewood/Cargo.toml b/firewood/Cargo.toml index 9efa74ddc..89db5ed8b 100644 --- a/firewood/Cargo.toml +++ b/firewood/Cargo.toml @@ -37,7 +37,7 @@ typed-builder = "0.18.0" bincode = "1.3.3" [dev-dependencies] -criterion = "0.5.1" +criterion = {version = "0.5.1", features = ["async_tokio"]} keccak-hasher = "0.15.3" rand = "0.8.5" triehash = "0.8.4" diff --git a/firewood/benches/hashops.rs b/firewood/benches/hashops.rs index bd69004ac..76771d943 100644 --- a/firewood/benches/hashops.rs +++ b/firewood/benches/hashops.rs @@ -5,9 +5,10 @@ use criterion::{criterion_group, criterion_main, profiler::Profiler, BatchSize, Criterion}; use firewood::{ - db::{BatchOp, Db, DbConfig}, + db::{BatchOp, DbConfig}, merkle::{Merkle, TrieHash, TRIE_HASH_LEN}, storage::WalConfig, + v2::api::{Db, Proposal}, }; use pprof::ProfilerGuard; use rand::{distributions::Alphanumeric, rngs::StdRng, Rng, SeedableRng}; @@ -17,7 +18,7 @@ use shale::{ disk_address::DiskAddress, CachedStore, ObjCache, Storable, StoredView, }; -use std::{fs::File, iter::repeat_with, ops::Deref, os::raw::c_int, path::Path}; +use std::{fs::File, iter::repeat_with, ops::Deref, os::raw::c_int, path::Path, sync::Arc}; const ZERO_HASH: TrieHash = TrieHash([0u8; TRIE_HASH_LEN]); @@ -134,36 +135,41 @@ fn bench_db(criterion: &mut Criterion) { .benchmark_group("Db") .sample_size(30) .bench_function("commit", |b| { - b.iter_batched( - || { - let cfg = - DbConfig::builder().wal(WalConfig::builder().max_revisions(10).build()); - - let batch_ops: Vec<_> = repeat_with(|| { - (&mut rng) - .sample_iter(&Alphanumeric) - .take(KEY_LEN) - .collect() - }) - .map(|key: Vec<_>| BatchOp::Put { - key, - value: vec![b'v'], - }) - .take(N) - .collect(); - let db_path = dbg!(std::env::temp_dir()); - let db_path = db_path.join("benchmark_db"); - - let db = Db::new(db_path, &cfg.clone().truncate(true).build()).unwrap(); - - (db, batch_ops) - }, - |(db, batch_ops)| { - let proposal = db.new_proposal(batch_ops).unwrap(); - proposal.commit_sync().unwrap(); - }, - BatchSize::SmallInput, - ); + b.to_async(tokio::runtime::Runtime::new().unwrap()) + .iter_batched( + || { + let batch_ops: Vec<_> = repeat_with(|| { + (&mut rng) + .sample_iter(&Alphanumeric) + .take(KEY_LEN) + .collect() + }) + .map(|key: Vec<_>| BatchOp::Put { + key, + value: vec![b'v'], + }) + .take(N) + .collect(); + batch_ops + }, + |batch_ops| async { + let db_path = dbg!(std::env::temp_dir()); + let db_path = db_path.join("benchmark_db"); + let cfg = + DbConfig::builder().wal(WalConfig::builder().max_revisions(10).build()); + + let db = + firewood::db::Db::new(db_path, &cfg.clone().truncate(true).build()) + .await + .unwrap(); + + Arc::new(db.propose(batch_ops).await.unwrap()) + .commit() + .await + .unwrap() + }, + BatchSize::SmallInput, + ); }); } diff --git a/firewood/examples/insert.rs b/firewood/examples/insert.rs index 56dc7e021..985813b9b 100644 --- a/firewood/examples/insert.rs +++ b/firewood/examples/insert.rs @@ -9,7 +9,7 @@ use std::{collections::HashMap, error::Error, ops::RangeInclusive, sync::Arc, ti use firewood::{ db::{Batch, BatchOp, Db, DbConfig}, - v2::api::{Db as DbApi, DbView, Proposal}, + v2::api::{Db as _, DbView, Proposal}, }; use rand::{distributions::Alphanumeric, Rng}; @@ -44,11 +44,9 @@ async fn main() -> Result<(), Box> { let args = Args::parse(); - let db = tokio::task::spawn_blocking(move || { - Db::new("rev_db", &cfg).expect("db initiation should succeed") - }) - .await - .unwrap(); + let db = Db::new("rev_db", &cfg) + .await + .expect("db initiation should succeed"); let keys = args.batch_keys; let start = Instant::now(); @@ -74,7 +72,7 @@ async fn main() -> Result<(), Box> { let verify = get_keys_to_verify(&batch, args.read_verify_percent); - let proposal: Arc = db.propose(batch).await.unwrap().into(); + let proposal = Arc::new(db.propose(batch).await.unwrap()); proposal.commit().await?; verify_keys(&db, verify).await?; } @@ -107,7 +105,7 @@ fn get_keys_to_verify(batch: &Batch, Vec>, pct: u16) -> HashMap, Vec>, ) -> Result<(), firewood::v2::api::Error> { if !verify.is_empty() { diff --git a/firewood/src/db.rs b/firewood/src/db.rs index f5f324fe6..ea79ec383 100644 --- a/firewood/src/db.rs +++ b/firewood/src/db.rs @@ -4,6 +4,7 @@ pub use crate::{ config::{DbConfig, DbRevConfig}, storage::{buffer::DiskBufferConfig, WalConfig}, + v2::api::{Batch, BatchOp, Proposal}, }; use crate::{ file, @@ -18,7 +19,7 @@ use crate::{ }; use async_trait::async_trait; use bytemuck::{cast_slice, AnyBitPattern}; -use metered::{metered, HitCount}; +use metered::metered; use parking_lot::{Mutex, RwLock}; use shale::{ compact::{CompactSpace, CompactSpaceHeader}, @@ -42,8 +43,6 @@ use tokio::task::block_in_place; mod proposal; -pub use proposal::{Batch, BatchOp, Proposal}; - use self::proposal::ProposalBase; const MERKLE_META_SPACE: SpaceId = 0x0; @@ -278,8 +277,7 @@ pub struct DbRev { #[async_trait] impl + Send + Sync> api::DbView for DbRev { async fn root_hash(&self) -> Result { - self.merkle - .root_hash(self.header.kv_root) + block_in_place(|| self.merkle.root_hash(self.header.kv_root)) .map(|h| *h) .map_err(|e| api::Error::IO(std::io::Error::new(ErrorKind::Other, e))) } @@ -386,7 +384,7 @@ impl Drop for DbInner { impl api::Db for Db { type Historical = DbRev; - type Proposal = Proposal; + type Proposal = proposal::Proposal; async fn revision(&self, root_hash: HashKey) -> Result, api::Error> { let rev = block_in_place(|| self.get_revision(&TrieHash(root_hash))); @@ -400,7 +398,9 @@ impl api::Db for Db { } async fn root_hash(&self) -> Result { - self.kv_root_hash().map(|hash| hash.0).map_err(Into::into) + block_in_place(|| self.kv_root_hash()) + .map(|hash| hash.0) + .map_err(Into::into) } async fn propose( @@ -434,13 +434,17 @@ pub struct Db { impl Db { const PARAM_SIZE: u64 = size_of::() as u64; - /// Open a database. - pub fn new>(db_path: P, cfg: &DbConfig) -> Result { - // TODO: make sure all fds are released at the end + pub async fn new>(db_path: P, cfg: &DbConfig) -> Result { if cfg.truncate { - let _ = std::fs::remove_dir_all(db_path.as_ref()); + let _ = tokio::fs::remove_dir_all(db_path.as_ref()).await; } + block_in_place(|| Db::new_internal(db_path, cfg.clone())) + .map_err(|e| api::Error::InternalError(e.into())) + } + + /// Open a database. + fn new_internal>(db_path: P, cfg: DbConfig) -> Result { let open_options = if cfg.truncate { file::Options::Truncate } else { @@ -465,7 +469,7 @@ impl Db { { return Err(DbError::InvalidParams); } - Self::initialize_header_on_disk(cfg, fd0)?; + Self::initialize_header_on_disk(&cfg, fd0)?; } // read DbParams @@ -482,10 +486,12 @@ impl Db { let (sender, inbound) = tokio::sync::mpsc::unbounded_channel(); let disk_requester = DiskBufferRequester::new(sender); let buffer = cfg.buffer.clone(); - let disk_thread = Some(std::thread::spawn(move || { - let disk_buffer = DiskBuffer::new(inbound, &buffer, &wal).unwrap(); - disk_buffer.run() - })); + let disk_thread = block_in_place(|| { + Some(std::thread::spawn(move || { + let disk_buffer = DiskBuffer::new(inbound, &buffer, &wal).unwrap(); + disk_buffer.run() + })) + }); let root_hash_cache: Arc = CachedSpace::new( &StoreConfig::builder() @@ -753,10 +759,10 @@ impl Db { } /// Create a proposal. - pub fn new_proposal( + pub(crate) fn new_proposal( &self, data: Batch, - ) -> Result { + ) -> Result { let mut inner = self.inner.write(); let reset_store_headers = inner.reset_store_headers; let (store, mut rev) = Db::new_store( @@ -792,7 +798,7 @@ impl Db { rev.flush_dirty().unwrap(); let parent = ProposalBase::View(Arc::clone(&self.revisions.lock().base_revision)); - Ok(Proposal { + Ok(proposal::Proposal { m: Arc::clone(&self.inner), r: Arc::clone(&self.revisions), cfg: self.cfg.clone(), @@ -904,20 +910,10 @@ impl Db { self.revisions.lock().base_revision.kv_dump(w) } /// Get root hash of the latest generic key-value storage. - pub fn kv_root_hash(&self) -> Result { + pub(crate) fn kv_root_hash(&self) -> Result { self.revisions.lock().base_revision.kv_root_hash() } - /// Get a value in the kv store associated with a particular key. - #[measure(HitCount)] - pub fn kv_get>(&self, key: K) -> Result, DbError> { - self.revisions - .lock() - .base_revision - .kv_get(key) - .ok_or(DbError::KeyNotFound) - } - pub fn metrics(&self) -> Arc { self.metrics.clone() } diff --git a/firewood/src/file.rs b/firewood/src/file.rs index 96e93c9bd..84d81dff0 100644 --- a/firewood/src/file.rs +++ b/firewood/src/file.rs @@ -3,6 +3,7 @@ // Copied from CedrusDB +use std::fs::{create_dir, remove_dir_all}; use std::ops::Deref; use std::os::fd::OwnedFd; @@ -74,7 +75,7 @@ impl Deref for File { } } -pub fn touch_dir(dirname: &str, rootdir: &Path) -> Result { +pub(crate) fn touch_dir(dirname: &str, rootdir: &Path) -> Result { let path = rootdir.join(dirname); if let Err(e) = std::fs::create_dir(&path) { // ignore already-exists error @@ -85,17 +86,17 @@ pub fn touch_dir(dirname: &str, rootdir: &Path) -> Result>( +pub(crate) fn open_dir>( path: P, options: Options, ) -> Result<(PathBuf, bool), std::io::Error> { let truncate = options == Options::Truncate; if truncate { - let _ = std::fs::remove_dir_all(path.as_ref()); + let _ = remove_dir_all(path.as_ref()); } - match std::fs::create_dir(path.as_ref()) { + match create_dir(path.as_ref()) { Err(e) if truncate || e.kind() != ErrorKind::AlreadyExists => Err(e), // the DB already exists Err(_) => Ok((path.as_ref().to_path_buf(), false)), diff --git a/firewood/src/lib.rs b/firewood/src/lib.rs index 9c7b09548..e9da7c759 100644 --- a/firewood/src/lib.rs +++ b/firewood/src/lib.rs @@ -191,7 +191,7 @@ pub mod merkle_util; pub mod proof; pub mod storage; -pub(crate) mod config; +pub mod config; pub mod nibbles; pub mod v2; diff --git a/firewood/src/storage/buffer.rs b/firewood/src/storage/buffer.rs index 9951a4aa9..2b50e26e9 100644 --- a/firewood/src/storage/buffer.rs +++ b/firewood/src/storage/buffer.rs @@ -618,6 +618,7 @@ impl DiskBufferRequester { mod tests { use sha3::Digest; use std::path::{Path, PathBuf}; + use tokio::task::block_in_place; use super::*; use crate::{ @@ -657,9 +658,9 @@ mod tests { .into() } - #[test] + #[tokio::test] #[ignore = "ref: https://github.com/ava-labs/firewood/issues/45"] - fn test_buffer_with_undo() { + async fn test_buffer_with_undo() { let temp_dir = get_tmp_dir(); let buf_cfg = DiskBufferConfig::builder().build(); @@ -734,9 +735,9 @@ mod tests { assert_eq!(disk_requester.collect_ash(1).unwrap().len(), 1); } - #[test] + #[tokio::test] #[ignore = "ref: https://github.com/ava-labs/firewood/issues/45"] - fn test_buffer_with_redo() { + async fn test_buffer_with_redo() { let buf_cfg = DiskBufferConfig::builder().build(); let wal_cfg = WalConfig::builder().build(); let disk_requester = init_buffer(buf_cfg, wal_cfg); @@ -808,8 +809,8 @@ mod tests { assert_eq!(view.as_deref(), hash); } - #[test] - fn test_multi_stores() { + #[tokio::test(flavor = "multi_thread")] + async fn test_multi_stores() { let buf_cfg = DiskBufferConfig::builder().build(); let wal_cfg = WalConfig::builder().build(); let disk_requester = init_buffer(buf_cfg, wal_cfg); @@ -849,7 +850,7 @@ mod tests { // mutate the in memory buffer. let data = b"this is a test"; let hash: [u8; HASH_SIZE] = sha3::Keccak256::digest(data).into(); - store.write(0, &hash); + block_in_place(|| store.write(0, &hash)); assert_eq!(store.id(), STATE_SPACE); let another_data = b"this is another test"; @@ -857,11 +858,13 @@ mod tests { // mutate the in memory buffer in another StoreRev new from the above. let mut another_store = StoreRevMut::new_from_other(&store); - another_store.write(32, &another_hash); + block_in_place(|| another_store.write(32, &another_hash)); assert_eq!(another_store.id(), STATE_SPACE); // wal should have no records. - assert!(disk_requester.collect_ash(1).unwrap().is_empty()); + assert!(block_in_place(|| disk_requester.collect_ash(1)) + .unwrap() + .is_empty()); // get RO view of the buffer from the beginning. Both stores should have the same view. let view = store.get_view(0, HASH_SIZE as u64).unwrap(); @@ -904,7 +907,7 @@ mod tests { let another_store = StoreRevMut::new(state_cache); let view = another_store.get_view(0, HASH_SIZE as u64).unwrap(); assert_eq!(view.as_deref(), another_hash); - disk_requester.shutdown(); + block_in_place(|| disk_requester.shutdown()); } fn get_file_path(path: &Path, file: &str, line: u32) -> PathBuf { diff --git a/firewood/src/v2/api.rs b/firewood/src/v2/api.rs index dbc9ff4e0..0ddacbe71 100644 --- a/firewood/src/v2/api.rs +++ b/firewood/src/v2/api.rs @@ -172,7 +172,7 @@ pub trait DbView { /// [DbView], which means you can fetch values from it or /// obtain proofs. #[async_trait] -pub trait Proposal: DbView { +pub trait Proposal: DbView + Send + Sync { type Proposal: DbView + Proposal; /// Commit this revision diff --git a/firewood/src/v2/db.rs b/firewood/src/v2/db.rs index a210d1302..edfbe1ee6 100644 --- a/firewood/src/v2/db.rs +++ b/firewood/src/v2/db.rs @@ -49,6 +49,7 @@ where T: api::DbView, T: Send + Sync, T: Default, + T: 'static, { type Historical = T; diff --git a/firewood/tests/db.rs b/firewood/tests/db.rs index e522fcac2..f8bab4f31 100644 --- a/firewood/tests/db.rs +++ b/firewood/tests/db.rs @@ -2,47 +2,24 @@ // See the file LICENSE.md for licensing terms. use firewood::{ - db::{BatchOp, Db as PersistedDb, DbConfig, DbError, WalConfig}, - merkle::TrieHash, + db::{Db, DbConfig, WalConfig}, + v2::api::{self, BatchOp, Db as _, DbView, Proposal}, }; +use tokio::task::block_in_place; -use std::{ - collections::VecDeque, - fs::remove_dir_all, - ops::{Deref, DerefMut}, - path::Path, - sync::Arc, -}; +use std::{collections::VecDeque, env::temp_dir, path::PathBuf, sync::Arc}; // TODO: use a trait macro_rules! kv_dump { ($e: ident) => {{ let mut s = Vec::new(); - $e.kv_root_hash().unwrap(); $e.kv_dump(&mut s).unwrap(); String::from_utf8(s).unwrap() }}; } -struct Db<'a, P: AsRef + ?Sized>(PersistedDb, &'a P); - -impl<'a, P: AsRef + ?Sized> Db<'a, P> { - fn new(path: &'a P, cfg: &DbConfig) -> Result { - PersistedDb::new(path, cfg).map(|db| Self(db, path)) - } -} - -impl + ?Sized> Drop for Db<'_, P> { - fn drop(&mut self) { - // if you're using absolute paths, you have to clean up after yourself - if self.1.as_ref().is_relative() { - remove_dir_all(self.1).expect("should be able to remove db-directory"); - } - } -} - -#[test] -fn test_basic_metrics() { +#[tokio::test(flavor = "multi_thread")] +async fn test_basic_metrics() { let cfg = DbConfig::builder() .meta_ncached_pages(1024) .meta_ncached_files(128) @@ -57,15 +34,40 @@ fn test_basic_metrics() { .max_revisions(10) .build(), ); - let db = Db::new("test_revisions_db2", &cfg.truncate(true).build()).unwrap(); - let metrics = db.metrics(); - assert_eq!(metrics.kv_get.hit_count.get(), 0); - db.kv_get("a").ok(); - assert_eq!(metrics.kv_get.hit_count.get(), 1); + + let mut tmpdir: PathBuf = std::env::var_os("CARGO_TARGET_DIR") + .unwrap_or(temp_dir().into()) + .into(); + tmpdir.push("/tmp/test_basic_metrics"); + + let db = firewood::db::Db::new(tmpdir, &cfg.truncate(true).build()) + .await + .unwrap(); + // let metrics = db.metrics(); + // TODO: kv_get is no longer a valid metric, and DbRev has no access to Db.metrics (yet) + //assert_eq!(metrics.kv_get.hit_count.get(), 0); + + // TODO: we can't fetch the revision for the empty tree, so insert a single value + Arc::new( + db.propose(vec![BatchOp::Put { + key: b"a", + value: b"b", + }]) + .await + .unwrap(), + ) + .commit() + .await + .unwrap(); + + let root = db.root_hash().await.unwrap(); + let rev = db.revision(root).await.unwrap(); + rev.val("a").await.ok().unwrap().unwrap(); + //assert_eq!(metrics.val.hit_count.get(), 1); } -#[test] -fn test_revisions() { +#[tokio::test(flavor = "multi_thread")] +async fn test_revisions() { use rand::{rngs::StdRng, Rng, SeedableRng}; let cfg = DbConfig::builder() .meta_ncached_pages(1024) @@ -99,11 +101,18 @@ fn test_revisions() { .collect(); key }; + + let mut tmpdir: PathBuf = std::env::var_os("CARGO_TARGET_DIR") + .unwrap_or(temp_dir().into()) + .into(); + tmpdir.push("/tmp/test_revisions"); + for i in 0..10 { - let db = - PersistedDb::new("test_revisions_db", &cfg.clone().truncate(true).build()).unwrap(); + let db = Db::new(&tmpdir, &cfg.clone().truncate(true).build()) + .await + .unwrap(); let mut dumped = VecDeque::new(); - let mut hashes: VecDeque = VecDeque::new(); + let mut hashes: VecDeque = VecDeque::new(); for _ in 0..10 { { let mut batch = Vec::new(); @@ -117,48 +126,40 @@ fn test_revisions() { }; batch.push(write); } - let proposal = db.new_proposal(batch).unwrap(); - proposal.commit_sync().unwrap(); + let proposal = Arc::new(db.propose(batch).await.unwrap()); + proposal.commit().await.unwrap(); } while dumped.len() > 10 { dumped.pop_back(); hashes.pop_back(); } - let root_hash = db.kv_root_hash().unwrap(); + let root_hash = db.root_hash().await.unwrap(); hashes.push_front(root_hash); dumped.push_front(kv_dump!(db)); - dumped - .iter() - .zip(hashes.iter().cloned()) - .map(|(data, hash)| (data, db.get_revision(&hash).unwrap())) - .map(|(data, rev)| (data, kv_dump!(rev))) - .for_each(|(b, a)| { - if &a != b { - print!("{a}\n{b}"); - panic!("not the same"); - } - }); + for (dump, hash) in dumped.iter().zip(hashes.iter().cloned()) { + let rev = db.revision(hash).await.unwrap(); + assert_eq!(rev.root_hash().await.unwrap(), hash); + assert_eq!(kv_dump!(rev), *dump, "not the same: Pass {i}"); + } } drop(db); - let db = Db::new("test_revisions_db", &cfg.clone().truncate(false).build()).unwrap(); - dumped - .iter() - .zip(hashes.iter().cloned()) - .map(|(data, hash)| (data, db.get_revision(&hash).unwrap())) - .map(|(data, rev)| (data, kv_dump!(rev))) - .for_each(|(previous_dump, after_reopen_dump)| { - if &after_reopen_dump != previous_dump { - panic!( - "not the same: pass {i}:\n{after_reopen_dump}\n--------\n{previous_dump}" - ); - } - }); - println!("i = {i}"); + let db = Db::new(tmpdir.clone(), &cfg.clone().truncate(false).build()) + .await + .unwrap(); + for (dump, hash) in dumped.iter().zip(hashes.iter().cloned()) { + let rev = db.revision(hash).await.unwrap(); + rev.root_hash().await.unwrap(); + assert_eq!( + *dump, + block_in_place(|| kv_dump!(rev)), + "not the same: pass {i}" + ); + } } } -#[test] -fn create_db_issue_proof() { +#[tokio::test(flavor = "multi_thread")] +async fn create_db_issue_proof() { let cfg = DbConfig::builder() .meta_ncached_pages(1024) .meta_ncached_files(128) @@ -174,7 +175,14 @@ fn create_db_issue_proof() { .build(), ); - let db = Db::new("test_db_proof", &cfg.truncate(true).build()).unwrap(); + let mut tmpdir: PathBuf = std::env::var_os("CARGO_TARGET_DIR") + .unwrap_or(temp_dir().into()) + .into(); + tmpdir.push("/tmp/test_db_proof"); + + let db = firewood::db::Db::new(tmpdir, &cfg.truncate(true).build()) + .await + .unwrap(); let items = vec![ ("d", "verb"), @@ -191,10 +199,10 @@ fn create_db_issue_proof() { }; batch.push(write); } - let proposal = db.new_proposal(batch).unwrap(); - proposal.commit_sync().unwrap(); + let proposal = Arc::new(db.propose(batch).await.unwrap()); + proposal.commit().await.unwrap(); - let root_hash = db.kv_root_hash().unwrap(); + let root_hash = db.root_hash().await.unwrap(); // Add second commit let mut batch = Vec::new(); @@ -205,16 +213,16 @@ fn create_db_issue_proof() { }; batch.push(write); } - let proposal = db.new_proposal(batch).unwrap(); - proposal.commit_sync().unwrap(); + let proposal = Arc::new(db.propose(batch).await.unwrap()); + proposal.commit().await.unwrap(); - let rev = db.get_revision(&root_hash).unwrap(); + let rev = db.revision(root_hash).await.unwrap(); let key = "doe".as_bytes(); - let root_hash = rev.kv_root_hash(); + let root_hash = rev.root_hash().await.unwrap(); - match rev.prove::<&[u8]>(key) { + match rev.single_key_proof(key).await { Ok(proof) => { - let verification = proof.verify_proof(key, *root_hash.unwrap()).unwrap(); + let verification = proof.unwrap().verify_proof(key, root_hash).unwrap(); assert!(verification.is_some()); } Err(e) => { @@ -224,38 +232,30 @@ fn create_db_issue_proof() { let missing_key = "dog".as_bytes(); // The proof for the missing key will return the path to the missing key - if let Err(e) = rev.prove(missing_key) { + if let Err(e) = rev.single_key_proof(missing_key).await { println!("Error: {}", e); // TODO do type assertion on error } } -impl + ?Sized> Deref for Db<'_, P> { - type Target = PersistedDb; - - fn deref(&self) -> &Self::Target { - &self.0 - } -} - -impl + ?Sized> DerefMut for Db<'_, P> { - fn deref_mut(&mut self) -> &mut PersistedDb { - &mut self.0 - } -} - macro_rules! assert_val { ($rev: ident, $key:literal, $expected_val:literal) => { - let actual = $rev.kv_get($key.as_bytes()).unwrap(); + let actual = $rev.val($key.as_bytes()).await.unwrap().unwrap(); assert_eq!(actual, $expected_val.as_bytes().to_vec()); }; } -#[test] -fn db_proposal() -> Result<(), DbError> { +#[tokio::test(flavor = "multi_thread")] +async fn db_proposal() -> Result<(), api::Error> { let cfg = DbConfig::builder().wal(WalConfig::builder().max_revisions(10).build()); - let db = Db::new("test_db_proposal", &cfg.clone().truncate(true).build()) + let mut tmpdir: PathBuf = std::env::var_os("CARGO_TARGET_DIR") + .unwrap_or(temp_dir().into()) + .into(); + tmpdir.push("/tmp/test_db_proposal"); + + let db = firewood::db::Db::new(tmpdir, &cfg.clone().truncate(true).build()) + .await .expect("db initiation should succeed"); let batch = vec![ @@ -266,49 +266,52 @@ fn db_proposal() -> Result<(), DbError> { BatchOp::Delete { key: b"z" }, ]; - let proposal = Arc::new(db.new_proposal(batch)?); - let rev = proposal.get_revision(); - assert_val!(rev, "k", "v"); + let proposal = Arc::new(db.propose(batch).await?); + assert_val!(proposal, "k", "v"); let batch_2 = vec![BatchOp::Put { key: b"k2", value: "v2".as_bytes().to_vec(), }]; - let proposal_2 = proposal.clone().propose_sync(batch_2)?; - let rev = proposal_2.get_revision(); - assert_val!(rev, "k", "v"); - assert_val!(rev, "k2", "v2"); + let proposal_2 = Arc::new(proposal.clone().propose(batch_2).await?); + assert_val!(proposal_2, "k", "v"); + assert_val!(proposal_2, "k2", "v2"); - proposal.commit_sync()?; - proposal_2.commit_sync()?; + proposal.clone().commit().await?; + proposal_2.commit().await?; - std::thread::scope(|scope| { - scope.spawn(|| -> Result<(), DbError> { + let t1 = tokio::spawn({ + let proposal = proposal.clone(); + async move { let another_batch = vec![BatchOp::Put { - key: b"another_k", - value: "another_v".as_bytes().to_vec(), + key: b"another_k_t1", + value: "another_v_t1".as_bytes().to_vec(), }]; - let another_proposal = proposal.clone().propose_sync(another_batch)?; + let another_proposal = proposal.clone().propose(another_batch).await.unwrap(); let rev = another_proposal.get_revision(); assert_val!(rev, "k", "v"); - assert_val!(rev, "another_k", "another_v"); + assert_val!(rev, "another_k_t1", "another_v_t1"); // The proposal is invalid and cannot be committed - assert!(another_proposal.commit_sync().is_err()); - Ok(()) - }); - - scope.spawn(|| -> Result<(), DbError> { + assert!(Arc::new(another_proposal).commit().await.is_err()); + } + }); + let t2 = tokio::spawn({ + let proposal = proposal.clone(); + async move { let another_batch = vec![BatchOp::Put { - key: b"another_k_1", - value: "another_v_1".as_bytes().to_vec(), + key: b"another_k_t2", + value: "another_v_t2".as_bytes().to_vec(), }]; - let another_proposal = proposal.clone().propose_sync(another_batch)?; + let another_proposal = proposal.clone().propose(another_batch).await.unwrap(); let rev = another_proposal.get_revision(); assert_val!(rev, "k", "v"); - assert_val!(rev, "another_k_1", "another_v_1"); - Ok(()) - }); + assert_val!(rev, "another_k_t2", "another_v_t2"); + assert!(Arc::new(another_proposal).commit().await.is_err()); + } }); + let (first, second) = tokio::join!(t1, t2); + first.unwrap(); + second.unwrap(); // Recursive commit @@ -316,23 +319,21 @@ fn db_proposal() -> Result<(), DbError> { key: b"k3", value: "v3".as_bytes().to_vec(), }]; - let proposal = Arc::new(db.new_proposal(batch)?); - let rev = proposal.get_revision(); - assert_val!(rev, "k", "v"); - assert_val!(rev, "k2", "v2"); - assert_val!(rev, "k3", "v3"); + let proposal = Arc::new(db.propose(batch).await?); + assert_val!(proposal, "k", "v"); + assert_val!(proposal, "k2", "v2"); + assert_val!(proposal, "k3", "v3"); let batch_2 = vec![BatchOp::Put { key: b"k4", value: "v4".as_bytes().to_vec(), }]; - let proposal_2 = proposal.clone().propose_sync(batch_2)?; - let rev = proposal_2.get_revision(); - assert_val!(rev, "k", "v"); - assert_val!(rev, "k2", "v2"); - assert_val!(rev, "k3", "v3"); - assert_val!(rev, "k4", "v4"); - - proposal_2.commit_sync()?; + let proposal_2 = Arc::new(proposal.clone().propose(batch_2).await?); + assert_val!(proposal_2, "k", "v"); + assert_val!(proposal_2, "k2", "v2"); + assert_val!(proposal_2, "k3", "v3"); + assert_val!(proposal_2, "k4", "v4"); + + proposal_2.commit().await?; Ok(()) } diff --git a/firewood/tests/v2api.rs b/firewood/tests/v2api.rs index afd9a7493..98ca03096 100644 --- a/firewood/tests/v2api.rs +++ b/firewood/tests/v2api.rs @@ -1,15 +1,15 @@ // Copyright (C) 2023, Ava Labs, Inc. All rights reserved. // See the file LICENSE.md for licensing terms. -use std::{error::Error, path::PathBuf, sync::Arc}; +use std::{path::PathBuf, sync::Arc}; use firewood::{ - db::{BatchOp, Db as PersistedDb, DbConfig, DbError}, + db::{BatchOp, Db as PersistedDb, DbConfig}, v2::api::{Db, DbView, Proposal}, }; #[tokio::test(flavor = "multi_thread")] -async fn smoke() -> Result<(), Box> { +async fn smoke() -> Result<(), Box> { let cfg = DbConfig::builder().truncate(true).build(); let db = Arc::new(testdb(cfg).await?); let empty_hash = db.root_hash().await?; @@ -18,7 +18,7 @@ async fn smoke() -> Result<(), Box> { // insert a single key/value let (key, value) = (b"smoke", b"test"); let batch_put = BatchOp::Put { key, value }; - let proposal: Arc = db.propose(vec![batch_put]).await?.into(); + let proposal = Arc::new(db.propose(vec![batch_put]).await?); proposal.commit().await?; // ensure the latest hash is different @@ -41,11 +41,9 @@ async fn smoke() -> Result<(), Box> { Ok(()) } -async fn testdb(cfg: DbConfig) -> Result { +async fn testdb(cfg: DbConfig) -> Result { let tmpdbpath = tmp_dir().join("testdb"); - tokio::task::spawn_blocking(move || PersistedDb::new(tmpdbpath, &cfg)) - .await - .unwrap() + PersistedDb::new(tmpdbpath, &cfg).await } fn tmp_dir() -> PathBuf { diff --git a/fwdctl/Cargo.toml b/fwdctl/Cargo.toml index dd01f83f5..433892546 100644 --- a/fwdctl/Cargo.toml +++ b/fwdctl/Cargo.toml @@ -9,6 +9,7 @@ clap = { version = "4.0.29", features = ["cargo", "derive"] } anyhow = "1.0.66" env_logger = "0.10.0" log = "0.4.17" +tokio = { version = "1.33.0", features = ["full"] } [dev-dependencies] assert_cmd = "2.0.7" diff --git a/fwdctl/src/create.rs b/fwdctl/src/create.rs index 10bb562d5..d52ed1127 100644 --- a/fwdctl/src/create.rs +++ b/fwdctl/src/create.rs @@ -1,9 +1,11 @@ // Copyright (C) 2023, Ava Labs, Inc. All rights reserved. // See the file LICENSE.md for licensing terms. -use anyhow::{Error, Result}; use clap::{value_parser, Args}; -use firewood::db::{Db, DbConfig, DbRevConfig, DiskBufferConfig, WalConfig}; +use firewood::{ + db::{Db, DbConfig, DbRevConfig, DiskBufferConfig, WalConfig}, + v2::api, +}; use log; #[derive(Args)] @@ -277,11 +279,11 @@ pub fn initialize_db_config(opts: &Options) -> DbConfig { } } -pub fn run(opts: &Options) -> Result<()> { +pub async fn run(opts: &Options) -> Result<(), api::Error> { let db_config = initialize_db_config(opts); log::debug!("database configuration parameters: \n{:?}\n", db_config); - Db::new::<&str>(opts.name.as_ref(), &db_config).map_err(Error::msg)?; + Db::new(opts.name.clone(), &db_config).await?; println!("created firewood database in {:?}", opts.name); Ok(()) } diff --git a/fwdctl/src/delete.rs b/fwdctl/src/delete.rs index 0c2520cfa..50bba1b29 100644 --- a/fwdctl/src/delete.rs +++ b/fwdctl/src/delete.rs @@ -1,9 +1,13 @@ // Copyright (C) 2023, Ava Labs, Inc. All rights reserved. // See the file LICENSE.md for licensing terms. -use anyhow::{Error, Result}; +use std::sync::Arc; + use clap::Args; -use firewood::db::{BatchOp, Db, DbConfig, WalConfig}; +use firewood::{ + db::{BatchOp, Db, DbConfig, WalConfig}, + v2::api::{self, Db as _, Proposal}, +}; use log; #[derive(Debug, Args)] @@ -23,19 +27,19 @@ pub struct Options { pub db: String, } -pub fn run(opts: &Options) -> Result<()> { +pub async fn run(opts: &Options) -> Result<(), api::Error> { log::debug!("deleting key {:?}", opts); let cfg = DbConfig::builder() .truncate(false) .wal(WalConfig::builder().max_revisions(10).build()); - let db = Db::new(opts.db.as_str(), &cfg.build()).map_err(Error::msg)?; + let db = Db::new(opts.db.clone(), &cfg.build()).await?; let batch: Vec> = vec![BatchOp::Delete { key: opts.key.clone(), }]; - let proposal = db.new_proposal(batch).map_err(Error::msg)?; - proposal.commit_sync().map_err(Error::msg)?; + let proposal = Arc::new(db.propose(batch).await?); + proposal.commit().await?; println!("key {} deleted successfully", opts.key); Ok(()) diff --git a/fwdctl/src/dump.rs b/fwdctl/src/dump.rs index e1d690b47..3a0bc2f32 100644 --- a/fwdctl/src/dump.rs +++ b/fwdctl/src/dump.rs @@ -1,10 +1,13 @@ // Copyright (C) 2023, Ava Labs, Inc. All rights reserved. // See the file LICENSE.md for licensing terms. -use anyhow::{Error, Result}; use clap::Args; -use firewood::db::{Db, DbConfig, WalConfig}; +use firewood::{ + db::{Db, DbConfig, WalConfig}, + v2::api::{self}, +}; use log; +use tokio::task::block_in_place; #[derive(Debug, Args)] pub struct Options { @@ -18,13 +21,14 @@ pub struct Options { pub db: String, } -pub fn run(opts: &Options) -> Result<()> { +pub async fn run(opts: &Options) -> Result<(), api::Error> { log::debug!("dump database {:?}", opts); let cfg = DbConfig::builder() .truncate(false) .wal(WalConfig::builder().max_revisions(10).build()); - let db = Db::new(opts.db.as_str(), &cfg.build()).map_err(Error::msg)?; - db.kv_dump(&mut std::io::stdout().lock()) - .map_err(Error::msg) + let db = Db::new(opts.db.clone(), &cfg.build()).await?; + Ok(block_in_place(|| { + db.kv_dump(&mut std::io::stdout().lock()) + })?) } diff --git a/fwdctl/src/get.rs b/fwdctl/src/get.rs index ae314ad3d..6a637ac4d 100644 --- a/fwdctl/src/get.rs +++ b/fwdctl/src/get.rs @@ -1,9 +1,11 @@ // Copyright (C) 2023, Ava Labs, Inc. All rights reserved. // See the file LICENSE.md for licensing terms. -use anyhow::{anyhow, bail, Error, Result}; use clap::Args; -use firewood::db::{Db, DbConfig, DbError, WalConfig}; +use firewood::{ + db::{Db, DbConfig, WalConfig}, + v2::api::{self, Db as _, DbView}, +}; use log; use std::str; @@ -24,27 +26,26 @@ pub struct Options { pub db: String, } -pub fn run(opts: &Options) -> Result<()> { +pub async fn run(opts: &Options) -> Result<(), api::Error> { log::debug!("get key value pair {:?}", opts); let cfg = DbConfig::builder() .truncate(false) .wal(WalConfig::builder().max_revisions(10).build()); - let db = Db::new(opts.db.as_str(), &cfg.build()).map_err(Error::msg)?; + let db = Db::new(opts.db.clone(), &cfg.build()).await?; - match db.kv_get(opts.key.as_bytes()) { - Ok(val) => { - let s = match str::from_utf8(&val) { - Ok(v) => v, - Err(e) => return Err(anyhow!("Invalid UTF-8 sequence: {}", e)), - }; + let rev = db.revision(db.root_hash().await?).await?; + + match rev.val(opts.key.as_bytes()).await { + Ok(Some(val)) => { + let s = String::from_utf8_lossy(val.as_ref()); println!("{:?}", s); - if val.is_empty() { - bail!("no value found for key"); - } Ok(()) } - Err(DbError::KeyNotFound) => bail!("key not found"), - Err(e) => bail!(e), + Ok(None) => { + eprintln!("Key '{}' not found", opts.key); + Ok(()) + } + Err(e) => Err(e), } } diff --git a/fwdctl/src/insert.rs b/fwdctl/src/insert.rs index 804557de1..84e6ef2b7 100644 --- a/fwdctl/src/insert.rs +++ b/fwdctl/src/insert.rs @@ -1,9 +1,13 @@ // Copyright (C) 2023, Ava Labs, Inc. All rights reserved. // See the file LICENSE.md for licensing terms. -use anyhow::{anyhow, Error, Result}; +use std::sync::Arc; + use clap::Args; -use firewood::db::{BatchOp, Db, DbConfig, WalConfig}; +use firewood::{ + db::{BatchOp, Db, DbConfig, WalConfig}, + v2::api::{self, Db as _, Proposal}, +}; use log; #[derive(Debug, Args)] @@ -27,23 +31,20 @@ pub struct Options { pub db: String, } -pub fn run(opts: &Options) -> Result<()> { +pub async fn run(opts: &Options) -> Result<(), api::Error> { log::debug!("inserting key value pair {:?}", opts); let cfg = DbConfig::builder() .truncate(false) .wal(WalConfig::builder().max_revisions(10).build()); - let db = match Db::new(opts.db.as_str(), &cfg.build()) { - Ok(db) => db, - Err(_) => return Err(anyhow!("error opening database")), - }; + let db = Db::new(opts.db.clone(), &cfg.build()).await?; let batch: Vec, Vec>> = vec![BatchOp::Put { key: opts.key.clone().into(), value: opts.value.bytes().collect(), }]; - let proposal = db.new_proposal(batch).map_err(Error::msg)?; - proposal.commit_sync().map_err(Error::msg)?; + let proposal = Arc::new(db.propose(batch).await?); + proposal.commit().await?; println!("{}", opts.key); Ok(()) diff --git a/fwdctl/src/main.rs b/fwdctl/src/main.rs index 6b6e9cb26..fca9d6d87 100644 --- a/fwdctl/src/main.rs +++ b/fwdctl/src/main.rs @@ -1,8 +1,8 @@ // Copyright (C) 2023, Ava Labs, Inc. All rights reserved. // See the file LICENSE.md for licensing terms. -use anyhow::Result; use clap::{Parser, Subcommand}; +use firewood::v2::api; pub mod create; pub mod delete; @@ -46,7 +46,8 @@ enum Commands { Dump(dump::Options), } -fn main() -> Result<()> { +#[tokio::main] +async fn main() -> Result<(), api::Error> { let cli = Cli::parse(); env_logger::init_from_env( @@ -55,11 +56,11 @@ fn main() -> Result<()> { ); match &cli.command { - Commands::Create(opts) => create::run(opts), - Commands::Insert(opts) => insert::run(opts), - Commands::Get(opts) => get::run(opts), - Commands::Delete(opts) => delete::run(opts), - Commands::Root(opts) => root::run(opts), - Commands::Dump(opts) => dump::run(opts), + Commands::Create(opts) => create::run(opts).await, + Commands::Insert(opts) => insert::run(opts).await, + Commands::Get(opts) => get::run(opts).await, + Commands::Delete(opts) => delete::run(opts).await, + Commands::Root(opts) => root::run(opts).await, + Commands::Dump(opts) => dump::run(opts).await, } } diff --git a/fwdctl/src/root.rs b/fwdctl/src/root.rs index 1d76ea6c4..119a6b463 100644 --- a/fwdctl/src/root.rs +++ b/fwdctl/src/root.rs @@ -1,9 +1,12 @@ // Copyright (C) 2023, Ava Labs, Inc. All rights reserved. // See the file LICENSE.md for licensing terms. -use anyhow::{Error, Result}; use clap::Args; -use firewood::db::{Db, DbConfig, WalConfig}; +use firewood::v2::api::Db as _; +use firewood::{ + db::{Db, DbConfig, WalConfig}, + v2::api, +}; use log; use std::str; @@ -20,15 +23,15 @@ pub struct Options { pub db: String, } -pub fn run(opts: &Options) -> Result<()> { +pub async fn run(opts: &Options) -> Result<(), api::Error> { log::debug!("root hash {:?}", opts); let cfg = DbConfig::builder() .truncate(false) .wal(WalConfig::builder().max_revisions(10).build()); - let db = Db::new(opts.db.as_str(), &cfg.build()).map_err(Error::msg)?; + let db = Db::new(opts.db.clone(), &cfg.build()).await?; - let root = db.kv_root_hash().map_err(Error::msg)?; - println!("{:X?}", *root); + let root = db.root_hash().await?; + println!("{:X?}", root); Ok(()) }