Skip to content

Commit

Permalink
Merge branch 'main' into rkuris/db-refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
rkuris authored Aug 10, 2023
2 parents 83403cf + bd07742 commit 27d2214
Show file tree
Hide file tree
Showing 5 changed files with 86 additions and 66 deletions.
13 changes: 7 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -110,14 +110,13 @@ into prometheus metrics or json (it implements [serde::Serialize])
### Seasoned milestone
This milestone will add support for proposals, including proposed future
branches, with a cache to make committing these branches efficiently.
- [ ] Be able to support multiple proposed revisions against latest committed
- [x] Be able to support multiple proposed revisions against latest committed
version.
- [ ] Be able to propose a batch against the existing committed revision, or
- [x] Be able to propose a batch against the existing committed revision, or
propose a batch against any existing proposed revision.
- [ ] Be able to quickly commit a batch that has been proposed. Note that this
invalidates all other proposals that are not children of the committed proposed batch.
- [ ] Add metric reporting
- [ ] Refactor `Shale` to be more idiomatic
- [x] Committing a batch that has been proposed invalidates all other proposals
that are not children of the committed proposed batch.
- [ ] Be able to quickly commit a batch that has been proposed.

### Dried milestone
The focus of this milestone will be to support synchronization to other
Expand All @@ -129,6 +128,8 @@ verify the correctness of the data.
corresponding range proofs that verify the correctness of the data.
- [ ] Enforce limits on the size of the range proof as well as keys to make
synchronization easier for clients.
- [ ] Add metric reporting
- [ ] Refactor `Shale` to be more idiomatic

## Build
Firewood currently is Linux-only, as it has a dependency on the asynchronous
Expand Down
18 changes: 17 additions & 1 deletion firewood/src/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -472,7 +472,14 @@ impl Db<Store, SharedStore> {
if cfg.truncate {
let _ = std::fs::remove_dir_all(db_path.as_ref());
}
let (db_path, reset) = file::open_dir(db_path, cfg.truncate)?;

let open_options = if cfg.truncate {
file::Options::Truncate
} else {
file::Options::NoTruncate
};

let (db_path, reset) = file::open_dir(db_path, open_options)?;

let merkle_path = file::touch_dir("merkle", &db_path)?;
let merkle_meta_path = file::touch_dir("meta", &merkle_path)?;
Expand Down Expand Up @@ -1122,6 +1129,11 @@ pub enum BatchOp<K> {
/// can be proposed
pub type Batch<K> = Vec<BatchOp<K>>;

/// An atomic batch of changes proposed against the latest committed revision,
/// or any existing [Proposal]. Multiple proposals can be created against the
/// latest committed revision at the same time. A [Proposal] is immutable, meaning
/// the internal batch cannot be altered after creation. Committing a proposal
/// invalidates all other proposals that are not children of the committed one.
pub struct Proposal<S, T> {
// State of the Db
m: Arc<RwLock<DbInner<S>>>,
Expand All @@ -1142,6 +1154,8 @@ pub enum ProposalBase<S, T> {
}

impl Proposal<Store, SharedStore> {
/// Create a new proposal on top of this proposal. The new proposal is a
/// child of this one.
pub fn propose<K: AsRef<[u8]>>(
self: Arc<Self>,
data: Batch<K>,
Expand Down Expand Up @@ -1196,6 +1210,8 @@ impl Proposal<Store, SharedStore> {
})
}

/// Persist all changes to the DB. The atomicity of the [Proposal] guarantees all changes are
/// either retained on disk or lost together during a crash.
pub fn commit(&self) -> Result<(), DbError> {
let mut committed = self.committed.lock();
if *committed {
Expand Down
37 changes: 22 additions & 15 deletions firewood/src/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,22 @@ pub struct File {
fd: Fd,
}

/// How an existing file or directory should be treated when it is opened.
///
/// Used in place of a bare `bool` so that call sites state their intent
/// explicitly (`Options::Truncate` rather than `true`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Options {
    /// Discard any existing contents before opening.
    Truncate,
    /// Keep existing contents as they are.
    NoTruncate,
}

impl File {
pub fn open_file(rootpath: PathBuf, fname: &str, truncate: bool) -> Result<Fd, std::io::Error> {
pub fn open_file(
rootpath: PathBuf,
fname: &str,
options: Options,
) -> Result<Fd, std::io::Error> {
let mut filepath = rootpath;
filepath.push(fname);
Ok(std::fs::File::options()
.truncate(truncate)
.truncate(options == Options::Truncate)
.read(true)
.write(true)
.mode(0o600)
Expand All @@ -45,7 +55,8 @@ impl File {

pub fn new<P: AsRef<Path>>(fid: u64, _flen: u64, rootdir: P) -> Result<Self, std::io::Error> {
let fname = Self::_get_fname(fid);
let fd = match Self::open_file(rootdir.as_ref().to_path_buf(), &fname, false) {
let fd = match Self::open_file(rootdir.as_ref().to_path_buf(), &fname, Options::NoTruncate)
{
Ok(fd) => fd,
Err(e) => match e.kind() {
ErrorKind::NotFound => Self::create_file(rootdir.as_ref().to_path_buf(), &fname)?,
Expand Down Expand Up @@ -79,22 +90,18 @@ pub fn touch_dir(dirname: &str, rootdir: &Path) -> Result<PathBuf, std::io::Erro

pub fn open_dir<P: AsRef<Path>>(
path: P,
truncate: bool,
options: Options,
) -> Result<(PathBuf, bool), std::io::Error> {
let mut reset_header = truncate;
let truncate = options == Options::Truncate;

if truncate {
let _ = std::fs::remove_dir_all(path.as_ref());
}

match std::fs::create_dir(path.as_ref()) {
Err(e) => {
if truncate || e.kind() != ErrorKind::AlreadyExists {
return Err(e);
}
}
Ok(_) => {
// the DB did not exist
reset_header = true
}
Err(e) if truncate || e.kind() != ErrorKind::AlreadyExists => Err(e),
// the DB already exists
Err(_) => Ok((path.as_ref().to_path_buf(), false)),
Ok(_) => Ok((path.as_ref().to_path_buf(), true)),
}
Ok((PathBuf::from(path.as_ref()), reset_header))
}
6 changes: 3 additions & 3 deletions firewood/src/storage/buffer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -640,7 +640,7 @@ mod tests {

// TODO: Run the test in a separate standalone directory for concurrency reasons
let path = std::path::PathBuf::from(r"/tmp/firewood");
let (root_db_path, reset) = crate::file::open_dir(path, true).unwrap();
let (root_db_path, reset) = file::open_dir(path, file::Options::Truncate).unwrap();

// file descriptor of the state directory
let state_path = file::touch_dir("state", &root_db_path).unwrap();
Expand Down Expand Up @@ -728,7 +728,7 @@ mod tests {
// TODO: Run the test in a separate standalone directory for concurrency reasons
let tmp_dir = TempDir::new("firewood").unwrap();
let path = get_file_path(tmp_dir.path(), file!(), line!());
let (root_db_path, reset) = crate::file::open_dir(path, true).unwrap();
let (root_db_path, reset) = file::open_dir(path, file::Options::Truncate).unwrap();

// file descriptor of the state directory
let state_path = file::touch_dir("state", &root_db_path).unwrap();
Expand Down Expand Up @@ -812,7 +812,7 @@ mod tests {

let tmp_dir = TempDir::new("firewood").unwrap();
let path = get_file_path(tmp_dir.path(), file!(), line!());
let (root_db_path, reset) = crate::file::open_dir(path, true).unwrap();
let (root_db_path, reset) = file::open_dir(path, file::Options::Truncate).unwrap();

// file descriptor of the state directory
let state_path = file::touch_dir("state", &root_db_path).unwrap();
Expand Down
78 changes: 37 additions & 41 deletions firewood/tests/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -231,8 +231,15 @@ impl<P: AsRef<Path> + ?Sized> DerefMut for Db<'_, P> {
}
}

/// Asserts that looking up `$key` in the revision `$rev` yields `$expected_val`.
///
/// Both the key and the expected value are string literals; they are compared
/// as raw bytes. Panics if the lookup result cannot be unwrapped or if the
/// stored bytes differ from the expected ones.
macro_rules! assert_val {
    ($rev:ident, $key:literal, $expected_val:literal) => {
        assert_eq!(
            $rev.kv_get($key.as_bytes()).unwrap(),
            $expected_val.as_bytes().to_vec()
        );
    };
}

#[test]
fn db_proposal() {
fn db_proposal() -> Result<(), DbError> {
let cfg = DbConfig::builder().wal(WalConfig::builder().max_revisions(10).build());

let db = Db::new("test_db_proposal", &cfg.clone().truncate(true).build())
Expand All @@ -246,52 +253,47 @@ fn db_proposal() {
BatchOp::Delete { key: b"z" },
];

let proposal = Arc::new(db.new_proposal(batch).unwrap());
let proposal = Arc::new(db.new_proposal(batch)?);
let rev = proposal.get_revision();
let val = rev.kv_get(b"k");
assert_eq!(val.unwrap(), "v".as_bytes().to_vec());
assert_val!(rev, "k", "v");

let batch_2 = vec![BatchOp::Put {
key: b"k2",
value: "v2".as_bytes().to_vec(),
}];
let proposal_2 = proposal.clone().propose(batch_2).unwrap();
let proposal_2 = proposal.clone().propose(batch_2)?;
let rev = proposal_2.get_revision();
let val = rev.kv_get(b"k");
assert_eq!(val.unwrap(), "v".as_bytes().to_vec());
let val = rev.kv_get(b"k2");
assert_eq!(val.unwrap(), "v2".as_bytes().to_vec());
assert_val!(rev, "k", "v");
assert_val!(rev, "k2", "v2");

proposal.commit().unwrap();
proposal_2.commit().unwrap();
proposal.commit()?;
proposal_2.commit()?;

std::thread::scope(|scope| {
scope.spawn(|| {
scope.spawn(|| -> Result<(), DbError> {
let another_batch = vec![BatchOp::Put {
key: b"another_k",
value: "another_v".as_bytes().to_vec(),
}];
let another_proposal = proposal.clone().propose(another_batch).unwrap();
let another_proposal = proposal.clone().propose(another_batch)?;
let rev = another_proposal.get_revision();
let val = rev.kv_get(b"k");
assert_eq!(val.unwrap(), "v".as_bytes().to_vec());
let val = rev.kv_get(b"another_k");
assert_eq!(val.unwrap(), "another_v".as_bytes().to_vec());
assert_val!(rev, "k", "v");
assert_val!(rev, "another_k", "another_v");
// The proposal is invalid and cannot be committed
assert!(another_proposal.commit().is_err());
Ok(())
});

scope.spawn(|| {
scope.spawn(|| -> Result<(), DbError> {
let another_batch = vec![BatchOp::Put {
key: b"another_k_1",
value: "another_v_1".as_bytes().to_vec(),
}];
let another_proposal = proposal.clone().propose(another_batch).unwrap();
let another_proposal = proposal.clone().propose(another_batch)?;
let rev = another_proposal.get_revision();
let val = rev.kv_get(b"k");
assert_eq!(val.unwrap(), "v".as_bytes().to_vec());
let val = rev.kv_get(b"another_k_1");
assert_eq!(val.unwrap(), "another_v_1".as_bytes().to_vec());
assert_val!(rev, "k", "v");
assert_val!(rev, "another_k_1", "another_v_1");
Ok(())
});
});

Expand All @@ -301,29 +303,23 @@ fn db_proposal() {
key: b"k3",
value: "v3".as_bytes().to_vec(),
}];
let proposal = Arc::new(db.new_proposal(batch).unwrap());
let proposal = Arc::new(db.new_proposal(batch)?);
let rev = proposal.get_revision();
let val = rev.kv_get(b"k");
assert_eq!(val.unwrap(), "v".as_bytes().to_vec());
let val = rev.kv_get(b"k2");
assert_eq!(val.unwrap(), "v2".as_bytes().to_vec());
let val = rev.kv_get(b"k3");
assert_eq!(val.unwrap(), "v3".as_bytes().to_vec());
assert_val!(rev, "k", "v");
assert_val!(rev, "k2", "v2");
assert_val!(rev, "k3", "v3");

let batch_2 = vec![BatchOp::Put {
key: b"k4",
value: "v4".as_bytes().to_vec(),
}];
let proposal_2 = proposal.clone().propose(batch_2).unwrap();
let proposal_2 = proposal.clone().propose(batch_2)?;
let rev = proposal_2.get_revision();
let val = rev.kv_get(b"k");
assert_eq!(val.unwrap(), "v".as_bytes().to_vec());
let val = rev.kv_get(b"k2");
assert_eq!(val.unwrap(), "v2".as_bytes().to_vec());
let val = rev.kv_get(b"k3");
assert_eq!(val.unwrap(), "v3".as_bytes().to_vec());
let val = rev.kv_get(b"k4");
assert_eq!(val.unwrap(), "v4".as_bytes().to_vec());

proposal_2.commit().unwrap();
assert_val!(rev, "k", "v");
assert_val!(rev, "k2", "v2");
assert_val!(rev, "k3", "v3");
assert_val!(rev, "k4", "v4");

proposal_2.commit()?;
Ok(())
}

0 comments on commit 27d2214

Please sign in to comment.