feat: supports chains of StoreRevMut #175

Merged · 4 commits · Aug 2, 2023
Changes from 3 commits
99 changes: 99 additions & 0 deletions firewood/src/storage/buffer.rs
@@ -803,6 +803,105 @@ mod tests {
assert_eq!(view.as_deref(), hash);
}

#[test]
fn test_multi_stores() {
let buf_cfg = DiskBufferConfig::builder().max_buffered(1).build();
let wal_cfg = WalConfig::builder().build();
let disk_requester = init_buffer(buf_cfg, wal_cfg);

// TODO: Run the test in a separate standalone directory for concurrency reasons
Collaborator:

Looks like it's doing this already?
nit: I think we have a method somewhere that creates a directory under target, which means cargo clean will remove it automatically.
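
For reference, a minimal sketch of what such a helper could look like (a hypothetical stand-in, not the existing method the reviewer is recalling; in a Cargo workspace the target directory may also live at the workspace root):

```rust
use std::path::PathBuf;

// Hypothetical helper: keep test scratch space under target/ so that
// `cargo clean` removes it automatically.
fn scratch_dir(name: &str) -> PathBuf {
    let dir = PathBuf::from(env!("CARGO_MANIFEST_DIR"))
        .join("target")
        .join("test-scratch")
        .join(name);
    std::fs::create_dir_all(&dir).expect("failed to create scratch dir");
    dir
}

fn main() {
    let dir = scratch_dir("test_multi_stores");
    println!("scratch dir: {}", dir.display());
}
```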

Contributor Author:

Yes, you're right; I will remove the comment in a follow-up PR.

let tmp_dir = TempDir::new("firewood").unwrap();
let path = get_file_path(tmp_dir.path(), file!(), line!());
Contributor:

❤️

let (root_db_path, reset) = crate::file::open_dir(path, true).unwrap();

// file descriptor of the state directory
let state_path = file::touch_dir("state", &root_db_path).unwrap();
assert!(reset);
// create a new wal directory on top of root_db_path
disk_requester.init_wal("wal", root_db_path);

// create a new state cache which tracks on disk state.
let state_cache = Arc::new(
CachedSpace::new(
&StoreConfig::builder()
.ncached_pages(1)
.ncached_files(1)
.space_id(STATE_SPACE)
.file_nbit(1)
.rootdir(state_path)
.build(),
disk_requester.clone(),
)
.unwrap(),
);

// add an in-memory cached space. This will allow us to write to the
// disk buffer and later persist the change to disk.
disk_requester.reg_cached_space(state_cache.id(), state_cache.clone_files());

// memory mapped store
let mut store = StoreRevMut::new(state_cache.clone());

// mutate the in memory buffer.
let data = b"this is a test";
let hash: [u8; HASH_SIZE] = sha3::Keccak256::digest(data).into();
store.write(0, &hash);
assert_eq!(store.id(), STATE_SPACE);

let another_data = b"this is another test";
let another_hash: [u8; HASH_SIZE] = sha3::Keccak256::digest(another_data).into();

// mutate the in-memory buffer in another StoreRevMut created from the one above.
let mut another_store = StoreRevMut::new_from_other(&store);
another_store.write(32, &another_hash);
assert_eq!(another_store.id(), STATE_SPACE);

// wal should have no records.
assert!(disk_requester.collect_ash(1).unwrap().is_empty());

// get RO view of the buffer from the beginning. Both stores should have the same view.
let view = store.get_view(0, HASH_SIZE as u64).unwrap();
assert_eq!(view.as_deref(), hash);

let view = another_store.get_view(0, HASH_SIZE as u64).unwrap();
assert_eq!(view.as_deref(), hash);

// get RO view of the buffer from the second hash. Only the new store should see the value.
let view = another_store.get_view(32, HASH_SIZE as u64).unwrap();
assert_eq!(view.as_deref(), another_hash);
let empty: [u8; HASH_SIZE] = [0; HASH_SIZE];
let view = store.get_view(32, HASH_SIZE as u64).unwrap();
assert_eq!(view.as_deref(), empty);

// Overwrite the value from the beginning in the new store. Only the new store should see the change.
another_store.write(0, &another_hash);
let view = another_store.get_view(0, HASH_SIZE as u64).unwrap();
assert_eq!(view.as_deref(), another_hash);
let view = store.get_view(0, HASH_SIZE as u64).unwrap();
assert_eq!(view.as_deref(), hash);

// Commit the change. Take the delta from both stores.
let (redo_delta, wal) = store.take_delta();
assert_eq!(1, redo_delta.0.len());
assert_eq!(1, wal.undo.len());

let (another_redo_delta, another_wal) = another_store.take_delta();
assert_eq!(1, another_redo_delta.0.len());
assert_eq!(2, another_wal.undo.len());

// Verify that after the changes have been applied to the underlying CachedSpace,
// newly created stores see the previous changes.
state_cache.update(&redo_delta).unwrap();
let store = StoreRevMut::new(state_cache.clone());
let view = store.get_view(0, HASH_SIZE as u64).unwrap();
assert_eq!(view.as_deref(), hash);

state_cache.update(&another_redo_delta).unwrap();
let another_store = StoreRevMut::new(state_cache.clone());
let view = another_store.get_view(0, HASH_SIZE as u64).unwrap();
assert_eq!(view.as_deref(), another_hash);
}

fn get_file_path(path: &Path, file: &str, line: u32) -> PathBuf {
path.join(format!("{}_{}", file.replace('/', "-"), line))
}
75 changes: 58 additions & 17 deletions firewood/src/storage/mod.rs
@@ -53,13 +53,13 @@ pub trait MemStoreR: Debug + Send + Sync {
// Page should be boxed so as not to take up so much stack space
type Page = Box<[u8; PAGE_SIZE as usize]>;

#[derive(Debug)]
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub struct SpaceWrite {
offset: u64,
data: Box<[u8]>,
}

#[derive(Debug, Default)]
#[derive(Debug, Default, Clone)]
/// In memory representation of Write-ahead log with `undo` and `redo`.
pub struct Ash {
/// Deltas to undo the changes.
@@ -432,33 +432,71 @@ struct StoreRevMutDelta {
plain: Ash,
}

impl Clone for StoreRevMutDelta {
fn clone(&self) -> Self {
let mut pages = HashMap::new();
for (pid, page) in self.pages.iter() {
let mut data = Box::new([0u8; PAGE_SIZE as usize]);
data.copy_from_slice(page.as_ref());
pages.insert(*pid, data);
}

Self {
pages,
plain: self.plain.clone(),
}
}
}

#[derive(Clone, Debug)]
/// A mutable revision of the store. The view is constructed by applying the `deltas` to the
/// `base space`. The `deltas` tracks both `undo` and `redo` to be able to rewind or reapply
/// the changes.
/// the changes. `StoreRevMut` can also be based on top of another `StoreRevMut` by chaining
/// the `prev_deltas` (taken from the base `StoreRevMut`) with its own `deltas`. In this way,
/// callers can create a new `StoreRevMut` from an existing one without actually committing
/// the mutations to the base space.
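/// Illustrative sketch of the chaining described above (hypothetical usage added for this
/// review, not part of the diff); the names mirror the test in `buffer.rs`:
///
/// ```ignore
/// let mut base = StoreRevMut::new(state_cache.clone());
/// base.write(0, &hash);
///
/// // `child` sees `base`'s uncommitted write through `prev_deltas`, even though
/// // nothing has been committed to the underlying cached space yet.
/// let child = StoreRevMut::new_from_other(&base);
/// assert_eq!(child.get_view(0, HASH_SIZE as u64).unwrap().as_deref(), hash);
/// ```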
pub struct StoreRevMut {
base_space: Arc<dyn MemStoreR>,
deltas: Arc<RwLock<StoreRevMutDelta>>,
prev_deltas: Arc<StoreRevMutDelta>,
Collaborator:

If you change this to Arc<RwLock<StoreRevMutDelta>>, you won't have to clone anything, just bump reference counts. You'll have to take a few more read locks, but I think that's a very small price to pay.

I think this would allow you to remove a lot of the clone implementations too.

Suggested change
prev_deltas: Arc<StoreRevMutDelta>,
prev_deltas: Arc<RwLock<StoreRevMutDelta>>,

Contributor Author:

Yeah, discussed this offline. The reason I did a clone is that in take_delta, StoreDelta actually takes ownership of the pages, so the child StoreRevMut would no longer have a reference to them. I tried a bit to share references to the pages and gave up, but let me try again. :)

Contributor Author:

Ok, after we debugged this together, @rkuris posted the commit to defer the clone during take_delta(). This still has to be fixed, since doing the clone in commit is expensive, but we are pushing the clone down so that we can completely remove it later. A new issue will be created as a follow-up.
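
For context, the pattern being referred to, moving the accumulated delta out of the lock rather than cloning page contents up front, looks roughly like the following self-contained sketch with toy types (std locks and a plain `HashMap`, not the actual `StoreRevMutDelta`):

```rust
use std::collections::HashMap;
use std::sync::{Arc, RwLock};

// Toy stand-in for the accumulated delta; the real type holds boxed pages.
#[derive(Default)]
struct Delta {
    pages: HashMap<u64, Vec<u8>>,
}

// Swap the accumulated delta for a fresh default value: ownership of the
// pages moves out of the lock without copying any page contents.
fn take_delta(deltas: &Arc<RwLock<Delta>>) -> Delta {
    std::mem::take(&mut *deltas.write().unwrap())
}

fn main() {
    let deltas = Arc::new(RwLock::new(Delta::default()));
    deltas.write().unwrap().pages.insert(0, vec![0xAB; 4096]);

    let taken = take_delta(&deltas);
    assert_eq!(taken.pages.len(), 1);
    // The shared delta is empty again; no page bytes were cloned.
    assert!(deltas.read().unwrap().pages.is_empty());
}
```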

}

impl StoreRevMut {
pub fn new(base_space: Arc<dyn MemStoreR>) -> Self {
Self {
base_space,
deltas: Default::default(),
prev_deltas: Default::default(),
}
}

fn get_page_mut<'a>(&self, deltas: &'a mut StoreRevMutDelta, pid: u64) -> &'a mut [u8] {
let page = deltas.pages.entry(pid).or_insert_with(|| {
Box::new(
self.base_space
.get_slice(pid << PAGE_SIZE_NBIT, PAGE_SIZE)
.unwrap()
.try_into()
.unwrap(),
)
});
pub fn new_from_other(other: &StoreRevMut) -> Self {
Self {
base_space: other.base_space.clone(),
deltas: Default::default(),
prev_deltas: Arc::new(other.deltas.read().clone()),
Collaborator:

This gets a lot easier with the type change above:

Suggested change
prev_deltas: Arc::new(other.deltas.read().clone()),
prev_deltas: other.deltas.clone(),

}
}

fn get_page_mut<'a>(
&self,
deltas: &'a mut StoreRevMutDelta,
prev_deltas: &StoreRevMutDelta,
pid: u64,
) -> &'a mut [u8] {
let page = deltas
.pages
.entry(pid)
.or_insert_with(|| match prev_deltas.pages.get(&pid) {
Some(p) => Box::new(*p.as_ref()),
None => Box::new(
self.base_space
.get_slice(pid << PAGE_SIZE_NBIT, PAGE_SIZE)
.unwrap()
.try_into()
.unwrap(),
),
});

page.as_mut()
}
@@ -544,15 +582,17 @@ impl CachedStore for StoreRevMut {

if s_pid == e_pid {
let mut deltas = self.deltas.write();
let slice = &mut self.get_page_mut(deltas.deref_mut(), s_pid)[s_off..e_off + 1];
let slice = &mut self.get_page_mut(deltas.deref_mut(), &self.prev_deltas, s_pid)
[s_off..e_off + 1];
undo.extend(&*slice);
slice.copy_from_slice(change)
} else {
let len = PAGE_SIZE as usize - s_off;

{
let mut deltas = self.deltas.write();
let slice = &mut self.get_page_mut(deltas.deref_mut(), s_pid)[s_off..];
let slice =
&mut self.get_page_mut(deltas.deref_mut(), &self.prev_deltas, s_pid)[s_off..];
undo.extend(&*slice);
slice.copy_from_slice(&change[..len]);
}
@@ -561,13 +601,14 @@

let mut deltas = self.deltas.write();
for p in s_pid + 1..e_pid {
let slice = self.get_page_mut(deltas.deref_mut(), p);
let slice = self.get_page_mut(deltas.deref_mut(), &self.prev_deltas, p);
undo.extend(&*slice);
slice.copy_from_slice(&change[..PAGE_SIZE as usize]);
change = &change[PAGE_SIZE as usize..];
}

let slice = &mut self.get_page_mut(deltas.deref_mut(), e_pid)[..e_off + 1];
let slice =
&mut self.get_page_mut(deltas.deref_mut(), &self.prev_deltas, e_pid)[..e_off + 1];
undo.extend(&*slice);
slice.copy_from_slice(change);
}