Skip to content

Commit

Permalink
chore: abstract out mutable store creation
Browse files Browse the repository at this point in the history
  • Loading branch information
Hao Hao committed Aug 2, 2023
1 parent bad3ddd commit ab66f06
Showing 1 changed file with 73 additions and 56 deletions.
129 changes: 73 additions & 56 deletions firewood/src/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,11 @@ const SPACE_RESERVED: u64 = 0x1000;

const MAGIC_STR: &[u8; 13] = b"firewood v0.1";

type Store = (
Universe<Arc<StoreRevMut>>,
DbRev<CompactSpace<Node, StoreRevMut>>,
);

#[derive(Debug)]
#[non_exhaustive]
pub enum DbError {
Expand Down Expand Up @@ -115,7 +120,7 @@ pub struct DbRevConfig {
}

/// Database configuration.
#[derive(TypedBuilder, Debug)]
#[derive(Clone, TypedBuilder, Debug)]
pub struct DbConfig {
/// Maximum cached pages for the free list of the item stash.
#[builder(default = 16384)] // 64M total size by default
Expand Down Expand Up @@ -563,12 +568,12 @@ impl Db<CompactSpace<Node, StoreRevMut>> {
let mut header_bytes = [0; std::mem::size_of::<DbParams>()];
nix::sys::uio::pread(fd0, &mut header_bytes, 0).map_err(DbError::System)?;
drop(file0);
let mut offset = header_bytes.len() as u64;
let header: DbParams = cast_slice(&header_bytes)[0];
let offset = header_bytes.len() as u64;
let params: DbParams = cast_slice(&header_bytes)[0];

let wal = WalConfig::builder()
.file_nbit(header.wal_file_nbit)
.block_nbit(header.wal_block_nbit)
.file_nbit(params.wal_file_nbit)
.block_nbit(params.wal_block_nbit)
.max_revisions(cfg.wal.max_revisions)
.build();
let (sender, inbound) = tokio::sync::mpsc::channel(cfg.buffer.max_buffered);
Expand All @@ -585,7 +590,7 @@ impl Db<CompactSpace<Node, StoreRevMut>> {
.ncached_pages(cfg.root_hash_ncached_pages)
.ncached_files(cfg.root_hash_ncached_files)
.space_id(ROOT_HASH_SPACE)
.file_nbit(header.root_hash_file_nbit)
.file_nbit(params.root_hash_file_nbit)
.rootdir(root_hash_path)
.build(),
disk_requester.clone(),
Expand All @@ -602,7 +607,7 @@ impl Db<CompactSpace<Node, StoreRevMut>> {
.ncached_pages(cfg.meta_ncached_pages)
.ncached_files(cfg.meta_ncached_files)
.space_id(MERKLE_META_SPACE)
.file_nbit(header.meta_file_nbit)
.file_nbit(params.meta_file_nbit)
.rootdir(merkle_meta_path)
.build(),
disk_requester.clone(),
Expand All @@ -615,7 +620,7 @@ impl Db<CompactSpace<Node, StoreRevMut>> {
.ncached_pages(cfg.payload_ncached_pages)
.ncached_files(cfg.payload_ncached_files)
.space_id(MERKLE_PAYLOAD_SPACE)
.file_nbit(header.payload_file_nbit)
.file_nbit(params.payload_file_nbit)
.rootdir(merkle_payload_path)
.build(),
disk_requester.clone(),
Expand All @@ -630,7 +635,7 @@ impl Db<CompactSpace<Node, StoreRevMut>> {
.ncached_pages(cfg.meta_ncached_pages)
.ncached_files(cfg.meta_ncached_files)
.space_id(BLOB_META_SPACE)
.file_nbit(header.meta_file_nbit)
.file_nbit(params.meta_file_nbit)
.rootdir(blob_meta_path)
.build(),
disk_requester.clone(),
Expand All @@ -643,7 +648,7 @@ impl Db<CompactSpace<Node, StoreRevMut>> {
.ncached_pages(cfg.payload_ncached_pages)
.ncached_files(cfg.payload_ncached_files)
.space_id(BLOB_PAYLOAD_SPACE)
.file_nbit(header.payload_file_nbit)
.file_nbit(params.payload_file_nbit)
.rootdir(blob_payload_path)
.build(),
disk_requester.clone(),
Expand All @@ -665,32 +670,72 @@ impl Db<CompactSpace<Node, StoreRevMut>> {
disk_requester.reg_cached_space(cached_space.id(), cached_space.clone_files());
});

// set up the storage layout
// recover from Wal
disk_requester.init_wal("wal", db_path);

let root_hash_staging = StoreRevMut::new(root_hash_cache);
let (data_staging, mut latest) = Db::new_store(&data_cache, reset, offset, cfg, &params)?;
latest.flush_dirty().unwrap();

let base = Universe {
merkle: get_sub_universe_from_empty_delta(&data_cache.merkle),
blob: get_sub_universe_from_empty_delta(&data_cache.blob),
};

Ok(Self {
inner: Arc::new(RwLock::new(DbInner {
latest,
disk_thread,
disk_requester,
data_staging,
data_cache,
root_hash_staging,
})),
revisions: Arc::new(Mutex::new(DbRevInner {
inner: VecDeque::new(),
root_hashes: VecDeque::new(),
max_revisions: cfg.wal.max_revisions as usize,
base,
})),
payload_regn_nbit: params.payload_regn_nbit,
rev_cfg: cfg.rev.clone(),
metrics: Arc::new(DbMetrics::default()),
})
}

/// Create a new mutable store and an alterable revision of the DB on top.
fn new_store(
cached_space: &Universe<Arc<CachedSpace>>,
reset: bool,
offset: u64,
cfg: &DbConfig,
params: &DbParams,
) -> Result<Store, DbError> {
let mut offset = offset;
let db_header: ObjPtr<DbHeader> = ObjPtr::new_from_addr(offset);
offset += DbHeader::MSIZE;
let merkle_payload_header: ObjPtr<CompactSpaceHeader> = ObjPtr::new_from_addr(offset);
offset += CompactSpaceHeader::MSIZE;
assert!(offset <= SPACE_RESERVED);
let blob_payload_header: ObjPtr<CompactSpaceHeader> = ObjPtr::new_from_addr(0);

let mut data_staging_merkle_meta = StoreRevMut::new(data_cache.merkle.meta.clone());
let mut data_staging_blob_meta = StoreRevMut::new(data_cache.blob.meta.clone());
let mut merkle_meta_store = StoreRevMut::new(cached_space.merkle.meta.clone());
let mut blob_meta_store = StoreRevMut::new(cached_space.blob.meta.clone());

if reset {
// initialize space headers
data_staging_merkle_meta.write(
merkle_meta_store.write(
merkle_payload_header.addr(),
&shale::to_dehydrated(&shale::compact::CompactSpaceHeader::new(
SPACE_RESERVED,
SPACE_RESERVED,
))?,
);
data_staging_merkle_meta.write(
merkle_meta_store.write(
db_header.addr(),
&shale::to_dehydrated(&DbHeader::new_empty())?,
);
data_staging_blob_meta.write(
blob_meta_store.write(
blob_payload_header.addr(),
&shale::to_dehydrated(&shale::compact::CompactSpaceHeader::new(
SPACE_RESERVED,
Expand All @@ -699,24 +744,20 @@ impl Db<CompactSpace<Node, StoreRevMut>> {
);
}

let data_staging = Universe {
let store = Universe {
merkle: SubUniverse::new(
Arc::new(data_staging_merkle_meta),
Arc::new(StoreRevMut::new(data_cache.merkle.payload.clone())),
Arc::new(merkle_meta_store),
Arc::new(StoreRevMut::new(cached_space.merkle.payload.clone())),
),
blob: SubUniverse::new(
Arc::new(data_staging_blob_meta),
Arc::new(StoreRevMut::new(data_cache.blob.payload.clone())),
Arc::new(blob_meta_store),
Arc::new(StoreRevMut::new(cached_space.blob.payload.clone())),
),
};
let root_hash_staging = StoreRevMut::new(root_hash_cache);

// recover from Wal
disk_requester.init_wal("wal", db_path);

let (mut db_header_ref, merkle_payload_header_ref, _blob_payload_header_ref) = {
let merkle_meta_ref = data_staging.merkle.meta.as_ref();
let blob_meta_ref = data_staging.blob.meta.as_ref();
let merkle_meta_ref = store.merkle.meta.as_ref();
let blob_meta_ref = store.blob.meta.as_ref();

(
StoredView::ptr_to_obj(merkle_meta_ref, db_header, DbHeader::MSIZE).unwrap(),
Expand All @@ -736,12 +777,12 @@ impl Db<CompactSpace<Node, StoreRevMut>> {
};

let merkle_space = shale::compact::CompactSpace::new(
data_staging.merkle.meta.clone(),
data_staging.merkle.payload.clone(),
store.merkle.meta.clone(),
store.merkle.payload.clone(),
merkle_payload_header_ref,
shale::ObjCache::new(cfg.rev.merkle_ncached_objs),
cfg.payload_max_walk,
header.payload_regn_nbit,
params.payload_regn_nbit,
)
.unwrap();

Expand Down Expand Up @@ -776,38 +817,14 @@ impl Db<CompactSpace<Node, StoreRevMut>> {
err.map_err(DbError::Merkle)?
}

let mut latest = DbRev {
let rev = DbRev {
header: db_header_ref,
merkle: Merkle::new(Box::new(merkle_space)),
#[cfg(feature = "eth")]
blob: BlobStash::new(Box::new(blob_space)),
};
latest.flush_dirty().unwrap();

let base = Universe {
merkle: get_sub_universe_from_empty_delta(&data_cache.merkle),
blob: get_sub_universe_from_empty_delta(&data_cache.blob),
};

Ok(Self {
inner: Arc::new(RwLock::new(DbInner {
latest,
disk_thread,
disk_requester,
data_staging,
data_cache,
root_hash_staging,
})),
revisions: Arc::new(Mutex::new(DbRevInner {
inner: VecDeque::new(),
root_hashes: VecDeque::new(),
max_revisions: cfg.wal.max_revisions as usize,
base,
})),
payload_regn_nbit: header.payload_regn_nbit,
rev_cfg: cfg.rev.clone(),
metrics: Arc::new(DbMetrics::default()),
})
Ok((store, rev))
}

/// Get a handle that grants the access to any committed state of the entire DB,
Expand Down

0 comments on commit ab66f06

Please sign in to comment.