Break out proposal module from db (#244)
richardpringle authored Sep 1, 2023
1 parent 688b283 commit ca87a2b
Showing 2 changed files with 328 additions and 313 deletions.
firewood/src/db.rs: 323 changes (10 additions & 313 deletions)
@@ -10,9 +10,9 @@ use crate::{
     merkle::{Merkle, MerkleError, Node, TrieHash, TRIE_HASH_LEN},
     proof::ProofError,
     storage::{
-        buffer::{BufferWrite, DiskBuffer, DiskBufferRequester},
-        AshRecord, CachedSpace, MemStoreR, SpaceWrite, StoreConfig, StoreDelta, StoreRevMut,
-        StoreRevShared, ZeroStore, PAGE_SIZE_NBIT,
+        buffer::{DiskBuffer, DiskBufferRequester},
+        CachedSpace, MemStoreR, SpaceWrite, StoreConfig, StoreDelta, StoreRevMut, StoreRevShared,
+        ZeroStore, PAGE_SIZE_NBIT,
     },
     v2::api::Proof,
 };
@@ -37,6 +37,12 @@ use std::{
     thread::JoinHandle,
 };
 
+mod proposal;
+
+pub use proposal::{Batch, BatchOp, Proposal};
+
+use self::proposal::ProposalBase;
+
 const MERKLE_META_SPACE: SpaceId = 0x0;
 const MERKLE_PAYLOAD_SPACE: SpaceId = 0x1;
 const BLOB_META_SPACE: SpaceId = 0x2;
@@ -378,7 +384,7 @@ pub struct Db {
     cfg: DbConfig,
 }
 
-// #[metered(registry = DbMetrics, visibility = pub)]
+#[metered(registry = DbMetrics, visibility = pub)]
 impl Db {
     const PARAM_SIZE: u64 = size_of::<DbParams>() as u64;

Expand Down Expand Up @@ -906,10 +912,7 @@ impl Db {
}
.into()
}
}

#[metered(registry = DbMetrics, visibility = pub)]
impl Db {
/// Dump the Trie of the latest generic key-value storage.
pub fn kv_dump(&self, w: &mut dyn Write) -> Result<(), DbError> {
self.revisions.lock().base_revision.kv_dump(w)
@@ -945,309 +948,3 @@ impl<S> std::ops::Deref for Revision<S> {
         &self.rev
     }
 }
-
-/// A key/value pair operation. Only put (upsert) and delete are
-/// supported.
-#[derive(Debug)]
-pub enum BatchOp<K> {
-    Put { key: K, value: Vec<u8> },
-    Delete { key: K },
-}
-
-/// A list of operations that make up a batch that can be proposed.
-pub type Batch<K> = Vec<BatchOp<K>>;
-
-/// An atomic batch of changes proposed against the latest committed revision,
-/// or any existing [Proposal]. Multiple proposals can be created against the
-/// latest committed revision at the same time. A [Proposal] is immutable,
-/// meaning the internal batch cannot be altered after creation. Committing a
-/// proposal invalidates all other proposals that are not children of the
-/// committed one.
-pub struct Proposal {
-    // State of the Db
-    m: Arc<RwLock<DbInner>>,
-    r: Arc<Mutex<DbRevInner<SharedStore>>>,
-    cfg: DbConfig,
-
-    // State of the proposal
-    rev: DbRev<Store>,
-    store: Universe<Arc<StoreRevMut>>,
-    committed: Arc<Mutex<bool>>,
-
-    parent: ProposalBase,
-}
-
-pub enum ProposalBase {
-    Proposal(Arc<Proposal>),
-    View(Arc<DbRev<SharedStore>>),
-}
-
-impl Proposal {
-    // Propose a new proposal from this proposal. The new proposal will be
-    // a child of this one.
-    pub fn propose<K: AsRef<[u8]>>(self: Arc<Self>, data: Batch<K>) -> Result<Proposal, DbError> {
-        let store = self.store.new_from_other();
-
-        let m = Arc::clone(&self.m);
-        let r = Arc::clone(&self.r);
-        let cfg = self.cfg.clone();
-
-        let db_header_ref = Db::get_db_header_ref(store.merkle.meta.as_ref())?;
-
-        let merkle_payload_header_ref = Db::get_payload_header_ref(
-            store.merkle.meta.as_ref(),
-            Db::PARAM_SIZE + DbHeader::MSIZE,
-        )?;
-
-        let blob_payload_header_ref = Db::get_payload_header_ref(store.blob.meta.as_ref(), 0)?;
-
-        let header_refs = (
-            db_header_ref,
-            merkle_payload_header_ref,
-            blob_payload_header_ref,
-        );
-
-        let mut rev = Db::new_revision(
-            header_refs,
-            (store.merkle.meta.clone(), store.merkle.payload.clone()),
-            (store.blob.meta.clone(), store.blob.payload.clone()),
-            cfg.payload_regn_nbit,
-            cfg.payload_max_walk,
-            &cfg.rev,
-        )?;
-        data.into_iter().try_for_each(|op| -> Result<(), DbError> {
-            match op {
-                BatchOp::Put { key, value } => {
-                    let (header, merkle) = rev.borrow_split();
-                    merkle
-                        .insert(key, value, header.kv_root)
-                        .map_err(DbError::Merkle)?;
-                    Ok(())
-                }
-                BatchOp::Delete { key } => {
-                    let (header, merkle) = rev.borrow_split();
-                    merkle
-                        .remove(key, header.kv_root)
-                        .map_err(DbError::Merkle)?;
-                    Ok(())
-                }
-            }
-        })?;
-        rev.flush_dirty().unwrap();
-
-        let parent = ProposalBase::Proposal(self);
-
-        Ok(Proposal {
-            m,
-            r,
-            cfg,
-            rev,
-            store,
-            committed: Arc::new(Mutex::new(false)),
-            parent,
-        })
-    }
-
-    /// Persist all changes to the DB. The atomicity of the [Proposal] guarantees that
-    /// all changes are either retained on disk or lost together during a crash.
-    pub fn commit(&self) -> Result<(), DbError> {
-        let mut committed = self.committed.lock();
-        if *committed {
-            return Ok(());
-        }
-
-        if let ProposalBase::Proposal(p) = &self.parent {
-            p.commit()?;
-        };
-
-        // Check whether this proposal can be committed: its parent must match
-        // the currently committed revision.
-        let mut revisions = self.r.lock();
-        let committed_root_hash = revisions.base_revision.kv_root_hash().ok();
-        let committed_root_hash =
-            committed_root_hash.expect("committed_root_hash should not be none");
-        match &self.parent {
-            ProposalBase::Proposal(p) => {
-                let parent_root_hash = p.rev.kv_root_hash().ok();
-                let parent_root_hash =
-                    parent_root_hash.expect("parent_root_hash should not be none");
-                if parent_root_hash != committed_root_hash {
-                    return Err(DbError::InvalidProposal);
-                }
-            }
-            ProposalBase::View(p) => {
-                let parent_root_hash = p.kv_root_hash().ok();
-                let parent_root_hash =
-                    parent_root_hash.expect("parent_root_hash should not be none");
-                if parent_root_hash != committed_root_hash {
-                    return Err(DbError::InvalidProposal);
-                }
-            }
-        };
-
-        let kv_root_hash = self.rev.kv_root_hash().ok();
-        let kv_root_hash = kv_root_hash.expect("kv_root_hash should not be none");
-
-        // clear the staging layer and apply changes to the CachedSpace
-        let (merkle_payload_redo, merkle_payload_wal) = self.store.merkle.payload.delta();
-        let (merkle_meta_redo, merkle_meta_wal) = self.store.merkle.meta.delta();
-        let (blob_payload_redo, blob_payload_wal) = self.store.blob.payload.delta();
-        let (blob_meta_redo, blob_meta_wal) = self.store.blob.meta.delta();
-
-        let mut rev_inner = self.m.write();
-        let merkle_meta_undo = rev_inner
-            .cached_space
-            .merkle
-            .meta
-            .update(&merkle_meta_redo)
-            .unwrap();
-        let merkle_payload_undo = rev_inner
-            .cached_space
-            .merkle
-            .payload
-            .update(&merkle_payload_redo)
-            .unwrap();
-        let blob_meta_undo = rev_inner
-            .cached_space
-            .blob
-            .meta
-            .update(&blob_meta_redo)
-            .unwrap();
-        let blob_payload_undo = rev_inner
-            .cached_space
-            .blob
-            .payload
-            .update(&blob_payload_redo)
-            .unwrap();
-
-        // update the rolling window of past revisions
-        let latest_past = Universe {
-            merkle: get_sub_universe_from_deltas(
-                &rev_inner.cached_space.merkle,
-                merkle_meta_undo,
-                merkle_payload_undo,
-            ),
-            blob: get_sub_universe_from_deltas(
-                &rev_inner.cached_space.blob,
-                blob_meta_undo,
-                blob_payload_undo,
-            ),
-        };
-
-        let max_revisions = revisions.max_revisions;
-        if let Some(rev) = revisions.inner.front_mut() {
-            rev.merkle
-                .meta
-                .set_base_space(latest_past.merkle.meta.inner().clone());
-            rev.merkle
-                .payload
-                .set_base_space(latest_past.merkle.payload.inner().clone());
-            rev.blob
-                .meta
-                .set_base_space(latest_past.blob.meta.inner().clone());
-            rev.blob
-                .payload
-                .set_base_space(latest_past.blob.payload.inner().clone());
-        }
-        revisions.inner.push_front(latest_past);
-        while revisions.inner.len() > max_revisions {
-            revisions.inner.pop_back();
-        }
-
-        let base = Universe {
-            merkle: get_sub_universe_from_empty_delta(&rev_inner.cached_space.merkle),
-            blob: get_sub_universe_from_empty_delta(&rev_inner.cached_space.blob),
-        };
-
-        let db_header_ref = Db::get_db_header_ref(&base.merkle.meta)?;
-
-        let merkle_payload_header_ref =
-            Db::get_payload_header_ref(&base.merkle.meta, Db::PARAM_SIZE + DbHeader::MSIZE)?;
-
-        let blob_payload_header_ref = Db::get_payload_header_ref(&base.blob.meta, 0)?;
-
-        let header_refs = (
-            db_header_ref,
-            merkle_payload_header_ref,
-            blob_payload_header_ref,
-        );
-
-        let base_revision = Db::new_revision(
-            header_refs,
-            (base.merkle.meta.clone(), base.merkle.payload.clone()),
-            (base.blob.meta.clone(), base.blob.payload.clone()),
-            0,
-            self.cfg.payload_max_walk,
-            &self.cfg.rev,
-        )?;
-        revisions.base = base;
-        revisions.base_revision = Arc::new(base_revision);
-
-        // update the rolling window of root hashes
-        revisions.root_hashes.push_front(kv_root_hash.clone());
-        if revisions.root_hashes.len() > max_revisions {
-            revisions
-                .root_hashes
-                .resize(max_revisions, TrieHash([0; TRIE_HASH_LEN]));
-        }
-
-        rev_inner.root_hash_staging.write(0, &kv_root_hash.0);
-        let (root_hash_redo, root_hash_wal) = rev_inner.root_hash_staging.delta();
-
-        // schedule writes to the disk
-        rev_inner.disk_requester.write(
-            vec![
-                BufferWrite {
-                    space_id: self.store.merkle.payload.id(),
-                    delta: merkle_payload_redo,
-                },
-                BufferWrite {
-                    space_id: self.store.merkle.meta.id(),
-                    delta: merkle_meta_redo,
-                },
-                BufferWrite {
-                    space_id: self.store.blob.payload.id(),
-                    delta: blob_payload_redo,
-                },
-                BufferWrite {
-                    space_id: self.store.blob.meta.id(),
-                    delta: blob_meta_redo,
-                },
-                BufferWrite {
-                    space_id: rev_inner.root_hash_staging.id(),
-                    delta: root_hash_redo,
-                },
-            ],
-            AshRecord(
-                [
-                    (MERKLE_META_SPACE, merkle_meta_wal),
-                    (MERKLE_PAYLOAD_SPACE, merkle_payload_wal),
-                    (BLOB_META_SPACE, blob_meta_wal),
-                    (BLOB_PAYLOAD_SPACE, blob_payload_wal),
-                    (ROOT_HASH_SPACE, root_hash_wal),
-                ]
-                .into(),
-            ),
-        );
-        *committed = true;
-        Ok(())
-    }
-}
-
-impl Proposal {
-    pub fn get_revision(&self) -> &DbRev<Store> {
-        &self.rev
-    }
-}
-
-impl Drop for Proposal {
-    fn drop(&mut self) {
-        if !*self.committed.lock() {
-            // drop the staging changes
-            self.store.merkle.payload.reset_deltas();
-            self.store.merkle.meta.reset_deltas();
-            self.store.blob.payload.reset_deltas();
-            self.store.blob.meta.reset_deltas();
-            self.m.read().root_hash_staging.reset_deltas();
-        }
-    }
-}
firewood/src/db/proposal.rs: 318 additions (diff not loaded on the original page)
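
For orientation, here is a minimal usage sketch of the proposal API this commit relocates. `BatchOp`, `Batch`, `Proposal::propose`, and `Proposal::commit` are the items moved in this diff; `Db::new`, `DbConfig::builder`, and `Db::new_proposal` are assumptions about the rest of the crate and do not appear in the hunks above.

use std::sync::Arc;

use firewood::db::{BatchOp, Db, DbConfig, DbError};

fn main() -> Result<(), DbError> {
    // Assumed constructor: open (and truncate) a database at the given path.
    let db = Db::new("proposal-demo", &DbConfig::builder().truncate(true).build())?;

    // A Batch<K> is just a Vec of put/delete operations.
    let batch = vec![
        BatchOp::Put {
            key: b"name".to_vec(),
            value: b"firewood".to_vec(),
        },
        BatchOp::Delete {
            key: b"stale".to_vec(),
        },
    ];

    // Assumed: Db::new_proposal stages a proposal against the latest
    // committed revision. The Arc lets child proposals hold a reference.
    let parent = Arc::new(db.new_proposal(batch)?);

    // Proposal::propose (moved in this diff) chains a child proposal whose
    // parent is `parent` rather than the committed revision.
    let child = Arc::clone(&parent).propose(vec![BatchOp::Put {
        key: b"child".to_vec(),
        value: b"value".to_vec(),
    }])?;

    // Committing the child first commits its parent chain (the
    // ProposalBase::Proposal(p) => p.commit()? path above), then itself.
    child.commit()?;
    Ok(())
}

Two behaviors from the relocated code carry over unchanged: a proposal dropped without being committed resets its staged deltas in Drop, so abandoning `child` before `commit` would leave the database untouched, and once `child` commits, any sibling proposal built on the old base fails its root-hash check in `commit` with `DbError::InvalidProposal`.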
