diff --git a/Cargo.lock b/Cargo.lock index 37fcc3462..e55b2365b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2151,6 +2151,7 @@ dependencies = [ "draco-interfaces", "parking_lot", "serde", + "tempdir", "thiserror", "tokio", ] @@ -2861,6 +2862,12 @@ version = "2.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6c2141d6d6c8512188a7891b4b01590a45f6dac67afb4f255c4124dbb86d4eaa" +[[package]] +name = "fuchsia-cprng" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a06f77d526c1a601b7c4cdd98f54b5eaabffc14d5f2f0296febdc7f357c6d3ba" + [[package]] name = "funty" version = "1.1.0" @@ -5532,6 +5539,19 @@ version = "0.6.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "643f8f41a8ebc4c5dc4515c82bb8abd397b527fc20fd681b7c011c2aee5d44fb" +[[package]] +name = "rand" +version = "0.4.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "552840b97013b1a26992c11eac34bdd778e464601a4c2054b5f0bff7c6761293" +dependencies = [ + "fuchsia-cprng", + "libc", + "rand_core 0.3.1", + "rdrand", + "winapi", +] + [[package]] name = "rand" version = "0.7.3" @@ -5576,6 +5596,21 @@ dependencies = [ "rand_core 0.6.4", ] +[[package]] +name = "rand_core" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7a6fdeb83b075e8266dcc8762c22776f6877a63111121f5f8c7411e5be7eed4b" +dependencies = [ + "rand_core 0.4.2", +] + +[[package]] +name = "rand_core" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9c33a3c44ca05fa6f1807d8e6743f3824e8509beca625669633be0acbdf509dc" + [[package]] name = "rand_core" version = "0.5.1" @@ -5681,6 +5716,15 @@ dependencies = [ "yasna", ] +[[package]] +name = "rdrand" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "678054eb77286b51581ba43620cc911abf02758c91f93f479767aed0f90458b2" +dependencies = [ + "rand_core 0.3.1", +] + [[package]] name = "readonly" version = "0.2.8" @@ -5791,6 +5835,15 @@ version = "0.7.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "436b050e76ed2903236f032a59761c1eb99e1b0aead2c257922771dab1fc8c78" +[[package]] +name = "remove_dir_all" +version = "0.5.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3acd125665422973a33ac9d3dd2df85edad0f4ae9b00dafb1a05e43a9f5ef8e7" +dependencies = [ + "winapi", +] + [[package]] name = "reqwest" version = "0.11.18" @@ -6877,6 +6930,16 @@ dependencies = [ "workspace-hack", ] +[[package]] +name = "tempdir" +version = "0.3.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "15f2b5fb00ccdf689e0149d1b1b3c03fead81c2b37735d812fa8bddbbf41b6d8" +dependencies = [ + "rand 0.4.6", + "remove_dir_all", +] + [[package]] name = "tempfile" version = "3.6.0" diff --git a/Cargo.toml b/Cargo.toml index f42895ed5..b96e1c9f1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -22,6 +22,7 @@ serde = { version = "1.0", features = ["derive"] } serde-big-array = "0.5.1" serde_json = "1.0.96" thiserror = "1.0" +tempdir = "0.3" parking_lot = "0.12.1" tokio = { version = "1.28", features = ["full"] } tokio-stream = "0.1" diff --git a/core/blockstore/Cargo.toml b/core/blockstore/Cargo.toml index 83c4537a1..7f499b78c 100644 --- a/core/blockstore/Cargo.toml +++ b/core/blockstore/Cargo.toml @@ -15,4 +15,5 @@ bytes.workspace = true parking_lot.workspace = true serde.workspace = true thiserror.workspace = true +tempdir.workspace = true tokio.workspace = true diff --git a/core/blockstore/src/fs.rs b/core/blockstore/src/fs.rs new file mode 100644 index 000000000..0f120ae7e --- /dev/null +++ b/core/blockstore/src/fs.rs @@ -0,0 +1,113 @@ +use std::sync::Arc; + +use async_trait::async_trait; +use draco_interfaces::{ + Blake3Hash, Blake3Tree, BlockStoreInterface, CompressionAlgoSet, CompressionAlgorithm, + ConfigConsumer, ContentChunk, +}; +use serde::{Deserialize, Serialize}; +use tempdir::TempDir; +use tokio::{ + fs::{self, File}, + io::AsyncWriteExt, +}; + +use crate::{put::IncrementalPut, store::Store, Block, BlockContent, Key}; + +const TMP_DIR_PREFIX: &str = "tmp-store"; + +#[derive(Serialize, Deserialize, Default)] +pub struct FsStoreConfig { + store_dir_path: String, +} + +#[derive(Clone)] +pub struct FsStore { + store_dir_path: String, + tmp_dir: Arc, +} + +impl ConfigConsumer for FsStore { + const KEY: &'static str = "fsstore"; + type Config = FsStoreConfig; +} + +#[async_trait] +impl BlockStoreInterface for FsStore { + type SharedPointer = Arc; + type Put = IncrementalPut; + + async fn init(config: Self::Config) -> anyhow::Result { + Ok(Self { + store_dir_path: config.store_dir_path, + tmp_dir: TempDir::new(TMP_DIR_PREFIX).map(Arc::new)?, + }) + } + + async fn get_tree(&self, cid: &Blake3Hash) -> Option> { + match bincode::deserialize::( + self.fetch(&Key::tree_key(*cid)).await?.as_slice(), + ) + .expect("Stored content to be serialized properly") + { + BlockContent::Tree(tree) => Some(Arc::new(Blake3Tree(tree))), + _ => None, + } + } + + async fn get( + &self, + block_counter: u32, + block_hash: &Blake3Hash, + _compression: CompressionAlgoSet, + ) -> Option> { + match bincode::deserialize::( + self.fetch(&Key::chunk_key(*block_hash, block_counter)) + .await? + .as_slice(), + ) + .expect("Stored content to be serialized properly") + { + BlockContent::Chunk(content) => Some(Arc::new(ContentChunk { + compression: CompressionAlgorithm::Uncompressed, + content, + })), + _ => None, + } + } + + fn put(&self, root: Option) -> Self::Put { + match root { + Some(root) => IncrementalPut::verifier(self.clone(), root), + None => IncrementalPut::trust(self.clone()), + } + } +} + +// TODO: Add logging. +#[async_trait] +impl Store for FsStore { + async fn fetch(&self, key: &Key) -> Option { + let path = format!("{}/{:?}", self.store_dir_path, key.0); + fs::read(path).await.ok() + } + + // TODO: This should perhaps return an error. + async fn insert(&mut self, key: Key, block: Block) { + let filename = format!("{:?}", key.0); + let path = self.tmp_dir.path().join(filename); + if let Ok(mut tmp_file) = File::create(&path).await { + if tmp_file.write_all(block.as_ref()).await.is_err() { + return; + } + // TODO: Is this needed before calling rename? + if tmp_file.sync_all().await.is_err() { + return; + } + let store_path = format!("{}/{:?}", self.store_dir_path, key.0); + if fs::rename(path, store_path).await.is_err() { + return; + } + } + } +} diff --git a/core/blockstore/src/lib.rs b/core/blockstore/src/lib.rs index 0eff453c2..d48584b79 100644 --- a/core/blockstore/src/lib.rs +++ b/core/blockstore/src/lib.rs @@ -1,4 +1,5 @@ pub mod config; +mod fs; pub mod memory; pub mod put; mod store; diff --git a/core/blockstore/src/memory.rs b/core/blockstore/src/memory.rs index f994a849e..7725c2968 100644 --- a/core/blockstore/src/memory.rs +++ b/core/blockstore/src/memory.rs @@ -31,8 +31,10 @@ impl BlockStoreInterface for MemoryBlockStore { } async fn get_tree(&self, cid: &Blake3Hash) -> Option> { - match bincode::deserialize::(self.store_get(&Key::tree_key(*cid))?.as_slice()) - .expect("Stored content to be serialized properly") + match bincode::deserialize::( + self.fetch(&Key::tree_key(*cid)).await?.as_slice(), + ) + .expect("Stored content to be serialized properly") { BlockContent::Tree(tree) => Some(Arc::new(Blake3Tree(tree))), _ => None, @@ -46,7 +48,8 @@ impl BlockStoreInterface for MemoryBlockStore { _compression: CompressionAlgoSet, ) -> Option> { match bincode::deserialize::( - self.store_get(&Key::chunk_key(*block_hash, block_counter))? + self.fetch(&Key::chunk_key(*block_hash, block_counter)) + .await? .as_slice(), ) .expect("Stored content to be serialized properly") @@ -67,12 +70,13 @@ impl BlockStoreInterface for MemoryBlockStore { } } +#[async_trait] impl Store for MemoryBlockStore { - fn store_get(&self, key: &Key) -> Option { + async fn fetch(&self, key: &Key) -> Option { self.inner.read().get(key).cloned() } - fn store_put(&mut self, key: Key, block: Block) { + async fn insert(&mut self, key: Key, block: Block) { self.inner.write().insert(key, block); } } diff --git a/core/blockstore/src/put.rs b/core/blockstore/src/put.rs index fe8fed4a9..b3030bb1a 100644 --- a/core/blockstore/src/put.rs +++ b/core/blockstore/src/put.rs @@ -210,7 +210,8 @@ where let block = bincode::serialize(&BlockContent::Chunk(chunk.content.content)) .map_err(|_| PutFinalizeError::PartialContent)?; self.store - .store_put(Key::chunk_key(chunk.hash, count as u32), block); + .insert(Key::chunk_key(chunk.hash, count as u32), block) + .await; } match self.mode { @@ -219,11 +220,13 @@ where let hash_tree = tree_builder.finalize(); let block = bincode::serialize(&BlockContent::Tree(hash_tree.tree)) .map_err(|_| PutFinalizeError::PartialContent)?; - self.store.store_put( - Key::tree_key(Blake3Hash::from(hash_tree.hash)), - // TODO: We need a more descriptive error for serialization-related errors. - block, - ); + self.store + .insert( + Key::tree_key(Blake3Hash::from(hash_tree.hash)), + // TODO: We need a more descriptive error for serialization-related errors. + block, + ) + .await; Ok(Blake3Hash::from(hash_tree.hash)) }, } diff --git a/core/blockstore/src/store.rs b/core/blockstore/src/store.rs index 299789258..3c6c595a0 100644 --- a/core/blockstore/src/store.rs +++ b/core/blockstore/src/store.rs @@ -1,7 +1,10 @@ +use async_trait::async_trait; + use crate::{Block, Key}; -/// Basic get and put store trait. +/// Simple block store interface. +#[async_trait] pub trait Store { - fn store_get(&self, key: &Key) -> Option; - fn store_put(&mut self, key: Key, block: Block); + async fn fetch(&self, key: &Key) -> Option; + async fn insert(&mut self, key: Key, block: Block); }