-
Notifications
You must be signed in to change notification settings - Fork 10
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: supports chains of StoreRevMut
#175
Changes from 3 commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -803,6 +803,105 @@ mod tests { | |
assert_eq!(view.as_deref(), hash); | ||
} | ||
|
||
#[test] | ||
fn test_multi_stores() { | ||
let buf_cfg = DiskBufferConfig::builder().max_buffered(1).build(); | ||
let wal_cfg = WalConfig::builder().build(); | ||
let disk_requester = init_buffer(buf_cfg, wal_cfg); | ||
|
||
// TODO: Run the test in a separate standalone directory for concurrency reasons | ||
let tmp_dir = TempDir::new("firewood").unwrap(); | ||
let path = get_file_path(tmp_dir.path(), file!(), line!()); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ❤️ |
||
let (root_db_path, reset) = crate::file::open_dir(path, true).unwrap(); | ||
|
||
// file descriptor of the state directory | ||
let state_path = file::touch_dir("state", &root_db_path).unwrap(); | ||
assert!(reset); | ||
// create a new wal directory on top of root_db_fd | ||
disk_requester.init_wal("wal", root_db_path); | ||
|
||
// create a new state cache which tracks on disk state. | ||
let state_cache = Arc::new( | ||
CachedSpace::new( | ||
&StoreConfig::builder() | ||
.ncached_pages(1) | ||
.ncached_files(1) | ||
.space_id(STATE_SPACE) | ||
.file_nbit(1) | ||
.rootdir(state_path) | ||
.build(), | ||
disk_requester.clone(), | ||
) | ||
.unwrap(), | ||
); | ||
|
||
// add an in memory cached space. this will allow us to write to the | ||
// disk buffer then later persist the change to disk. | ||
disk_requester.reg_cached_space(state_cache.id(), state_cache.clone_files()); | ||
|
||
// memory mapped store | ||
let mut store = StoreRevMut::new(state_cache.clone()); | ||
|
||
// mutate the in memory buffer. | ||
let data = b"this is a test"; | ||
let hash: [u8; HASH_SIZE] = sha3::Keccak256::digest(data).into(); | ||
store.write(0, &hash); | ||
assert_eq!(store.id(), STATE_SPACE); | ||
|
||
let another_data = b"this is another test"; | ||
let another_hash: [u8; HASH_SIZE] = sha3::Keccak256::digest(another_data).into(); | ||
|
||
// mutate the in memory buffer in another StoreRev new from the above. | ||
let mut another_store = StoreRevMut::new_from_other(&store); | ||
another_store.write(32, &another_hash); | ||
assert_eq!(another_store.id(), STATE_SPACE); | ||
|
||
// wal should have no records. | ||
assert!(disk_requester.collect_ash(1).unwrap().is_empty()); | ||
|
||
// get RO view of the buffer from the beginning. Both stores should have the same view. | ||
let view = store.get_view(0, HASH_SIZE as u64).unwrap(); | ||
assert_eq!(view.as_deref(), hash); | ||
|
||
let view = another_store.get_view(0, HASH_SIZE as u64).unwrap(); | ||
assert_eq!(view.as_deref(), hash); | ||
|
||
// get RO view of the buffer from the second hash. Only the new store shoulde see the value. | ||
let view = another_store.get_view(32, HASH_SIZE as u64).unwrap(); | ||
assert_eq!(view.as_deref(), another_hash); | ||
let empty: [u8; HASH_SIZE] = [0; HASH_SIZE]; | ||
let view = store.get_view(32, HASH_SIZE as u64).unwrap(); | ||
assert_eq!(view.as_deref(), empty); | ||
|
||
// Overwrite the value from the beginning in the new store. Only the new store shoulde see the change. | ||
another_store.write(0, &another_hash); | ||
let view = another_store.get_view(0, HASH_SIZE as u64).unwrap(); | ||
assert_eq!(view.as_deref(), another_hash); | ||
let view = store.get_view(0, HASH_SIZE as u64).unwrap(); | ||
assert_eq!(view.as_deref(), hash); | ||
|
||
// Commit the change. Take the delta from both stores. | ||
let (redo_delta, wal) = store.take_delta(); | ||
assert_eq!(1, redo_delta.0.len()); | ||
assert_eq!(1, wal.undo.len()); | ||
|
||
let (another_redo_delta, another_wal) = another_store.take_delta(); | ||
assert_eq!(1, another_redo_delta.0.len()); | ||
assert_eq!(2, another_wal.undo.len()); | ||
|
||
// Verify after the changes been applied to underlying CachedSpace, | ||
// the newly created stores should see the previous changes. | ||
state_cache.update(&redo_delta).unwrap(); | ||
let store = StoreRevMut::new(state_cache.clone()); | ||
let view = store.get_view(0, HASH_SIZE as u64).unwrap(); | ||
assert_eq!(view.as_deref(), hash); | ||
|
||
state_cache.update(&another_redo_delta).unwrap(); | ||
let another_store = StoreRevMut::new(state_cache.clone()); | ||
let view = another_store.get_view(0, HASH_SIZE as u64).unwrap(); | ||
assert_eq!(view.as_deref(), another_hash); | ||
} | ||
|
||
fn get_file_path(path: &Path, file: &str, line: u32) -> PathBuf { | ||
path.join(format!("{}_{}", file.replace('/', "-"), line)) | ||
} | ||
|
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -53,13 +53,13 @@ pub trait MemStoreR: Debug + Send + Sync { | |||||
// Page should be boxed as to not take up so much stack-space | ||||||
type Page = Box<[u8; PAGE_SIZE as usize]>; | ||||||
|
||||||
#[derive(Debug)] | ||||||
#[derive(Clone, Debug, PartialEq, Eq, Hash)] | ||||||
pub struct SpaceWrite { | ||||||
offset: u64, | ||||||
data: Box<[u8]>, | ||||||
} | ||||||
|
||||||
#[derive(Debug, Default)] | ||||||
#[derive(Debug, Default, Clone)] | ||||||
/// In memory representation of Write-ahead log with `undo` and `redo`. | ||||||
pub struct Ash { | ||||||
/// Deltas to undo the changes. | ||||||
|
@@ -432,33 +432,71 @@ struct StoreRevMutDelta { | |||||
plain: Ash, | ||||||
} | ||||||
|
||||||
impl Clone for StoreRevMutDelta { | ||||||
fn clone(&self) -> Self { | ||||||
let mut pages = HashMap::new(); | ||||||
for (pid, page) in self.pages.iter() { | ||||||
let mut data = Box::new([0u8; PAGE_SIZE as usize]); | ||||||
data.copy_from_slice(page.as_ref()); | ||||||
pages.insert(*pid, data); | ||||||
} | ||||||
|
||||||
Self { | ||||||
pages, | ||||||
plain: self.plain.clone(), | ||||||
} | ||||||
} | ||||||
} | ||||||
|
||||||
#[derive(Clone, Debug)] | ||||||
/// A mutable revision of the store. The view is constucted by applying the `deltas` to the | ||||||
/// `base space`. The `deltas` tracks both `undo` and `redo` to be able to rewind or reapply | ||||||
/// the changes. | ||||||
/// the changes. `StoreRevMut` supports basing on top of another `StoreRevMut`, by chaining | ||||||
/// `prev_deltas` (from based `StoreRevMut`) with current `deltas` from itself . In this way, | ||||||
/// callers can create a new `StoreRevMut` from an existing one without actually committing | ||||||
/// the mutations to the base space. | ||||||
pub struct StoreRevMut { | ||||||
base_space: Arc<dyn MemStoreR>, | ||||||
deltas: Arc<RwLock<StoreRevMutDelta>>, | ||||||
prev_deltas: Arc<StoreRevMutDelta>, | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If you change this to I think this would allow you to remove a lot of the clone implementations too.
Suggested change
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yeah, discussed this offline. The reason I did a clone is in There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ok, after debugged together, @rkuris posted the commit to defer the clone during |
||||||
} | ||||||
|
||||||
impl StoreRevMut { | ||||||
pub fn new(base_space: Arc<dyn MemStoreR>) -> Self { | ||||||
Self { | ||||||
base_space, | ||||||
deltas: Default::default(), | ||||||
prev_deltas: Default::default(), | ||||||
} | ||||||
} | ||||||
|
||||||
fn get_page_mut<'a>(&self, deltas: &'a mut StoreRevMutDelta, pid: u64) -> &'a mut [u8] { | ||||||
let page = deltas.pages.entry(pid).or_insert_with(|| { | ||||||
Box::new( | ||||||
self.base_space | ||||||
.get_slice(pid << PAGE_SIZE_NBIT, PAGE_SIZE) | ||||||
.unwrap() | ||||||
.try_into() | ||||||
.unwrap(), | ||||||
) | ||||||
}); | ||||||
pub fn new_from_other(other: &StoreRevMut) -> Self { | ||||||
Self { | ||||||
base_space: other.base_space.clone(), | ||||||
deltas: Default::default(), | ||||||
prev_deltas: Arc::new(other.deltas.read().clone()), | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This gets a lot easier with the type change above:
Suggested change
|
||||||
} | ||||||
} | ||||||
|
||||||
fn get_page_mut<'a>( | ||||||
&self, | ||||||
deltas: &'a mut StoreRevMutDelta, | ||||||
prev_deltas: &StoreRevMutDelta, | ||||||
pid: u64, | ||||||
) -> &'a mut [u8] { | ||||||
let page = deltas | ||||||
.pages | ||||||
.entry(pid) | ||||||
.or_insert_with(|| match prev_deltas.pages.get(&pid) { | ||||||
Some(p) => Box::new(*p.as_ref()), | ||||||
None => Box::new( | ||||||
self.base_space | ||||||
.get_slice(pid << PAGE_SIZE_NBIT, PAGE_SIZE) | ||||||
.unwrap() | ||||||
.try_into() | ||||||
.unwrap(), | ||||||
), | ||||||
}); | ||||||
|
||||||
page.as_mut() | ||||||
} | ||||||
|
@@ -544,15 +582,17 @@ impl CachedStore for StoreRevMut { | |||||
|
||||||
if s_pid == e_pid { | ||||||
let mut deltas = self.deltas.write(); | ||||||
let slice = &mut self.get_page_mut(deltas.deref_mut(), s_pid)[s_off..e_off + 1]; | ||||||
let slice = &mut self.get_page_mut(deltas.deref_mut(), &self.prev_deltas, s_pid) | ||||||
[s_off..e_off + 1]; | ||||||
undo.extend(&*slice); | ||||||
slice.copy_from_slice(change) | ||||||
} else { | ||||||
let len = PAGE_SIZE as usize - s_off; | ||||||
|
||||||
{ | ||||||
let mut deltas = self.deltas.write(); | ||||||
let slice = &mut self.get_page_mut(deltas.deref_mut(), s_pid)[s_off..]; | ||||||
let slice = | ||||||
&mut self.get_page_mut(deltas.deref_mut(), &self.prev_deltas, s_pid)[s_off..]; | ||||||
undo.extend(&*slice); | ||||||
slice.copy_from_slice(&change[..len]); | ||||||
} | ||||||
|
@@ -561,13 +601,14 @@ impl CachedStore for StoreRevMut { | |||||
|
||||||
let mut deltas = self.deltas.write(); | ||||||
for p in s_pid + 1..e_pid { | ||||||
let slice = self.get_page_mut(deltas.deref_mut(), p); | ||||||
let slice = self.get_page_mut(deltas.deref_mut(), &self.prev_deltas, p); | ||||||
undo.extend(&*slice); | ||||||
slice.copy_from_slice(&change[..PAGE_SIZE as usize]); | ||||||
change = &change[PAGE_SIZE as usize..]; | ||||||
} | ||||||
|
||||||
let slice = &mut self.get_page_mut(deltas.deref_mut(), e_pid)[..e_off + 1]; | ||||||
let slice = | ||||||
&mut self.get_page_mut(deltas.deref_mut(), &self.prev_deltas, e_pid)[..e_off + 1]; | ||||||
undo.extend(&*slice); | ||||||
slice.copy_from_slice(change); | ||||||
} | ||||||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks like it's doing this already?
nit: I think we have a method somewhere that creates a directory under target, which means
cargo clean
will remove it automatically.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, you're right, I will remove the comment in a follow up PR.