Skip to content

Commit

Permalink
Add fs store block
Browse files Browse the repository at this point in the history
  • Loading branch information
kckeiks committed Jul 18, 2023
1 parent ed5cac9 commit aa5d1d0
Show file tree
Hide file tree
Showing 8 changed files with 203 additions and 14 deletions.
63 changes: 63 additions & 0 deletions Cargo.lock
Original file line number Diff line number Diff line change
Expand Up @@ -2151,6 +2151,7 @@ dependencies = [
"draco-interfaces",
"parking_lot",
"serde",
"tempdir",
"thiserror",
"tokio",
]
Expand Down Expand Up @@ -2861,6 +2862,12 @@ version = "2.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6c2141d6d6c8512188a7891b4b01590a45f6dac67afb4f255c4124dbb86d4eaa"

[[package]]
name = "fuchsia-cprng"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a06f77d526c1a601b7c4cdd98f54b5eaabffc14d5f2f0296febdc7f357c6d3ba"

[[package]]
name = "funty"
version = "1.1.0"
Expand Down Expand Up @@ -5532,6 +5539,19 @@ version = "0.6.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "643f8f41a8ebc4c5dc4515c82bb8abd397b527fc20fd681b7c011c2aee5d44fb"

[[package]]
name = "rand"
version = "0.4.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "552840b97013b1a26992c11eac34bdd778e464601a4c2054b5f0bff7c6761293"
dependencies = [
"fuchsia-cprng",
"libc",
"rand_core 0.3.1",
"rdrand",
"winapi",
]

[[package]]
name = "rand"
version = "0.7.3"
Expand Down Expand Up @@ -5576,6 +5596,21 @@ dependencies = [
"rand_core 0.6.4",
]

[[package]]
name = "rand_core"
version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7a6fdeb83b075e8266dcc8762c22776f6877a63111121f5f8c7411e5be7eed4b"
dependencies = [
"rand_core 0.4.2",
]

[[package]]
name = "rand_core"
version = "0.4.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9c33a3c44ca05fa6f1807d8e6743f3824e8509beca625669633be0acbdf509dc"

[[package]]
name = "rand_core"
version = "0.5.1"
Expand Down Expand Up @@ -5681,6 +5716,15 @@ dependencies = [
"yasna",
]

[[package]]
name = "rdrand"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "678054eb77286b51581ba43620cc911abf02758c91f93f479767aed0f90458b2"
dependencies = [
"rand_core 0.3.1",
]

[[package]]
name = "readonly"
version = "0.2.8"
Expand Down Expand Up @@ -5791,6 +5835,15 @@ version = "0.7.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "436b050e76ed2903236f032a59761c1eb99e1b0aead2c257922771dab1fc8c78"

[[package]]
name = "remove_dir_all"
version = "0.5.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3acd125665422973a33ac9d3dd2df85edad0f4ae9b00dafb1a05e43a9f5ef8e7"
dependencies = [
"winapi",
]

[[package]]
name = "reqwest"
version = "0.11.18"
Expand Down Expand Up @@ -6877,6 +6930,16 @@ dependencies = [
"workspace-hack",
]

[[package]]
name = "tempdir"
version = "0.3.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "15f2b5fb00ccdf689e0149d1b1b3c03fead81c2b37735d812fa8bddbbf41b6d8"
dependencies = [
"rand 0.4.6",
"remove_dir_all",
]

[[package]]
name = "tempfile"
version = "3.6.0"
Expand Down
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ serde = { version = "1.0", features = ["derive"] }
serde-big-array = "0.5.1"
serde_json = "1.0.96"
thiserror = "1.0"
tempdir = "0.3"
parking_lot = "0.12.1"
tokio = { version = "1.28", features = ["full"] }
tokio-stream = "0.1"
Expand Down
1 change: 1 addition & 0 deletions core/blockstore/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,5 @@ bytes.workspace = true
parking_lot.workspace = true
serde.workspace = true
thiserror.workspace = true
tempdir.workspace = true
tokio.workspace = true
113 changes: 113 additions & 0 deletions core/blockstore/src/fs.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
use std::sync::Arc;

use async_trait::async_trait;
use draco_interfaces::{
Blake3Hash, Blake3Tree, BlockStoreInterface, CompressionAlgoSet, CompressionAlgorithm,
ConfigConsumer, ContentChunk,
};
use serde::{Deserialize, Serialize};
use tempdir::TempDir;
use tokio::{
fs::{self, File},
io::AsyncWriteExt,
};

use crate::{put::IncrementalPut, store::Store, Block, BlockContent, Key};

const TMP_DIR_PREFIX: &str = "tmp-store";

#[derive(Serialize, Deserialize, Default)]
pub struct FsStoreConfig {
store_dir_path: String,
}

#[derive(Clone)]
pub struct FsStore {
store_dir_path: String,
tmp_dir: Arc<TempDir>,
}

impl ConfigConsumer for FsStore {
const KEY: &'static str = "fsstore";
type Config = FsStoreConfig;
}

#[async_trait]
impl BlockStoreInterface for FsStore {
type SharedPointer<T: ?Sized + Send + Sync> = Arc<T>;
type Put = IncrementalPut<Self>;

async fn init(config: Self::Config) -> anyhow::Result<Self> {
Ok(Self {
store_dir_path: config.store_dir_path,
tmp_dir: TempDir::new(TMP_DIR_PREFIX).map(Arc::new)?,
})
}

async fn get_tree(&self, cid: &Blake3Hash) -> Option<Self::SharedPointer<Blake3Tree>> {
match bincode::deserialize::<BlockContent>(
self.fetch(&Key::tree_key(*cid)).await?.as_slice(),
)
.expect("Stored content to be serialized properly")
{
BlockContent::Tree(tree) => Some(Arc::new(Blake3Tree(tree))),
_ => None,
}
}

async fn get(
&self,
block_counter: u32,
block_hash: &Blake3Hash,
_compression: CompressionAlgoSet,
) -> Option<Self::SharedPointer<ContentChunk>> {
match bincode::deserialize::<BlockContent>(
self.fetch(&Key::chunk_key(*block_hash, block_counter))
.await?
.as_slice(),
)
.expect("Stored content to be serialized properly")
{
BlockContent::Chunk(content) => Some(Arc::new(ContentChunk {
compression: CompressionAlgorithm::Uncompressed,
content,
})),
_ => None,
}
}

fn put(&self, root: Option<Blake3Hash>) -> Self::Put {
match root {
Some(root) => IncrementalPut::verifier(self.clone(), root),
None => IncrementalPut::trust(self.clone()),
}
}
}

// TODO: Add logging.
#[async_trait]
impl Store for FsStore {
async fn fetch(&self, key: &Key) -> Option<Block> {
let path = format!("{}/{:?}", self.store_dir_path, key.0);
fs::read(path).await.ok()
}

// TODO: This should perhaps return an error.
async fn insert(&mut self, key: Key, block: Block) {
let filename = format!("{:?}", key.0);
let path = self.tmp_dir.path().join(filename);
if let Ok(mut tmp_file) = File::create(&path).await {
if tmp_file.write_all(block.as_ref()).await.is_err() {
return;
}
// TODO: Is this needed before calling rename?
if tmp_file.sync_all().await.is_err() {
return;
}
let store_path = format!("{}/{:?}", self.store_dir_path, key.0);
if fs::rename(path, store_path).await.is_err() {
return;
}
}
}
}
1 change: 1 addition & 0 deletions core/blockstore/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
pub mod config;
mod fs;
pub mod memory;
pub mod put;
mod store;
Expand Down
14 changes: 9 additions & 5 deletions core/blockstore/src/memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,10 @@ impl BlockStoreInterface for MemoryBlockStore {
}

async fn get_tree(&self, cid: &Blake3Hash) -> Option<Self::SharedPointer<Blake3Tree>> {
match bincode::deserialize::<BlockContent>(self.store_get(&Key::tree_key(*cid))?.as_slice())
.expect("Stored content to be serialized properly")
match bincode::deserialize::<BlockContent>(
self.fetch(&Key::tree_key(*cid)).await?.as_slice(),
)
.expect("Stored content to be serialized properly")
{
BlockContent::Tree(tree) => Some(Arc::new(Blake3Tree(tree))),
_ => None,
Expand All @@ -46,7 +48,8 @@ impl BlockStoreInterface for MemoryBlockStore {
_compression: CompressionAlgoSet,
) -> Option<Self::SharedPointer<ContentChunk>> {
match bincode::deserialize::<BlockContent>(
self.store_get(&Key::chunk_key(*block_hash, block_counter))?
self.fetch(&Key::chunk_key(*block_hash, block_counter))
.await?
.as_slice(),
)
.expect("Stored content to be serialized properly")
Expand All @@ -67,12 +70,13 @@ impl BlockStoreInterface for MemoryBlockStore {
}
}

#[async_trait]
impl Store for MemoryBlockStore {
fn store_get(&self, key: &Key) -> Option<Block> {
async fn fetch(&self, key: &Key) -> Option<Block> {
self.inner.read().get(key).cloned()
}

fn store_put(&mut self, key: Key, block: Block) {
async fn insert(&mut self, key: Key, block: Block) {
self.inner.write().insert(key, block);
}
}
15 changes: 9 additions & 6 deletions core/blockstore/src/put.rs
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,8 @@ where
let block = bincode::serialize(&BlockContent::Chunk(chunk.content.content))
.map_err(|_| PutFinalizeError::PartialContent)?;
self.store
.store_put(Key::chunk_key(chunk.hash, count as u32), block);
.insert(Key::chunk_key(chunk.hash, count as u32), block)
.await;
}

match self.mode {
Expand All @@ -219,11 +220,13 @@ where
let hash_tree = tree_builder.finalize();
let block = bincode::serialize(&BlockContent::Tree(hash_tree.tree))
.map_err(|_| PutFinalizeError::PartialContent)?;
self.store.store_put(
Key::tree_key(Blake3Hash::from(hash_tree.hash)),
// TODO: We need a more descriptive error for serialization-related errors.
block,
);
self.store
.insert(
Key::tree_key(Blake3Hash::from(hash_tree.hash)),
// TODO: We need a more descriptive error for serialization-related errors.
block,
)
.await;
Ok(Blake3Hash::from(hash_tree.hash))
},
}
Expand Down
9 changes: 6 additions & 3 deletions core/blockstore/src/store.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
use async_trait::async_trait;

use crate::{Block, Key};

/// Basic get and put store trait.
/// Simple block store interface.
#[async_trait]
pub trait Store {
fn store_get(&self, key: &Key) -> Option<Block>;
fn store_put(&mut self, key: Key, block: Block);
async fn fetch(&self, key: &Key) -> Option<Block>;
async fn insert(&mut self, key: Key, block: Block);
}

0 comments on commit aa5d1d0

Please sign in to comment.