Skip to content

Commit

Permalink
feat: supports chains of StoreRevMut (#175)
Browse files Browse the repository at this point in the history
Co-authored-by: Ron Kuris <[email protected]>
  • Loading branch information
xinifinity and rkuris authored Aug 2, 2023
1 parent 912f27d commit 45cd6e6
Show file tree
Hide file tree
Showing 3 changed files with 176 additions and 33 deletions.
99 changes: 99 additions & 0 deletions firewood/src/storage/buffer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -803,6 +803,105 @@ mod tests {
assert_eq!(view.as_deref(), hash);
}

#[test]
// Exercises chained `StoreRevMut`s: a child store created with
// `new_from_other` sees the parent's pending writes, while writes to the
// child remain invisible to the parent until committed.
fn test_multi_stores() {
    let buf_cfg = DiskBufferConfig::builder().max_buffered(1).build();
    let wal_cfg = WalConfig::builder().build();
    let disk_requester = init_buffer(buf_cfg, wal_cfg);

    // TODO: Run the test in a separate standalone directory for concurrency reasons
    let tmp_dir = TempDir::new("firewood").unwrap();
    let path = get_file_path(tmp_dir.path(), file!(), line!());
    let (root_db_path, reset) = crate::file::open_dir(path, true).unwrap();

    // file descriptor of the state directory
    let state_path = file::touch_dir("state", &root_db_path).unwrap();
    assert!(reset);
    // create a new wal directory on top of root_db_fd
    disk_requester.init_wal("wal", root_db_path);

    // create a new state cache which tracks on disk state.
    let state_cache = Arc::new(
        CachedSpace::new(
            &StoreConfig::builder()
                .ncached_pages(1)
                .ncached_files(1)
                .space_id(STATE_SPACE)
                .file_nbit(1)
                .rootdir(state_path)
                .build(),
            disk_requester.clone(),
        )
        .unwrap(),
    );

    // add an in memory cached space. this will allow us to write to the
    // disk buffer then later persist the change to disk.
    disk_requester.reg_cached_space(state_cache.id(), state_cache.clone_files());

    // memory mapped store
    let mut store = StoreRevMut::new(state_cache.clone());

    // mutate the in memory buffer.
    let data = b"this is a test";
    let hash: [u8; HASH_SIZE] = sha3::Keccak256::digest(data).into();
    store.write(0, &hash);
    assert_eq!(store.id(), STATE_SPACE);

    let another_data = b"this is another test";
    let another_hash: [u8; HASH_SIZE] = sha3::Keccak256::digest(another_data).into();

    // mutate the in memory buffer in another StoreRev new from the above.
    let mut another_store = StoreRevMut::new_from_other(&store);
    another_store.write(32, &another_hash);
    assert_eq!(another_store.id(), STATE_SPACE);

    // wal should have no records: nothing has been committed yet.
    assert!(disk_requester.collect_ash(1).unwrap().is_empty());

    // get RO view of the buffer from the beginning. Both stores should have the same view.
    let view = store.get_view(0, HASH_SIZE as u64).unwrap();
    assert_eq!(view.as_deref(), hash);

    let view = another_store.get_view(0, HASH_SIZE as u64).unwrap();
    assert_eq!(view.as_deref(), hash);

    // get RO view of the buffer from the second hash. Only the new store should see the value;
    // the parent store still reads zeroes at that offset.
    let view = another_store.get_view(32, HASH_SIZE as u64).unwrap();
    assert_eq!(view.as_deref(), another_hash);
    let empty: [u8; HASH_SIZE] = [0; HASH_SIZE];
    let view = store.get_view(32, HASH_SIZE as u64).unwrap();
    assert_eq!(view.as_deref(), empty);

    // Overwrite the value from the beginning in the new store. Only the new store should see the change.
    another_store.write(0, &another_hash);
    let view = another_store.get_view(0, HASH_SIZE as u64).unwrap();
    assert_eq!(view.as_deref(), another_hash);
    let view = store.get_view(0, HASH_SIZE as u64).unwrap();
    assert_eq!(view.as_deref(), hash);

    // Commit the change. Take the delta from both stores.
    // The parent store made one write, so one redo page and one undo record.
    let (redo_delta, wal) = store.take_delta();
    assert_eq!(1, redo_delta.0.len());
    assert_eq!(1, wal.undo.len());

    // The child store wrote twice (offsets 0 and 32), hence two undo records;
    // both writes land on the same page, so a single redo page.
    let (another_redo_delta, another_wal) = another_store.take_delta();
    assert_eq!(1, another_redo_delta.0.len());
    assert_eq!(2, another_wal.undo.len());

    // Verify that after the changes have been applied to the underlying
    // CachedSpace, newly created stores see the previously committed data.
    state_cache.update(&redo_delta).unwrap();
    let store = StoreRevMut::new(state_cache.clone());
    let view = store.get_view(0, HASH_SIZE as u64).unwrap();
    assert_eq!(view.as_deref(), hash);

    state_cache.update(&another_redo_delta).unwrap();
    let another_store = StoreRevMut::new(state_cache);
    let view = another_store.get_view(0, HASH_SIZE as u64).unwrap();
    assert_eq!(view.as_deref(), another_hash);
}

/// Builds a per-call-site path under `path`, derived from the source file
/// name (with `/` flattened to `-`) and the line number, so tests sharing a
/// temp root get distinct directories.
fn get_file_path(path: &Path, file: &str, line: u32) -> PathBuf {
    let sanitized = file.replace('/', "-");
    path.join(format!("{sanitized}_{line}"))
}
Expand Down
101 changes: 72 additions & 29 deletions firewood/src/storage/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,13 +53,13 @@ pub trait MemStoreR: Debug + Send + Sync {
// Page should be boxed as to not take up so much stack-space
type Page = Box<[u8; PAGE_SIZE as usize]>;

// NOTE: the scraped diff left the superseded `#[derive(Debug)]` stacked above
// the new derive list; two `Debug` derives on one struct conflict, so a
// single merged derive is kept here.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
/// A single contiguous write against a linear store space: `data` replaces
/// the bytes beginning at `offset`.
pub struct SpaceWrite {
    // Byte offset within the space where the write starts.
    offset: u64,
    // The bytes written at `offset`.
    data: Box<[u8]>,
}

#[derive(Debug, Default)]
#[derive(Debug, Default, Clone)]
/// In memory representation of Write-ahead log with `undo` and `redo`.
pub struct Ash {
/// Deltas to undo the changes.
Expand Down Expand Up @@ -432,51 +432,90 @@ struct StoreRevMutDelta {
plain: Ash,
}

impl Clone for StoreRevMutDelta {
fn clone(&self) -> Self {
let mut pages = HashMap::new();
for (pid, page) in self.pages.iter() {
let mut data = Box::new([0u8; PAGE_SIZE as usize]);
data.copy_from_slice(page.as_ref());
pages.insert(*pid, data);
}

Self {
pages,
plain: self.plain.clone(),
}
}
}

#[derive(Clone, Debug)]
/// A mutable revision of the store. The view is constructed by applying the `deltas` to the
/// `base space`. The `deltas` tracks both `undo` and `redo` to be able to rewind or reapply
/// the changes. `StoreRevMut` supports basing on top of another `StoreRevMut`, by chaining
/// `prev_deltas` (the parent `StoreRevMut`'s `deltas`) with its own `deltas`. In this way,
/// callers can create a new `StoreRevMut` from an existing one without actually committing
/// the mutations to the base space.
pub struct StoreRevMut {
    // The read-only space this revision is layered on top of.
    base_space: Arc<dyn MemStoreR>,
    // Pages (and their undo/redo log) mutated by *this* revision.
    deltas: Arc<RwLock<StoreRevMutDelta>>,
    // Shared handle to the parent revision's deltas (empty when built with
    // `new`); consulted before `base_space` when a page is first touched.
    prev_deltas: Arc<RwLock<StoreRevMutDelta>>,
}

impl StoreRevMut {
pub fn new(base_space: Arc<dyn MemStoreR>) -> Self {
Self {
base_space,
deltas: Default::default(),
prev_deltas: Default::default(),
}
}

fn get_page_mut<'a>(&self, deltas: &'a mut StoreRevMutDelta, pid: u64) -> &'a mut [u8] {
let page = deltas.pages.entry(pid).or_insert_with(|| {
Box::new(
self.base_space
.get_slice(pid << PAGE_SIZE_NBIT, PAGE_SIZE)
.unwrap()
.try_into()
.unwrap(),
)
});
pub fn new_from_other(other: &StoreRevMut) -> Self {
Self {
base_space: other.base_space.clone(),
deltas: Default::default(),
prev_deltas: other.deltas.clone(),
}
}

// Returns a mutable reference to page `pid` inside `deltas`, creating a
// private copy on first touch (copy-on-write). The initial copy comes, in
// order of precedence, from the parent revision's pending page in
// `prev_deltas`, falling back to the committed page in `base_space`.
//
// NOTE(review): only the immediate parent's deltas are consulted here — a
// chain deeper than two levels would not see an unshadowed grandparent
// page; confirm whether deeper chaining is intended.
fn get_page_mut<'a>(
    &self,
    deltas: &'a mut StoreRevMutDelta,
    prev_deltas: &StoreRevMutDelta,
    pid: u64,
) -> &'a mut [u8] {
    let page = deltas
        .pages
        .entry(pid)
        .or_insert_with(|| match prev_deltas.pages.get(&pid) {
            // Parent already has a pending copy of this page: duplicate its bytes.
            Some(p) => Box::new(*p.as_ref()),
            // Otherwise read the page straight out of the base space.
            None => Box::new(
                self.base_space
                    .get_slice(pid << PAGE_SIZE_NBIT, PAGE_SIZE)
                    .unwrap()
                    .try_into()
                    .unwrap(),
            ),
        });

    page.as_mut()
}

/// Drains this revision's accumulated delta, leaving an empty delta behind,
/// and returns the redo pages (sorted by page id) together with the
/// undo/redo write-ahead record (`Ash`).
pub fn take_delta(&self) -> (StoreDelta, Ash) {
    // Swap the whole delta out in one step (`StoreRevMutDelta: Default` —
    // see the `Default::default()` construction in `new`). This replaces
    // the previous clone-every-page-then-overwrite-the-guard sequence and
    // its unexplained "TODO: remove this line" reset: taking the delta both
    // hands ownership of the pages (no per-page copy) and resets the store
    // in one operation.
    let StoreRevMutDelta { pages, plain } = std::mem::take(&mut *self.deltas.write());
    let mut pages: Vec<DeltaPage> = pages
        .into_iter()
        .map(|(pid, page)| DeltaPage(pid, page))
        .collect();
    // Deterministic ordering for consumers of the delta.
    pages.sort_by_key(|p| p.0);
    (StoreDelta(pages), plain)
}
}

Expand Down Expand Up @@ -544,15 +583,18 @@ impl CachedStore for StoreRevMut {

if s_pid == e_pid {
let mut deltas = self.deltas.write();
let slice = &mut self.get_page_mut(deltas.deref_mut(), s_pid)[s_off..e_off + 1];
let slice = &mut self.get_page_mut(deltas.deref_mut(), &self.prev_deltas.read(), s_pid)
[s_off..e_off + 1];
undo.extend(&*slice);
slice.copy_from_slice(change)
} else {
let len = PAGE_SIZE as usize - s_off;

{
let mut deltas = self.deltas.write();
let slice = &mut self.get_page_mut(deltas.deref_mut(), s_pid)[s_off..];
let slice =
&mut self.get_page_mut(deltas.deref_mut(), &self.prev_deltas.read(), s_pid)
[s_off..];
undo.extend(&*slice);
slice.copy_from_slice(&change[..len]);
}
Expand All @@ -561,13 +603,14 @@ impl CachedStore for StoreRevMut {

let mut deltas = self.deltas.write();
for p in s_pid + 1..e_pid {
let slice = self.get_page_mut(deltas.deref_mut(), p);
let slice = self.get_page_mut(deltas.deref_mut(), &self.prev_deltas.read(), p);
undo.extend(&*slice);
slice.copy_from_slice(&change[..PAGE_SIZE as usize]);
change = &change[PAGE_SIZE as usize..];
}

let slice = &mut self.get_page_mut(deltas.deref_mut(), e_pid)[..e_off + 1];
let slice = &mut self.get_page_mut(deltas.deref_mut(), &self.prev_deltas.read(), e_pid)
[..e_off + 1];
undo.extend(&*slice);
slice.copy_from_slice(change);
}
Expand Down
9 changes: 5 additions & 4 deletions firewood/tests/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,10 +139,11 @@ fn test_revisions() {
.zip(hashes.iter().cloned())
.map(|(data, hash)| (data, db.get_revision(&hash, None).unwrap()))
.map(|(data, rev)| (data, kv_dump!(rev)))
.for_each(|(b, a)| {
if &a != b {
print!("{a}\n{b}");
panic!("not the same");
.for_each(|(previous_dump, after_reopen_dump)| {
if &after_reopen_dump != previous_dump {
panic!(
"not the same: pass {i}:\n{after_reopen_dump}\n--------\n{previous_dump}"
);
}
});
println!("i = {i}");
Expand Down

0 comments on commit 45cd6e6

Please sign in to comment.