Commit

fix: refactor
jproyo committed Oct 4, 2024
1 parent 8917a12 commit 4996820
Showing 2 changed files with 143 additions and 140 deletions.
153 changes: 112 additions & 41 deletions lib/b3fs/src/bucket/file/mod.rs
@@ -6,13 +6,14 @@ use std::os::unix::fs::FileExt;
use std::path::{Path, PathBuf};

use arrayvec::ArrayString;
use rand::random;
use serde::de::{self, SeqAccess, Visitor};
use serde::{Deserialize, Deserializer, Serialize};
use tokio::fs::File;
use tokio::fs::{File, OpenOptions};
use tokio::io::{self, AsyncSeek, AsyncSeekExt, AsyncWriteExt, BufWriter};

use crate::bucket::Bucket;
use crate::utils::{from_hex, to_hex};
use crate::utils::{self, from_hex, to_hex};

pub mod reader;
//pub mod uwriter;
@@ -80,48 +81,118 @@ impl B3FSFile {
}
}

pub(crate) async fn move_temp_to_final(
bucket: &Bucket,
header_file: BufWriter<File>,
header_tmp_file_path: &Path,
temp_dir: &Path,
block_files: &[([u8; 32], PathBuf)],
root_hash: &[u8; 32],
) -> Result<(), io::Error> {
let final_path = bucket.get_header_path(root_hash);
let blocks_len = block_files.len() as u32;
let mut file = header_file.into_inner().try_into_std().unwrap();
let task = tokio::task::spawn_blocking(move || async move {
let bytes: [u8; 4] = blocks_len.to_le_bytes();
file.write_at(&bytes, 4)
.map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
Ok(()) as Result<(), io::Error>
})
.await?;

task.await?;

tokio::fs::rename(header_tmp_file_path, final_path).await?;

for (i, (hash, path)) in block_files.iter().enumerate() {
let final_hash_file = bucket.get_block_path(i as u32, hash);
tokio::fs::rename(path, final_hash_file).await?;
fn random_wal_file(path: &Path) -> PathBuf {
let random_file: [u8; 32] = random();
path.to_path_buf()
.join(utils::to_hex(&random_file).as_str())
}

struct BlockFile {
size: usize,
path: PathBuf,
file: BufWriter<File>,
}

impl BlockFile {
async fn from_wal_path(path: &Path) -> Result<Self, io::Error> {
let path = random_wal_file(path);
let file = BufWriter::new(
OpenOptions::new()
.create(true)
.append(true)
.open(path.clone())
.await?,
);
Ok(Self {
size: 0,
path,
file,
})
}

tokio::fs::remove_dir_all(temp_dir).await?;
Ok(())
/// Write the bytes to the file and update the size of the file
async fn write_all(&mut self, bytes: &[u8]) -> Result<(), io::Error> {
self.file.write_all(bytes).await?;
self.size += bytes.len();
Ok(())
}

/// Flush the file
async fn flush(&mut self) -> Result<(), io::Error> {
self.file.flush().await
}

/// Commit the block file under its block hash
///
/// This means the following:
/// 1. The current block file lives in the temporary "wals" directory under a random name
/// 2. That file is renamed, within the given temporary path, to the block hash; the final
///    move into the bucket's "blocks" directory happens later, when the header is flushed
async fn commit(
&mut self,
temp_path: &Path,
block_hash: [u8; 32],
) -> Result<PathBuf, io::Error> {
self.flush().await?;
let new_block_file_path = temp_path.join(utils::to_hex(&block_hash).as_str());
tokio::fs::rename(self.path.clone(), new_block_file_path.clone()).await?;
Ok(new_block_file_path)
}
}
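// Illustrative lifecycle of a `BlockFile` (sketch only, not part of this commit;
// `wal_dir`, `chunk`, and `block_hash` are hypothetical placeholders):
//
//     let mut block = BlockFile::from_wal_path(&wal_dir).await?; // wals/<random hex>
//     block.write_all(&chunk).await?;
//     let hashed = block.commit(&wal_dir, block_hash).await?;    // renamed to <block hash hex>
//
// The move into the bucket's final "blocks" directory happens later, in `HeaderFile::flush`.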

struct HeaderFile {
path: PathBuf,
file: BufWriter<File>,
}

pub(crate) async fn write_block(
temp_file_path: &Path,
block_hash: [u8; 32],
block: &[u8],
) -> Result<(), io::Error> {
let mut temp_hash_file = temp_file_path.to_path_buf();
temp_hash_file.push(to_hex(&block_hash).as_str());
File::create(&temp_hash_file)
.await?
.write_all(block)
impl HeaderFile {
async fn from_wal_path(path: &Path) -> Result<Self, io::Error> {
let path = random_wal_file(path);
let mut file = BufWriter::new(
OpenOptions::new()
.create(true)
.truncate(true)
.write(true)
.open(path.clone())
.await?,
);
let version: u32 = 1;
let num_entries: u32 = 0;
file.write_u32_le(version).await?;
file.write_u32_le(num_entries).await?;

Ok(Self { path, file })
}

async fn flush(
mut self,
bucket: &Bucket,
block_files: &[([u8; 32], PathBuf)],
root_hash: &[u8; 32],
) -> Result<(), io::Error> {
let final_path = bucket.get_header_path(root_hash);
let header_tmp_file_path = self.path.clone();
self.update_num_entries(block_files.len() as u32).await?;

tokio::fs::rename(header_tmp_file_path, final_path).await?;

for (i, (hash, path)) in block_files.iter().enumerate() {
let final_hash_file = bucket.get_block_path(i as u32, hash);
tokio::fs::rename(path, final_hash_file).await?;
}

Ok(())
}

async fn update_num_entries(self, num_entries: u32) -> Result<(), io::Error> {
let mut file = self.file.into_inner().try_into_std().unwrap();
let task = tokio::task::spawn_blocking(move || async move {
let bytes: [u8; 4] = num_entries.to_le_bytes();
file.write_at(&bytes, 4)
.map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
Ok(()) as Result<(), io::Error>
})
.await?;
Ok(())

task.await
}
}
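For reference, here is a minimal sketch of reading back the two counters that `HeaderFile::from_wal_path` writes at the start of a header file: a little-endian version `u32` followed by a little-endian entry count, the latter patched in place at byte offset 4 by `update_num_entries`. The function name and error handling below are illustrative assumptions, not part of this commit.

use tokio::fs::File;
use tokio::io::AsyncReadExt;

// Sketch: read the version and entry count from a committed header file,
// assuming the layout written above (two little-endian u32s at offsets 0 and 4).
async fn read_header_counts(path: &std::path::Path) -> std::io::Result<(u32, u32)> {
    let mut file = File::open(path).await?;
    let version = file.read_u32_le().await?; // written first by from_wal_path
    let num_entries = file.read_u32_le().await?; // patched by update_num_entries
    Ok((version, num_entries))
}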
130 changes: 31 additions & 99 deletions lib/b3fs/src/bucket/file/writer.rs
@@ -7,65 +7,13 @@ use rand::random;
use tokio::fs::{File, OpenOptions};
use tokio::io::{AsyncReadExt, AsyncSeekExt as _, AsyncWriteExt, BufWriter};

use super::{move_temp_to_final, write_block};
use super::{BlockFile, HeaderFile};
use crate::bucket::{errors, Bucket};
use crate::hasher::b3::{BLOCK_SIZE_IN_CHUNKS, MAX_BLOCK_SIZE_IN_BYTES};
use crate::hasher::byte_hasher::Blake3Hasher;
use crate::hasher::collector::BufCollector;
use crate::utils::{self, tree_index};

struct BlockFile {
size: usize,
path: PathBuf,
file: BufWriter<File>,
}

impl BlockFile {
async fn new(path: PathBuf) -> Result<Self, io::Error> {
Ok(Self {
size: 0,
path: path.clone(),
file: BufWriter::new(
OpenOptions::new()
.create(true)
.append(true)
.open(path)
.await?,
),
})
}

async fn write_all(&mut self, bytes: &[u8]) -> Result<(), io::Error> {
self.file.write_all(bytes).await?;
self.size += bytes.len();
Ok(())
}

async fn flush(&mut self) -> Result<(), io::Error> {
self.file.flush().await
}
}

struct HeaderFile {
path: PathBuf,
file: BufWriter<File>,
}

impl HeaderFile {
async fn new(path: PathBuf) -> Result<Self, io::Error> {
Ok(Self {
path: path.clone(),
file: BufWriter::new(
OpenOptions::new()
.create(true)
.append(true)
.open(path)
.await?,
),
})
}
}

pub struct FileWriter {
bucket: Bucket,
hasher: Blake3Hasher<BufCollector>,
@@ -78,20 +26,13 @@ pub struct FileWriter {

impl FileWriter {
pub async fn new(bucket: &Bucket) -> Result<Self, errors::WriteError> {
let mut wal_path = bucket.get_new_wal_path().to_path_buf();
let temp_file_path = wal_path.clone();
let wal_path = bucket.get_new_wal_path();
tokio::fs::create_dir_all(&wal_path).await?;
let random_file: [u8; 32] = random();
wal_path.push(utils::to_hex(&random_file).as_str());
let mut header_file = HeaderFile::new(wal_path.clone()).await?;
let version: u32 = 1;
let num_entries: u32 = 0;
header_file.file.write_u32_le(version).await?;
header_file.file.write_u32_le(num_entries).await?;
let header_file = HeaderFile::from_wal_path(&wal_path).await?;
Ok(Self {
bucket: bucket.clone(),
hasher: Blake3Hasher::default(),
temp_file_path,
temp_file_path: wal_path,
count_block: 0,
block_files: Vec::new(),
current_block_file: None,
Expand All @@ -110,49 +51,48 @@ impl FileWriter {
self.hasher.update(&bytes);
if let Some(ref mut block) = self.current_block_file {
if block.size + bytes.len() > MAX_BLOCK_SIZE_IN_BYTES {
// Write `block_before_remaining` bytes to the current block file and create a
// new block file where we will write the remaining bytes, which stay in `bytes`.
let remaining = MAX_BLOCK_SIZE_IN_BYTES - block.size;
let block_remaining = bytes.split_to(remaining);
block.write_all(&block_remaining).await?;
block.flush().await?;
let block_before_remaining = bytes.split_to(remaining);
block.write_all(&block_before_remaining).await?;

// Because we have written `block_before_remaining` bytes to the current block
// file, and the data exceeded `MAX_BLOCK_SIZE_IN_BYTES`, we know that a new
// block hash has been created in the tree. Therefore we can safely get the
// created block hash.
let block_hash = self
.hasher
.get_tree()
.get_block_hash(self.count_block)
.ok_or(errors::WriteError::BlockHashNotFound)?;

let new_block_file_path = self
.temp_file_path
.join(utils::to_hex(&block_hash).as_str());
let new_block_file_path =
block.commit(&self.temp_file_path, block_hash).await?;

tokio::fs::rename(block.path.clone(), new_block_file_path.clone()).await?;
// Add the block hash and the block file path to the list of block files so far
// committed.
self.block_files.push((block_hash, new_block_file_path));

self.count_block += 1;
// write the block to the header file

// Write the block hash to the header file.
self.hasher
.get_tree_mut()
.write_hash(&mut self.header_file.file)
.await?;
// Create a new file to write the remaining bytes.
let block_file_name: [u8; 32] = random();
let block_file_path = self
.temp_file_path
.join(utils::to_hex(&block_file_name).as_str());

let mut block_file = BlockFile::new(block_file_path.clone()).await?;
let mut block_file = BlockFile::from_wal_path(&self.temp_file_path).await?;

block_file.write_all(&bytes).await?;
self.current_block_file = Some(block_file);
} else {
// Fill the current block file with the bytes; nothing more to do.
block.write_all(&bytes).await?;
}
} else {
let block_file_name: [u8; 32] = random();
let block_file_path = self
.temp_file_path
.join(utils::to_hex(&block_file_name).as_str());
let mut block_file = BlockFile::new(block_file_path.clone()).await?;
// Create a new block file and write the bytes to it, because there is no current
// block
let mut block_file = BlockFile::from_wal_path(&self.temp_file_path).await?;
block_file.write_all(&bytes).await?;
self.current_block_file = Some(block_file);
}
@@ -164,29 +104,21 @@ impl FileWriter {
/// Finalize this write and flush the data to the disk.
pub async fn commit(mut self) -> Result<[u8; 32], errors::CommitError> {
let (ref mut collector, root_hash) = self.hasher.finalize_tree();
if let Some(ref mut block_file) = self.current_block_file {
if let Some(mut block_file) = self.current_block_file {
if let Some(block_hash) = collector.get_block_hash(self.count_block) {
block_file.flush().await?;
let block_file_path = self
.temp_file_path
.join(utils::to_hex(&block_hash).as_str());
tokio::fs::rename(block_file.path.clone(), block_file_path.clone()).await?;
let block_file_path = block_file.commit(&self.temp_file_path, block_hash).await?;
self.block_files.push((block_hash, block_file_path));
self.count_block += 1;
// Write the block hash to the header file.
collector.write_hash(&mut self.header_file.file).await?;
}
}

move_temp_to_final(
&self.bucket,
self.header_file.file,
&self.header_file.path,
&self.temp_file_path,
&self.block_files,
&root_hash,
)
.await?;
self.header_file
.flush(&self.bucket, &self.block_files, &root_hash)
.await?;

tokio::fs::remove_dir_all(self.temp_file_path).await?;

Ok(root_hash)
}
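
End to end, the writer can be driven roughly as follows. This is an illustrative sketch: `write`'s exact signature is not visible in this diff, so the `Bytes` parameter and the error handling are assumptions.

// Hypothetical usage of FileWriter: buffer the payload, then commit to obtain the root hash.
async fn store_file(bucket: &Bucket, data: bytes::Bytes) -> Option<[u8; 32]> {
    let mut writer = FileWriter::new(bucket).await.ok()?;
    writer.write(data).await.ok()?; // may roll the payload over into multiple block files
    writer.commit().await.ok() // renames wal files into place and returns the root hash
}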

