From 6f29a0057ef7c5bcd94ed4ca914c2a95fd18a8a6 Mon Sep 17 00:00:00 2001 From: Ron Kuris Date: Thu, 28 Sep 2023 16:10:34 -0700 Subject: [PATCH] 2/3 Implement v2::api for the real DB (#292) --- firewood/benches/hashops.rs | 2 +- firewood/examples/rev.rs | 10 ++--- firewood/src/db.rs | 48 ++++++++++++++++++---- firewood/src/db/proposal.rs | 82 +++++++++++++++++++++++++++++-------- firewood/src/v2/api.rs | 8 ++-- firewood/src/v2/db.rs | 23 +++++++++-- firewood/src/v2/emptydb.rs | 5 +-- firewood/src/v2/propose.rs | 4 +- firewood/tests/db.rs | 22 +++++----- fwdctl/src/delete.rs | 6 ++- fwdctl/src/insert.rs | 6 +-- 11 files changed, 158 insertions(+), 58 deletions(-) diff --git a/firewood/benches/hashops.rs b/firewood/benches/hashops.rs index bebac2ab1..bd69004ac 100644 --- a/firewood/benches/hashops.rs +++ b/firewood/benches/hashops.rs @@ -160,7 +160,7 @@ fn bench_db(criterion: &mut Criterion) { }, |(db, batch_ops)| { let proposal = db.new_proposal(batch_ops).unwrap(); - proposal.commit().unwrap(); + proposal.commit_sync().unwrap(); }, BatchSize::SmallInput, ); diff --git a/firewood/examples/rev.rs b/firewood/examples/rev.rs index ce4463b2d..aa0fd917c 100644 --- a/firewood/examples/rev.rs +++ b/firewood/examples/rev.rs @@ -7,7 +7,7 @@ use firewood::{ db::{BatchOp, Db, DbConfig, DbError, Proposal, Revision, WalConfig}, merkle::{Node, TrieHash}, storage::StoreRevShared, - v2::api::Proof, + v2::api::{KeyType, Proof, ValueType}, }; use shale::compact::CompactSpace; @@ -118,7 +118,7 @@ impl RevisionTracker { } } - fn create_revisions( + fn create_revisions( &mut self, mut iter: impl Iterator, ) -> Result<(), DbError> @@ -129,7 +129,7 @@ impl RevisionTracker { iter.try_for_each(|(k, v)| self.create_revision(k, v)) } - fn create_revision(&mut self, k: K, v: V) -> Result<(), DbError> + fn create_revision(&mut self, k: K, v: V) -> Result<(), DbError> where K: AsRef<[u8]>, V: AsRef<[u8]>, @@ -138,7 +138,7 @@ impl RevisionTracker { key: k, value: v.as_ref().to_vec(), }]; - self.db.new_proposal(batch)?.commit()?; + self.db.new_proposal(batch)?.commit_sync()?; let hash = self.db.kv_root_hash().expect("root-hash should exist"); self.hashes.push_front(hash); @@ -146,7 +146,7 @@ impl RevisionTracker { } fn commit_proposal(&mut self, proposal: Proposal) { - proposal.commit().unwrap(); + proposal.commit_sync().unwrap(); let hash = self.db.kv_root_hash().expect("root-hash should exist"); self.hashes.push_front(hash); } diff --git a/firewood/src/db.rs b/firewood/src/db.rs index cbab34d65..ea59c503e 100644 --- a/firewood/src/db.rs +++ b/firewood/src/db.rs @@ -14,7 +14,7 @@ use crate::{ CachedSpace, MemStoreR, SpaceWrite, StoreConfig, StoreDelta, StoreRevMut, StoreRevShared, ZeroStore, PAGE_SIZE_NBIT, }, - v2::api::{self, Proof}, + v2::api::{self, HashKey, KeyType, Proof, ValueType}, }; use async_trait::async_trait; use bytemuck::{cast_slice, AnyBitPattern}; @@ -293,11 +293,14 @@ impl + Send + Sync> api::DbView for DbRev { } } - async fn single_key_proof + Send>( + async fn single_key_proof( &self, - _key: K, - ) -> Result>, api::Error> { - todo!() + key: K, + ) -> Result>>, api::Error> { + self.merkle + .prove(key, self.header.kv_root) + .map(Some) + .map_err(|e| api::Error::IO(std::io::Error::new(ErrorKind::Other, e))) } async fn range_proof( @@ -380,6 +383,34 @@ impl Drop for DbInner { } } +#[async_trait] +impl api::Db for Db { + type Historical = DbRev; + + type Proposal = Proposal; + + async fn revision(&self, root_hash: HashKey) -> Result, api::Error> { + if let Some(rev) = self.get_revision(&TrieHash(root_hash)) { + Ok(Arc::new(rev.rev)) + } else { + Err(api::Error::HashNotFound { + provided: root_hash, + }) + } + } + + async fn root_hash(&self) -> Result { + self.kv_root_hash().map(|hash| hash.0).map_err(Into::into) + } + + async fn propose( + &self, + batch: api::Batch, + ) -> Result { + self.new_proposal(batch).map_err(Into::into) + } +} + #[derive(Debug)] pub struct DbRevInner { inner: VecDeque>, @@ -722,7 +753,10 @@ impl Db { } /// Create a proposal. - pub fn new_proposal>(&self, data: Batch) -> Result { + pub fn new_proposal( + &self, + data: Batch, + ) -> Result { let mut inner = self.inner.write(); let reset_store_headers = inner.reset_store_headers; let (store, mut rev) = Db::new_store( @@ -742,7 +776,7 @@ impl Db { BatchOp::Put { key, value } => { let (header, merkle) = rev.borrow_split(); merkle - .insert(key, value, header.kv_root) + .insert(key, value.as_ref().to_vec(), header.kv_root) .map_err(DbError::Merkle)?; Ok(()) } diff --git a/firewood/src/db/proposal.rs b/firewood/src/db/proposal.rs index befc1a6ad..c202e3c01 100644 --- a/firewood/src/db/proposal.rs +++ b/firewood/src/db/proposal.rs @@ -9,22 +9,14 @@ use super::{ use crate::{ merkle::{TrieHash, TRIE_HASH_LEN}, storage::{buffer::BufferWrite, AshRecord, StoreRevMut}, + v2::api::{self, KeyType, ValueType}, }; +use async_trait::async_trait; use parking_lot::{Mutex, RwLock}; use shale::CachedStore; -use std::sync::Arc; - -/// A key/value pair operation. Only put (upsert) and delete are -/// supported -#[derive(Debug)] -pub enum BatchOp { - Put { key: K, value: Vec }, - Delete { key: K }, -} +use std::{io::ErrorKind, sync::Arc}; -/// A list of operations to consist of a batch that -/// can be proposed -pub type Batch = Vec>; +pub use crate::v2::api::{Batch, BatchOp}; /// An atomic batch of changes proposed against the latest committed revision, /// or any existing [Proposal]. Multiple proposals can be created against the @@ -50,10 +42,29 @@ pub enum ProposalBase { View(Arc>), } +#[async_trait] +impl crate::v2::api::Proposal for Proposal { + type Proposal = Proposal; + + async fn commit(self: Arc) -> Result, api::Error> { + todo!() + } + + async fn propose( + self: Arc, + data: api::Batch, + ) -> Result { + self.propose_sync(data).map_err(Into::into) + } +} + impl Proposal { // Propose a new proposal from this proposal. The new proposal will be // the child of it. - pub fn propose>(self: Arc, data: Batch) -> Result { + pub fn propose_sync( + self: Arc, + data: Batch, + ) -> Result { let store = self.store.new_from_other(); let m = Arc::clone(&self.m); @@ -81,7 +92,7 @@ impl Proposal { BatchOp::Put { key, value } => { let (header, merkle) = rev.borrow_split(); merkle - .insert(key, value, header.kv_root) + .insert(key, value.as_ref().to_vec(), header.kv_root) .map_err(DbError::Merkle)?; Ok(()) } @@ -111,14 +122,14 @@ impl Proposal { /// Persist all changes to the DB. The atomicity of the [Proposal] guarantees all changes are /// either retained on disk or lost together during a crash. - pub fn commit(&self) -> Result<(), DbError> { + pub fn commit_sync(&self) -> Result<(), DbError> { let mut committed = self.committed.lock(); if *committed { return Ok(()); } if let ProposalBase::Proposal(p) = &self.parent { - p.commit()?; + p.commit_sync()?; }; // Check for if it can be committed @@ -257,6 +268,45 @@ impl Proposal { } } +#[async_trait] +impl api::DbView for Proposal { + async fn root_hash(&self) -> Result { + self.get_revision() + .kv_root_hash() + .map(|hash| hash.0) + .map_err(|e| api::Error::IO(std::io::Error::new(ErrorKind::Other, e))) + } + async fn val(&self, key: K) -> Result>, api::Error> + where + K: api::KeyType, + { + // TODO: pass back errors from kv_get + Ok(self.get_revision().kv_get(key)) + } + + async fn single_key_proof(&self, key: K) -> Result>>, api::Error> + where + K: api::KeyType, + { + self.get_revision() + .prove(key) + .map(Some) + .map_err(|e| api::Error::IO(std::io::Error::new(ErrorKind::Other, e))) + } + + async fn range_proof( + &self, + _first_key: Option, + _last_key: Option, + _limit: usize, + ) -> Result>, api::Error> + where + K: api::KeyType, + { + todo!() + } +} + impl Drop for Proposal { fn drop(&mut self) { if !*self.committed.lock() { diff --git a/firewood/src/v2/api.rs b/firewood/src/v2/api.rs index 47fc01fdf..38df5dc5b 100644 --- a/firewood/src/v2/api.rs +++ b/firewood/src/v2/api.rs @@ -69,6 +69,9 @@ pub enum Error { #[error("Invalid proposal")] InvalidProposal, + + #[error("Internal error")] + InternalError(Box), } /// A range proof, consisting of a proof of the first key and the last key, @@ -139,10 +142,7 @@ pub trait DbView { async fn val(&self, key: K) -> Result>, Error>; /// Obtain a proof for a single key - async fn single_key_proof( - &self, - key: K, - ) -> Result>, Error>; + async fn single_key_proof(&self, key: K) -> Result>>, Error>; /// Obtain a range proof over a set of keys /// diff --git a/firewood/src/v2/db.rs b/firewood/src/v2/db.rs index ad5e4ac76..89853eb86 100644 --- a/firewood/src/v2/db.rs +++ b/firewood/src/v2/db.rs @@ -7,7 +7,10 @@ use tokio::sync::Mutex; use async_trait::async_trait; -use crate::v2::api::{self, Batch, KeyType, ValueType}; +use crate::{ + db::DbError, + v2::api::{self, Batch, KeyType, ValueType}, +}; use super::propose; @@ -26,6 +29,20 @@ pub struct Db { latest_cache: Mutex>>, } +impl From for api::Error { + fn from(value: DbError) -> Self { + match value { + DbError::InvalidParams => api::Error::InternalError(value.into()), + DbError::Merkle(e) => api::Error::InternalError(e.into()), + DbError::System(e) => api::Error::IO(e.into()), + DbError::KeyNotFound | DbError::CreateError => api::Error::InternalError(value.into()), + DbError::Shale(e) => api::Error::InternalError(e.into()), + DbError::IO(e) => api::Error::IO(e), + DbError::InvalidProposal => api::Error::InvalidProposal, + } + } +} + #[async_trait] impl api::Db for Db where @@ -79,10 +96,10 @@ impl api::DbView for DbView { todo!() } - async fn single_key_proof( + async fn single_key_proof( &self, _key: K, - ) -> Result>, api::Error> { + ) -> Result>>, api::Error> { todo!() } diff --git a/firewood/src/v2/emptydb.rs b/firewood/src/v2/emptydb.rs index 677f6de85..762504aad 100644 --- a/firewood/src/v2/emptydb.rs +++ b/firewood/src/v2/emptydb.rs @@ -62,10 +62,7 @@ impl DbView for HistoricalImpl { Ok(None) } - async fn single_key_proof( - &self, - _key: K, - ) -> Result>, Error> { + async fn single_key_proof(&self, _key: K) -> Result>>, Error> { Ok(None) } diff --git a/firewood/src/v2/propose.rs b/firewood/src/v2/propose.rs index 6467a6920..a6e53d0cb 100644 --- a/firewood/src/v2/propose.rs +++ b/firewood/src/v2/propose.rs @@ -121,10 +121,10 @@ impl api::DbView for Proposal { } } - async fn single_key_proof( + async fn single_key_proof( &self, _key: K, - ) -> Result>, api::Error> { + ) -> Result>>, api::Error> { todo!() } diff --git a/firewood/tests/db.rs b/firewood/tests/db.rs index d69f073e6..e522fcac2 100644 --- a/firewood/tests/db.rs +++ b/firewood/tests/db.rs @@ -118,7 +118,7 @@ fn test_revisions() { batch.push(write); } let proposal = db.new_proposal(batch).unwrap(); - proposal.commit().unwrap(); + proposal.commit_sync().unwrap(); } while dumped.len() > 10 { dumped.pop_back(); @@ -192,7 +192,7 @@ fn create_db_issue_proof() { batch.push(write); } let proposal = db.new_proposal(batch).unwrap(); - proposal.commit().unwrap(); + proposal.commit_sync().unwrap(); let root_hash = db.kv_root_hash().unwrap(); @@ -206,7 +206,7 @@ fn create_db_issue_proof() { batch.push(write); } let proposal = db.new_proposal(batch).unwrap(); - proposal.commit().unwrap(); + proposal.commit_sync().unwrap(); let rev = db.get_revision(&root_hash).unwrap(); let key = "doe".as_bytes(); @@ -274,13 +274,13 @@ fn db_proposal() -> Result<(), DbError> { key: b"k2", value: "v2".as_bytes().to_vec(), }]; - let proposal_2 = proposal.clone().propose(batch_2)?; + let proposal_2 = proposal.clone().propose_sync(batch_2)?; let rev = proposal_2.get_revision(); assert_val!(rev, "k", "v"); assert_val!(rev, "k2", "v2"); - proposal.commit()?; - proposal_2.commit()?; + proposal.commit_sync()?; + proposal_2.commit_sync()?; std::thread::scope(|scope| { scope.spawn(|| -> Result<(), DbError> { @@ -288,12 +288,12 @@ fn db_proposal() -> Result<(), DbError> { key: b"another_k", value: "another_v".as_bytes().to_vec(), }]; - let another_proposal = proposal.clone().propose(another_batch)?; + let another_proposal = proposal.clone().propose_sync(another_batch)?; let rev = another_proposal.get_revision(); assert_val!(rev, "k", "v"); assert_val!(rev, "another_k", "another_v"); // The proposal is invalid and cannot be committed - assert!(another_proposal.commit().is_err()); + assert!(another_proposal.commit_sync().is_err()); Ok(()) }); @@ -302,7 +302,7 @@ fn db_proposal() -> Result<(), DbError> { key: b"another_k_1", value: "another_v_1".as_bytes().to_vec(), }]; - let another_proposal = proposal.clone().propose(another_batch)?; + let another_proposal = proposal.clone().propose_sync(another_batch)?; let rev = another_proposal.get_revision(); assert_val!(rev, "k", "v"); assert_val!(rev, "another_k_1", "another_v_1"); @@ -326,13 +326,13 @@ fn db_proposal() -> Result<(), DbError> { key: b"k4", value: "v4".as_bytes().to_vec(), }]; - let proposal_2 = proposal.clone().propose(batch_2)?; + let proposal_2 = proposal.clone().propose_sync(batch_2)?; let rev = proposal_2.get_revision(); assert_val!(rev, "k", "v"); assert_val!(rev, "k2", "v2"); assert_val!(rev, "k3", "v3"); assert_val!(rev, "k4", "v4"); - proposal_2.commit()?; + proposal_2.commit_sync()?; Ok(()) } diff --git a/fwdctl/src/delete.rs b/fwdctl/src/delete.rs index fa8f16d03..0c2520cfa 100644 --- a/fwdctl/src/delete.rs +++ b/fwdctl/src/delete.rs @@ -31,9 +31,11 @@ pub fn run(opts: &Options) -> Result<()> { let db = Db::new(opts.db.as_str(), &cfg.build()).map_err(Error::msg)?; - let batch = vec![BatchOp::Delete { key: &opts.key }]; + let batch: Vec> = vec![BatchOp::Delete { + key: opts.key.clone(), + }]; let proposal = db.new_proposal(batch).map_err(Error::msg)?; - proposal.commit().map_err(Error::msg)?; + proposal.commit_sync().map_err(Error::msg)?; println!("key {} deleted successfully", opts.key); Ok(()) diff --git a/fwdctl/src/insert.rs b/fwdctl/src/insert.rs index 815685e94..804557de1 100644 --- a/fwdctl/src/insert.rs +++ b/fwdctl/src/insert.rs @@ -38,12 +38,12 @@ pub fn run(opts: &Options) -> Result<()> { Err(_) => return Err(anyhow!("error opening database")), }; - let batch = vec![BatchOp::Put { - key: &opts.key, + let batch: Vec, Vec>> = vec![BatchOp::Put { + key: opts.key.clone().into(), value: opts.value.bytes().collect(), }]; let proposal = db.new_proposal(batch).map_err(Error::msg)?; - proposal.commit().map_err(Error::msg)?; + proposal.commit_sync().map_err(Error::msg)?; println!("{}", opts.key); Ok(())