Skip to content

Commit

Permalink
fix: refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
jproyo committed Oct 4, 2024
1 parent 4996820 commit 8e6ef6f
Show file tree
Hide file tree
Showing 2 changed files with 101 additions and 67 deletions.
89 changes: 89 additions & 0 deletions lib/b3fs/src/bucket/file/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,9 @@ use serde::{Deserialize, Deserializer, Serialize};
use tokio::fs::{File, OpenOptions};
use tokio::io::{self, AsyncSeek, AsyncSeekExt, AsyncWriteExt, BufWriter};

use super::errors;
use crate::bucket::Bucket;
use crate::hasher::collector::BufCollector;
use crate::utils::{self, from_hex, to_hex};

pub mod reader;
Expand Down Expand Up @@ -196,3 +198,90 @@ impl HeaderFile {
task.await
}
}

/// In-progress state for writing a file into a bucket: bytes are staged in a
/// temporary WAL directory as block files plus a header file, and the result
/// is either committed into the bucket or rolled back (WAL dir removed).
struct FileWriterState {
    // Destination bucket; cloned from the caller's bucket in `new`.
    bucket: Bucket,
    // Temporary WAL directory (from `bucket.get_new_wal_path()`) holding the
    // header file and block files; removed on commit and on rollback.
    temp_file_path: PathBuf,
    // `(block hash, committed block file path)` pairs, in block order.
    block_files: Vec<([u8; 32], PathBuf)>,
    // Block file currently being filled, if any.
    current_block_file: Option<BlockFile>,
    // Header file that accumulates block hashes and is flushed on commit.
    header_file: HeaderFile,
    // Number of blocks committed so far; also the index of the next block
    // hash to fetch from the collector.
    count_block: usize,
}

impl FileWriterState {
async fn new(bucket: &Bucket) -> Result<Self, io::Error> {
let wal_path = bucket.get_new_wal_path();
tokio::fs::create_dir_all(&wal_path).await?;
let header_file = HeaderFile::from_wal_path(&wal_path).await?;
Ok(Self {
bucket: bucket.clone(),
temp_file_path: wal_path,
count_block: 0,
block_files: Vec::new(),
current_block_file: None,
header_file,
})
}

async fn move_to_next_block(
&mut self,
collector: &mut BufCollector,
bytes: &[u8],
) -> Result<(), errors::WriteError> {
if let Some(ref mut block) = self.current_block_file {
// Because we have written `block_before_remaining` bytes to the current block
// file, and we have more bytes than a
// `MAX_BLOCK_SIZE_IN_BYTES`, we know that a new hash has been created in the
// tree. Therefore we can safely get the created block hash
let block_hash = collector
.get_block_hash(self.count_block)
.ok_or(errors::WriteError::BlockHashNotFound)?;

let new_block_file_path = block.commit(&self.temp_file_path, block_hash).await?;

// Add the block hash and the block file path to the list of block files so far
// committed.
self.block_files.push((block_hash, new_block_file_path));
self.count_block += 1;

// Write the block hash to the header file.
collector.write_hash(&mut self.header_file.file).await?;
// Create a new file to write the remaining bytes.
let mut block_file = BlockFile::from_wal_path(&self.temp_file_path).await?;

block_file.write_all(&bytes).await?;
self.current_block_file = Some(block_file);
} else {
let mut block_file = BlockFile::from_wal_path(&self.temp_file_path).await?;
block_file.write_all(&bytes).await?;
self.current_block_file = Some(block_file);
}
Ok(())
}

async fn commit(
mut self,
mut collector: BufCollector,
root_hash: [u8; 32],
) -> Result<[u8; 32], errors::CommitError> {
if let Some(mut block_file) = self.current_block_file {
if let Some(block_hash) = collector.get_block_hash(self.count_block) {
let block_file_path = block_file.commit(&self.temp_file_path, block_hash).await?;
self.block_files.push((block_hash, block_file_path));
self.count_block += 1;
collector.write_hash(&mut self.header_file.file).await?;
}
}

self.header_file
.flush(&self.bucket, &self.block_files, &root_hash)
.await?;

tokio::fs::remove_dir_all(self.temp_file_path).await?;
Ok(root_hash)
}

async fn rollback(self) -> Result<(), io::Error> {
tokio::fs::remove_dir_all(self.temp_file_path).await
}
}
79 changes: 12 additions & 67 deletions lib/b3fs/src/bucket/file/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,36 +7,23 @@ use rand::random;
use tokio::fs::{File, OpenOptions};
use tokio::io::{AsyncReadExt, AsyncSeekExt as _, AsyncWriteExt, BufWriter};

use super::{BlockFile, HeaderFile};
use super::{BlockFile, FileWriterState, HeaderFile};
use crate::bucket::{errors, Bucket};
use crate::hasher::b3::{BLOCK_SIZE_IN_CHUNKS, MAX_BLOCK_SIZE_IN_BYTES};
use crate::hasher::byte_hasher::Blake3Hasher;
use crate::hasher::collector::BufCollector;
use crate::utils::{self, tree_index};

pub struct FileWriter {
bucket: Bucket,
hasher: Blake3Hasher<BufCollector>,
temp_file_path: PathBuf,
block_files: Vec<([u8; 32], PathBuf)>,
current_block_file: Option<BlockFile>,
header_file: HeaderFile,
count_block: usize,
writer_state: FileWriterState,
}

impl FileWriter {
pub async fn new(bucket: &Bucket) -> Result<Self, errors::WriteError> {
let wal_path = bucket.get_new_wal_path();
tokio::fs::create_dir_all(&wal_path).await?;
let header_file = HeaderFile::from_wal_path(&wal_path).await?;
Ok(Self {
bucket: bucket.clone(),
hasher: Blake3Hasher::default(),
temp_file_path: wal_path,
count_block: 0,
block_files: Vec::new(),
current_block_file: None,
header_file,
writer_state: FileWriterState::new(bucket).await?,
})
}

Expand All @@ -49,52 +36,25 @@ impl FileWriter {
let want = cmp::min(BLOCK_SIZE_IN_CHUNKS, bytes_mut.len());
let mut bytes = bytes_mut.split_to(want);
self.hasher.update(&bytes);
if let Some(ref mut block) = self.current_block_file {
if let Some(ref mut block) = self.writer_state.current_block_file {
if block.size + bytes.len() > MAX_BLOCK_SIZE_IN_BYTES {
// Write `block_before_remaining` bytes to the current block file and create a
// new block file where we will write the remaining bytes which are in `bytes`.
let remaining = MAX_BLOCK_SIZE_IN_BYTES - block.size;
let block_before_remaining = bytes.split_to(remaining);
block.write_all(&block_before_remaining).await?;

// Because we have written `block_before_remaining` bytes to the current block
// file, and we have more bytes than a
// `MAX_BLOCK_SIZE_IN_BYTES`, we know that a new hash has been created in the
// tree. Therefore we can safely get the created block hash
let block_hash = self
.hasher
.get_tree()
.get_block_hash(self.count_block)
.ok_or(errors::WriteError::BlockHashNotFound)?;

let new_block_file_path =
block.commit(&self.temp_file_path, block_hash).await?;

// Add the block hash and the block file path to the list of block files so far
// committed.
self.block_files.push((block_hash, new_block_file_path));
self.count_block += 1;

// Write the block hash to the header file.
self.hasher
.get_tree_mut()
.write_hash(&mut self.header_file.file)
self.writer_state
.move_to_next_block(self.hasher.get_tree_mut(), &bytes)
.await?;
// Create a new file to write the remaining bytes.
let mut block_file = BlockFile::from_wal_path(&self.temp_file_path).await?;

block_file.write_all(&bytes).await?;
self.current_block_file = Some(block_file);
} else {
// Filling the current block file with the bytes. Nothing more to do
block.write_all(&bytes).await?;
}
} else {
// Create a new block file and write the bytes to it, because there is no current
// block
let mut block_file = BlockFile::from_wal_path(&self.temp_file_path).await?;
block_file.write_all(&bytes).await?;
self.current_block_file = Some(block_file);
self.writer_state
.move_to_next_block(self.hasher.get_tree_mut(), &bytes)
.await?;
}
}

Expand All @@ -103,28 +63,13 @@ impl FileWriter {

/// Finalize this write and flush the data to the disk.
pub async fn commit(mut self) -> Result<[u8; 32], errors::CommitError> {
let (ref mut collector, root_hash) = self.hasher.finalize_tree();
if let Some(mut block_file) = self.current_block_file {
if let Some(block_hash) = collector.get_block_hash(self.count_block) {
let block_file_path = block_file.commit(&self.temp_file_path, block_hash).await?;
self.block_files.push((block_hash, block_file_path));
self.count_block += 1;
collector.write_hash(&mut self.header_file.file).await?;
}
}

self.header_file
.flush(&self.bucket, &self.block_files, &root_hash)
.await?;

tokio::fs::remove_dir_all(self.temp_file_path).await?;

Ok(root_hash)
let (mut collector, root_hash) = self.hasher.finalize_tree();
self.writer_state.commit(collector, root_hash).await
}

/// Cancel this write and remove anything that this writer wrote to the disk.
pub async fn rollback(self) -> Result<(), io::Error> {
tokio::fs::remove_dir_all(self.temp_file_path).await
self.writer_state.rollback().await
}
}

Expand Down

0 comments on commit 8e6ef6f

Please sign in to comment.