Skip to content

Commit

Permalink
Make Db::new async, force use of new API (#323)
Browse files Browse the repository at this point in the history
Signed-off-by: Ron Kuris <[email protected]>
Signed-off-by: Ron Kuris <[email protected]>
Co-authored-by: Richard Pringle <[email protected]>
  • Loading branch information
rkuris and richardpringle authored Oct 26, 2023
1 parent 1ed24f3 commit 20d6110
Show file tree
Hide file tree
Showing 19 changed files with 307 additions and 286 deletions.
2 changes: 1 addition & 1 deletion firewood/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ typed-builder = "0.18.0"
bincode = "1.3.3"

[dev-dependencies]
criterion = "0.5.1"
criterion = {version = "0.5.1", features = ["async_tokio"]}
keccak-hasher = "0.15.3"
rand = "0.8.5"
triehash = "0.8.4"
Expand Down
70 changes: 38 additions & 32 deletions firewood/benches/hashops.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,10 @@

use criterion::{criterion_group, criterion_main, profiler::Profiler, BatchSize, Criterion};
use firewood::{
db::{BatchOp, Db, DbConfig},
db::{BatchOp, DbConfig},
merkle::{Merkle, TrieHash, TRIE_HASH_LEN},
storage::WalConfig,
v2::api::{Db, Proposal},
};
use pprof::ProfilerGuard;
use rand::{distributions::Alphanumeric, rngs::StdRng, Rng, SeedableRng};
Expand All @@ -17,7 +18,7 @@ use shale::{
disk_address::DiskAddress,
CachedStore, ObjCache, Storable, StoredView,
};
use std::{fs::File, iter::repeat_with, ops::Deref, os::raw::c_int, path::Path};
use std::{fs::File, iter::repeat_with, ops::Deref, os::raw::c_int, path::Path, sync::Arc};

const ZERO_HASH: TrieHash = TrieHash([0u8; TRIE_HASH_LEN]);

Expand Down Expand Up @@ -134,36 +135,41 @@ fn bench_db<const N: usize>(criterion: &mut Criterion) {
.benchmark_group("Db")
.sample_size(30)
.bench_function("commit", |b| {
b.iter_batched(
|| {
let cfg =
DbConfig::builder().wal(WalConfig::builder().max_revisions(10).build());

let batch_ops: Vec<_> = repeat_with(|| {
(&mut rng)
.sample_iter(&Alphanumeric)
.take(KEY_LEN)
.collect()
})
.map(|key: Vec<_>| BatchOp::Put {
key,
value: vec![b'v'],
})
.take(N)
.collect();
let db_path = dbg!(std::env::temp_dir());
let db_path = db_path.join("benchmark_db");

let db = Db::new(db_path, &cfg.clone().truncate(true).build()).unwrap();

(db, batch_ops)
},
|(db, batch_ops)| {
let proposal = db.new_proposal(batch_ops).unwrap();
proposal.commit_sync().unwrap();
},
BatchSize::SmallInput,
);
b.to_async(tokio::runtime::Runtime::new().unwrap())
.iter_batched(
|| {
let batch_ops: Vec<_> = repeat_with(|| {
(&mut rng)
.sample_iter(&Alphanumeric)
.take(KEY_LEN)
.collect()
})
.map(|key: Vec<_>| BatchOp::Put {
key,
value: vec![b'v'],
})
.take(N)
.collect();
batch_ops
},
|batch_ops| async {
let db_path = dbg!(std::env::temp_dir());
let db_path = db_path.join("benchmark_db");
let cfg =
DbConfig::builder().wal(WalConfig::builder().max_revisions(10).build());

let db =
firewood::db::Db::new(db_path, &cfg.clone().truncate(true).build())
.await
.unwrap();

Arc::new(db.propose(batch_ops).await.unwrap())
.commit()
.await
.unwrap()
},
BatchSize::SmallInput,
);
});
}

Expand Down
14 changes: 6 additions & 8 deletions firewood/examples/insert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use std::{collections::HashMap, error::Error, ops::RangeInclusive, sync::Arc, ti

use firewood::{
db::{Batch, BatchOp, Db, DbConfig},
v2::api::{Db as DbApi, DbView, Proposal},
v2::api::{Db as _, DbView, Proposal},
};
use rand::{distributions::Alphanumeric, Rng};

Expand Down Expand Up @@ -44,11 +44,9 @@ async fn main() -> Result<(), Box<dyn Error>> {

let args = Args::parse();

let db = tokio::task::spawn_blocking(move || {
Db::new("rev_db", &cfg).expect("db initiation should succeed")
})
.await
.unwrap();
let db = Db::new("rev_db", &cfg)
.await
.expect("db initiation should succeed");

let keys = args.batch_keys;
let start = Instant::now();
Expand All @@ -74,7 +72,7 @@ async fn main() -> Result<(), Box<dyn Error>> {

let verify = get_keys_to_verify(&batch, args.read_verify_percent);

let proposal: Arc<firewood::db::Proposal> = db.propose(batch).await.unwrap().into();
let proposal = Arc::new(db.propose(batch).await.unwrap());
proposal.commit().await?;
verify_keys(&db, verify).await?;
}
Expand Down Expand Up @@ -107,7 +105,7 @@ fn get_keys_to_verify(batch: &Batch<Vec<u8>, Vec<u8>>, pct: u16) -> HashMap<Vec<
}

async fn verify_keys(
db: &Db,
db: &impl firewood::v2::api::Db,
verify: HashMap<Vec<u8>, Vec<u8>>,
) -> Result<(), firewood::v2::api::Error> {
if !verify.is_empty() {
Expand Down
56 changes: 26 additions & 30 deletions firewood/src/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
pub use crate::{
config::{DbConfig, DbRevConfig},
storage::{buffer::DiskBufferConfig, WalConfig},
v2::api::{Batch, BatchOp, Proposal},
};
use crate::{
file,
Expand All @@ -18,7 +19,7 @@ use crate::{
};
use async_trait::async_trait;
use bytemuck::{cast_slice, AnyBitPattern};
use metered::{metered, HitCount};
use metered::metered;
use parking_lot::{Mutex, RwLock};
use shale::{
compact::{CompactSpace, CompactSpaceHeader},
Expand All @@ -42,8 +43,6 @@ use tokio::task::block_in_place;

mod proposal;

pub use proposal::{Batch, BatchOp, Proposal};

use self::proposal::ProposalBase;

const MERKLE_META_SPACE: SpaceId = 0x0;
Expand Down Expand Up @@ -278,8 +277,7 @@ pub struct DbRev<S> {
#[async_trait]
impl<S: ShaleStore<Node> + Send + Sync> api::DbView for DbRev<S> {
async fn root_hash(&self) -> Result<api::HashKey, api::Error> {
self.merkle
.root_hash(self.header.kv_root)
block_in_place(|| self.merkle.root_hash(self.header.kv_root))
.map(|h| *h)
.map_err(|e| api::Error::IO(std::io::Error::new(ErrorKind::Other, e)))
}
Expand Down Expand Up @@ -386,7 +384,7 @@ impl Drop for DbInner {
impl api::Db for Db {
type Historical = DbRev<SharedStore>;

type Proposal = Proposal;
type Proposal = proposal::Proposal;

async fn revision(&self, root_hash: HashKey) -> Result<Arc<Self::Historical>, api::Error> {
let rev = block_in_place(|| self.get_revision(&TrieHash(root_hash)));
Expand All @@ -400,7 +398,9 @@ impl api::Db for Db {
}

async fn root_hash(&self) -> Result<HashKey, api::Error> {
self.kv_root_hash().map(|hash| hash.0).map_err(Into::into)
block_in_place(|| self.kv_root_hash())
.map(|hash| hash.0)
.map_err(Into::into)
}

async fn propose<K: KeyType, V: ValueType>(
Expand Down Expand Up @@ -434,13 +434,17 @@ pub struct Db {
impl Db {
const PARAM_SIZE: u64 = size_of::<DbParams>() as u64;

/// Open a database.
pub fn new<P: AsRef<Path>>(db_path: P, cfg: &DbConfig) -> Result<Self, DbError> {
// TODO: make sure all fds are released at the end
pub async fn new<P: AsRef<Path>>(db_path: P, cfg: &DbConfig) -> Result<Self, api::Error> {
if cfg.truncate {
let _ = std::fs::remove_dir_all(db_path.as_ref());
let _ = tokio::fs::remove_dir_all(db_path.as_ref()).await;
}

block_in_place(|| Db::new_internal(db_path, cfg.clone()))
.map_err(|e| api::Error::InternalError(e.into()))
}

/// Open a database.
fn new_internal<P: AsRef<Path>>(db_path: P, cfg: DbConfig) -> Result<Self, DbError> {
let open_options = if cfg.truncate {
file::Options::Truncate
} else {
Expand All @@ -465,7 +469,7 @@ impl Db {
{
return Err(DbError::InvalidParams);
}
Self::initialize_header_on_disk(cfg, fd0)?;
Self::initialize_header_on_disk(&cfg, fd0)?;
}

// read DbParams
Expand All @@ -482,10 +486,12 @@ impl Db {
let (sender, inbound) = tokio::sync::mpsc::unbounded_channel();
let disk_requester = DiskBufferRequester::new(sender);
let buffer = cfg.buffer.clone();
let disk_thread = Some(std::thread::spawn(move || {
let disk_buffer = DiskBuffer::new(inbound, &buffer, &wal).unwrap();
disk_buffer.run()
}));
let disk_thread = block_in_place(|| {
Some(std::thread::spawn(move || {
let disk_buffer = DiskBuffer::new(inbound, &buffer, &wal).unwrap();
disk_buffer.run()
}))
});

let root_hash_cache: Arc<CachedSpace> = CachedSpace::new(
&StoreConfig::builder()
Expand Down Expand Up @@ -753,10 +759,10 @@ impl Db {
}

/// Create a proposal.
pub fn new_proposal<K: KeyType, V: ValueType>(
pub(crate) fn new_proposal<K: KeyType, V: ValueType>(
&self,
data: Batch<K, V>,
) -> Result<Proposal, DbError> {
) -> Result<proposal::Proposal, DbError> {
let mut inner = self.inner.write();
let reset_store_headers = inner.reset_store_headers;
let (store, mut rev) = Db::new_store(
Expand Down Expand Up @@ -792,7 +798,7 @@ impl Db {
rev.flush_dirty().unwrap();

let parent = ProposalBase::View(Arc::clone(&self.revisions.lock().base_revision));
Ok(Proposal {
Ok(proposal::Proposal {
m: Arc::clone(&self.inner),
r: Arc::clone(&self.revisions),
cfg: self.cfg.clone(),
Expand Down Expand Up @@ -904,20 +910,10 @@ impl Db {
self.revisions.lock().base_revision.kv_dump(w)
}
/// Get the root hash of the generic key-value storage at the latest (base) revision.
pub fn kv_root_hash(&self) -> Result<TrieHash, DbError> {
pub(crate) fn kv_root_hash(&self) -> Result<TrieHash, DbError> {
self.revisions.lock().base_revision.kv_root_hash()
}

/// Get a value in the kv store associated with a particular key.
///
/// The lookup locks `self.revisions` and queries its `base_revision`.
///
/// # Errors
///
/// Returns [`DbError::KeyNotFound`] when the base revision's `kv_get`
/// yields `None` for `key`.
#[measure(HitCount)]
pub fn kv_get<K: AsRef<[u8]>>(&self, key: K) -> Result<Vec<u8>, DbError> {
self.revisions
.lock()
.base_revision
.kv_get(key)
.ok_or(DbError::KeyNotFound)
}

/// Returns another shared handle to this database's metrics.
///
/// Only the `Arc` is cloned; the returned handle refers to the same
/// `DbMetrics` value that the database holds.
pub fn metrics(&self) -> Arc<DbMetrics> {
    Arc::clone(&self.metrics)
}
Expand Down
9 changes: 5 additions & 4 deletions firewood/src/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

// Copied from CedrusDB

use std::fs::{create_dir, remove_dir_all};
use std::ops::Deref;
use std::os::fd::OwnedFd;

Expand Down Expand Up @@ -74,7 +75,7 @@ impl Deref for File {
}
}

pub fn touch_dir(dirname: &str, rootdir: &Path) -> Result<PathBuf, std::io::Error> {
pub(crate) fn touch_dir(dirname: &str, rootdir: &Path) -> Result<PathBuf, std::io::Error> {
let path = rootdir.join(dirname);
if let Err(e) = std::fs::create_dir(&path) {
// ignore already-exists error
Expand All @@ -85,17 +86,17 @@ pub fn touch_dir(dirname: &str, rootdir: &Path) -> Result<PathBuf, std::io::Erro
Ok(path)
}

pub fn open_dir<P: AsRef<Path>>(
pub(crate) fn open_dir<P: AsRef<Path>>(
path: P,
options: Options,
) -> Result<(PathBuf, bool), std::io::Error> {
let truncate = options == Options::Truncate;

if truncate {
let _ = std::fs::remove_dir_all(path.as_ref());
let _ = remove_dir_all(path.as_ref());
}

match std::fs::create_dir(path.as_ref()) {
match create_dir(path.as_ref()) {
Err(e) if truncate || e.kind() != ErrorKind::AlreadyExists => Err(e),
// the DB already exists
Err(_) => Ok((path.as_ref().to_path_buf(), false)),
Expand Down
2 changes: 1 addition & 1 deletion firewood/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ pub mod merkle_util;
pub mod proof;
pub mod storage;

pub(crate) mod config;
pub mod config;
pub mod nibbles;

pub mod v2;
Loading

0 comments on commit 20d6110

Please sign in to comment.