For NodeStoreHeader, use bytemuck not bincode (#723)
rkuris authored Oct 8, 2024
1 parent caffeaf · commit 46f35a5
Showing 4 changed files with 87 additions and 84 deletions.
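At its core, the change makes NodeStoreHeader a fixed-layout #[repr(C)] struct that bytemuck can view directly as raw bytes, replacing bincode's variable-length encode/decode. Below is a minimal sketch of the round-trip pattern the commit adopts; the Header here is a simplified stand-in (free lists and root address omitted), not firewood's exact layout:

use bytemuck_derive::{AnyBitPattern, NoUninit};

// NoUninit: the type has no padding or uninitialized bytes, so &T -> &[u8] is sound.
// AnyBitPattern: every bit pattern is a valid value, so &mut [u8] -> &mut T is sound.
// repr(C) pins the field order and offsets, making the on-disk format stable.
#[derive(Clone, Copy, Debug, PartialEq, NoUninit, AnyBitPattern)]
#[repr(C)]
struct Header {
    version: [u8; 16],
    endian_test: u64,
    size: u64,
}

fn main() {
    let header = Header { version: *b"illustrative-hdr", endian_test: 1, size: 2048 };

    // "Serialize": borrow the struct's bytes in place; no allocation, no encoder.
    let on_disk = bytemuck::bytes_of(&header).to_vec();

    // "Deserialize": read straight into a fresh struct, as open() now does.
    let mut decoded = Header { version: [0; 16], endian_test: 0, size: 0 };
    bytemuck::bytes_of_mut(&mut decoded).copy_from_slice(&on_disk);
    assert_eq!(decoded, header);
}

The practical wins: reads and writes become fixed-size copies, and individual fields can be updated on disk at known offsets, as the diffs below show.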
29 changes: 17 additions & 12 deletions firewood/src/db.rs
@@ -63,7 +63,10 @@ impl std::fmt::Debug for DbMetrics {

#[async_trait]
impl api::DbView for HistoricalRev {
type Stream<'a> = MerkleKeyValueStream<'a, Self> where Self: 'a;
type Stream<'a>
= MerkleKeyValueStream<'a, Self>
where
Self: 'a;

async fn root_hash(&self) -> Result<Option<api::HashKey>, api::Error> {
HashedNodeReader::root_hash(self).map_err(api::Error::IO)
@@ -125,7 +128,10 @@ where
{
type Historical = NodeStore<Committed, FileBacked>;

type Proposal<'p> = Proposal<'p> where Self: 'p;
type Proposal<'p>
= Proposal<'p>
where
Self: 'p;

async fn revision(&self, root_hash: TrieHash) -> Result<Arc<Self::Historical>, api::Error> {
let nodestore = self
@@ -230,7 +236,10 @@ pub struct Proposal<'p> {

#[async_trait]
impl<'a> api::DbView for Proposal<'a> {
type Stream<'b> = MerkleKeyValueStream<'b, NodeStore<Arc<ImmutableProposal>, FileBacked>> where Self: 'b;
type Stream<'b>
= MerkleKeyValueStream<'b, NodeStore<Arc<ImmutableProposal>, FileBacked>>
where
Self: 'b;

async fn root_hash(&self) -> Result<Option<api::HashKey>, api::Error> {
self.nodestore.root_hash().map_err(api::Error::from)
@@ -313,15 +322,11 @@ impl<'a> api::Proposal for Proposal<'a> {
#[cfg(test)]
#[allow(clippy::unwrap_used)]
mod test {
use std::{
ops::{Deref, DerefMut},
path::PathBuf,
};

use crate::{
db::Db,
v2::api::{Db as _, DbView as _, Error, Proposal as _},
};
use std::ops::{Deref, DerefMut};
use std::path::PathBuf;

use crate::db::Db;
use crate::v2::api::{Db as _, DbView as _, Error, Proposal as _};

use super::{BatchOp, DbConfig};

6 changes: 3 additions & 3 deletions firewood/src/manager.rs
@@ -3,11 +3,11 @@

#![allow(dead_code)]

use std::collections::HashMap;
use std::collections::{HashMap, VecDeque};
use std::io::Error;
use std::num::NonZero;
use std::path::PathBuf;
use std::sync::Arc;
use std::{collections::VecDeque, io::Error};

use storage::logger::warn;
use typed_builder::TypedBuilder;
@@ -92,7 +92,7 @@ impl RevisionManager {
}

if truncate {
nodestore.flush_header()?;
nodestore.flush_header_with_padding()?;
}

Ok(manager)
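Why this call site changes (my reading of the commit, not stated in it): on a freshly truncated file the header's entire reserved region must be written once, zero-padded out to NodeStoreHeader::SIZE, so later partial updates always land inside already-written file space; a reopened file already has the padding on disk. A standalone sketch of the padding arithmetic:

fn main() {
    const SIZE: usize = 2048; // NodeStoreHeader::SIZE, the reserved region
    let header = vec![0xAAu8; 72]; // stand-in bytes for the real header

    // flush_header_with_padding: the header bytes followed by zeros up to SIZE.
    let padded: Vec<u8> = header
        .iter()
        .copied()
        .chain(std::iter::repeat(0u8))
        .take(SIZE)
        .collect();

    assert_eq!(padded.len(), SIZE);
    assert!(padded[72..].iter().all(|&b| b == 0));
}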
2 changes: 2 additions & 0 deletions storage/Cargo.toml
@@ -18,6 +18,8 @@ arc-swap = "1.7.1"
lru = "0.12.4"
metrics = "0.23.0"
log = { version = "0.4.20", optional = true }
bytemuck = "1.7.0"
bytemuck_derive = "1.7.0"

[dev-dependencies]
rand = "0.8.5"
134 changes: 65 additions & 69 deletions storage/src/nodestore.rs
@@ -5,6 +5,7 @@ use crate::logger::trace;
use arc_swap::access::DynAccess;
use arc_swap::ArcSwap;
use bincode::{DefaultOptions, Options as _};
use bytemuck_derive::{AnyBitPattern, NoUninit};
use metrics::{counter, histogram};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
@@ -100,9 +101,6 @@ const NUM_AREA_SIZES: usize = AREA_SIZES.len();
const MIN_AREA_SIZE: u64 = AREA_SIZES[0];
const MAX_AREA_SIZE: u64 = AREA_SIZES[NUM_AREA_SIZES - 1];

const SOME_FREE_LIST_ELT_SIZE: u64 = 1 + std::mem::size_of::<LinearAddress>() as u64;
const FREE_LIST_MAX_SIZE: u64 = NUM_AREA_SIZES as u64 * SOME_FREE_LIST_ELT_SIZE;

/// Returns the index in `AREA_SIZES` of the smallest area size >= `n`.
fn area_size_to_index(n: u64) -> Result<AreaIndex, Error> {
if n > MAX_AREA_SIZE {
@@ -196,13 +194,25 @@ impl<S: ReadableStorage> NodeStore<Committed, S> {
/// Assumes the header is written in the [ReadableStorage].
pub fn open(storage: Arc<S>) -> Result<Self, Error> {
let mut stream = storage.stream_from(0)?;

let header: NodeStoreHeader = DefaultOptions::new()
.deserialize_from(&mut stream)
.map_err(|e| Error::new(ErrorKind::InvalidData, e))?;
let mut header = NodeStoreHeader::new();
let header_bytes = bytemuck::bytes_of_mut(&mut header);
stream.read_exact(header_bytes)?;

drop(stream);

if header.version != Version::new() {
return Err(Error::new(
ErrorKind::InvalidData,
"Incompatible firewood version",
));
}
if header.endian_test != 1 {
return Err(Error::new(
ErrorKind::InvalidData,
"Database cannot be opened due to difference in endianness",
));
}

let mut nodestore = Self {
header,
kind: Committed {
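Why the endian_test field works (an illustration, not code from the commit): the header is written in the machine's native byte order, so a reader with the opposite endianness decodes the stored 1 as 1 << 56 and the check above rejects the file:

fn main() {
    // A little-endian writer stores endian_test = 1 as these raw bytes:
    let on_disk = 1u64.to_le_bytes(); // [1, 0, 0, 0, 0, 0, 0, 0]

    // A little-endian reader recovers 1, but a big-endian reader
    // interpreting the same bytes sees 1 << 56, tripping the
    // "difference in endianness" error above.
    assert_eq!(u64::from_le_bytes(on_disk), 1);
    assert_eq!(u64::from_be_bytes(on_disk), 1u64 << 56);
}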
@@ -308,7 +318,7 @@ impl<S: ReadableStorage> NodeStore<MutableProposal, S> {
parent: parent.kind.as_nodestore_parent(),
};
Ok(NodeStore {
header: parent.header.clone(),
header: parent.header,
kind,
storage: parent.storage.clone(),
})
@@ -335,30 +345,14 @@
}
}

impl<S: WritableStorage> NodeStore<ImmutableProposal, S> {
// TODO danlaine: Use this code in the revision management code.
// TODO danlaine: Write only the parts of the header that have changed instead of the whole thing
// fn write_header(&mut self) -> Result<(), Error> {
// let header_bytes = bincode::serialize(&self.header).map_err(|e| {
// Error::new(
// ErrorKind::InvalidData,
// format!("Failed to serialize free lists: {}", e),
// )
// })?;

// self.storage.write(0, header_bytes.as_slice())?;

// Ok(())
// }
}

impl<S: WritableStorage> NodeStore<MutableProposal, S> {
/// Creates a new, empty, [NodeStore] and clobbers the underlying `storage` with an empty header.
/// This is used during testing and during the creation of an in-memory merkle for proofs
pub fn new_empty_proposal(storage: Arc<S>) -> Self {
let header = NodeStoreHeader::new();
let header_bytes = bincode::serialize(&header).expect("failed to serialize header");
let header_bytes = bytemuck::bytes_of(&header);
storage
.write(0, header_bytes.as_slice())
.write(0, header_bytes)
.expect("failed to write header");
NodeStore {
header,
@@ -524,7 +518,7 @@ impl From<Error> for UpdateError {

/// Can be used by filesystem tooling such as "file" to identify
/// the version of firewood used to create this [NodeStore] file.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Deserialize, Serialize)]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Deserialize, Serialize, NoUninit, AnyBitPattern)]
#[repr(transparent)]
struct Version {
bytes: [u8; 16],
}
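repr(transparent) pins Version's layout to exactly that of its single [u8; 16] field; the bytemuck derives require a fixed representation (repr(C) or repr(transparent)) to be sound. A quick check of what that guarantees:

use bytemuck_derive::{AnyBitPattern, NoUninit};

#[derive(Clone, Copy, NoUninit, AnyBitPattern)]
#[repr(transparent)]
struct Version {
    bytes: [u8; 16],
}

fn main() {
    // Identical size and alignment to the wrapped array: 16 bytes, align 1.
    assert_eq!(std::mem::size_of::<Version>(), 16);
    assert_eq!(std::mem::align_of::<Version>(), 1);
}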
@@ -549,37 +544,36 @@ pub type FreeLists = [Option<LinearAddress>; NUM_AREA_SIZES];

/// Persisted metadata for a [NodeStore].
/// The [NodeStoreHeader] is at the start of the ReadableStorage.
#[derive(Debug, PartialEq, Eq, Serialize, Deserialize, Clone)]
#[derive(Copy, Debug, PartialEq, Eq, Serialize, Deserialize, Clone, NoUninit, AnyBitPattern)]
#[repr(C)]
struct NodeStoreHeader {
/// Identifies the version of firewood used to create this [NodeStore].
version: Version,
/// always "1"; verifies endianness
endian_test: u64,
size: u64,
/// Element i is the pointer to the first free block of size `AREA_SIZES[i]`.
free_lists: FreeLists,
root_address: Option<LinearAddress>,
}

impl NodeStoreHeader {
/// The first SIZE bytes of the ReadableStorage are the [NodeStoreHeader].
/// The serialized NodeStoreHeader may be less than SIZE bytes but we
/// reserve this much space for it since it can grow and it must always be
/// at the start of the ReadableStorage so it can't be moved in a resize.
const SIZE: u64 = {
// 8 and 9 for `size` and `root_address` respectively
let max_size = Version::SIZE + 8 + 9 + FREE_LIST_MAX_SIZE;
// Round up to the nearest multiple of MIN_AREA_SIZE
let remainder = max_size % MIN_AREA_SIZE;
if remainder == 0 {
max_size
} else {
max_size + MIN_AREA_SIZE - remainder
}
};
/// The first SIZE bytes of the ReadableStorage are reserved for the
/// [NodeStoreHeader].
/// We also want it aligned to a disk block.
const SIZE: u64 = 2048;

/// Number of extra zero-padding bytes to write when the [NodeStoreHeader]
/// is first created.
/// The subtraction doubles as a compile-time check that SIZE is not set too small.
const EXTRA_BYTES: usize = Self::SIZE as usize - std::mem::size_of::<NodeStoreHeader>();

fn new() -> Self {
Self {
// The store just contains the header at this point
size: Self::SIZE,
endian_test: 1,
root_address: None,
version: Version::new(),
free_lists: Default::default(),
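The EXTRA_BYTES subtraction acting as a compile-time guard deserves a word: if SIZE were ever set smaller than the struct, the unsigned subtraction would underflow during const evaluation and the crate would fail to build. A standalone illustration with a hypothetical header type:

#[repr(C)]
struct Header {
    version: [u8; 16],
    endian_test: u64,
}

const RESERVED: usize = 2048;

// Underflow in const evaluation is a hard error, so this line refuses to
// compile if RESERVED ever shrinks below size_of::<Header>() (24 here):
const PADDING: usize = RESERVED - std::mem::size_of::<Header>();

fn main() {
    assert_eq!(PADDING, 2048 - 24);
}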
@@ -872,12 +866,22 @@ impl<S: ReadableStorage> NodeStore<Arc<ImmutableProposal>, S> {
impl<T, S: WritableStorage> NodeStore<T, S> {
/// Persist the header from this proposal to storage.
pub fn flush_header(&self) -> Result<(), Error> {
let header_bytes = DefaultOptions::new()
.with_varint_encoding()
.serialize(&self.header)
.map_err(|e| Error::new(ErrorKind::InvalidData, e))?;
self.storage.write(0, header_bytes.as_slice())?;
let header_bytes = bytemuck::bytes_of(&self.header);
self.storage.write(0, header_bytes)?;
Ok(())
}

/// Persist the header, including all the padding
/// This is only done the first time we write the header
pub fn flush_header_with_padding(&self) -> Result<(), Error> {
let header_bytes = bytemuck::bytes_of(&self.header)
.iter()
.copied()
.chain(std::iter::repeat(0u8).take(NodeStoreHeader::EXTRA_BYTES))
.collect::<Box<[u8]>>();
debug_assert_eq!(header_bytes.len(), NodeStoreHeader::SIZE as usize);

self.storage.write(0, &header_bytes)?;
Ok(())
}
}
@@ -886,13 +890,9 @@ impl<S: WritableStorage> NodeStore<ImmutableProposal, S> {
/// Persist the freelist from this proposal to storage.
pub fn flush_freelist(&self) -> Result<(), Error> {
// Write the free lists to storage
let free_list_bytes = DefaultOptions::new()
.with_varint_encoding()
.serialize(&self.header.free_lists)
.map_err(|e| Error::new(ErrorKind::InvalidData, e))?;
let free_list_bytes = bytemuck::bytes_of(&self.header.free_lists);
let free_list_offset = offset_of!(NodeStoreHeader, free_lists) as u64;
self.storage
.write(free_list_offset, free_list_bytes.as_slice())?;
self.storage.write(free_list_offset, free_list_bytes)?;
Ok(())
}
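This partial write is a payoff of the fixed layout: offset_of! locates free_lists inside the repr(C) header, so flush_freelist can overwrite just that field on disk instead of re-serializing everything; with the old varint bincode encoding this was impossible, since field offsets depended on the stored values. An illustration with a hypothetical struct (std::mem::offset_of! is stable since Rust 1.77; earlier code typically used the memoffset crate):

use std::mem::offset_of;

#[repr(C)]
struct Header {
    version: [u8; 16],
    endian_test: u64,
    size: u64,
    free_lists: [u64; 4], // hypothetical free-list array
}

fn main() {
    // repr(C) fixes each field's byte position, so a writer can seek to
    // this offset and update free_lists in place:
    assert_eq!(offset_of!(Header, free_lists), 32);
}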

@@ -924,7 +924,7 @@ impl NodeStore<ImmutableProposal, FileBacked> {
/// This function is used during commit.
pub fn as_committed(&self) -> NodeStore<Committed, FileBacked> {
NodeStore {
header: self.header.clone(),
header: self.header,
kind: Committed {
deleted: self.kind.deleted.clone(),
root_hash: self.kind.root_hash.clone(),
@@ -938,13 +938,9 @@ impl<S: WritableStorage> NodeStore<Arc<ImmutableProposal>, S> {
/// Persist the freelist from this proposal to storage.
pub fn flush_freelist(&self) -> Result<(), Error> {
// Write the free lists to storage
let free_list_bytes = DefaultOptions::new()
.with_varint_encoding()
.serialize(&self.header.free_lists)
.map_err(|e| Error::new(ErrorKind::InvalidData, e))?;
let free_list_bytes = bytemuck::bytes_of(&self.header.free_lists);
let free_list_offset = offset_of!(NodeStoreHeader, free_lists) as u64;
self.storage
.write(free_list_offset, free_list_bytes.as_slice())?;
self.storage.write(free_list_offset, free_list_bytes)?;
Ok(())
}

@@ -976,7 +972,7 @@ impl NodeStore<Arc<ImmutableProposal>, FileBacked> {
/// This function is used during commit.
pub fn as_committed(&self) -> NodeStore<Committed, FileBacked> {
NodeStore {
header: self.header.clone(),
header: self.header,
kind: Committed {
deleted: self.kind.deleted.clone(),
root_hash: self.kind.root_hash.clone(),
@@ -1110,7 +1106,8 @@ impl<S: WritableStorage> NodeStore<Committed, S> {
#[cfg(test)]
#[allow(clippy::unwrap_used)]
mod tests {
use crate::{linear::memory::MemStore, BranchNode, LeafNode};
use crate::linear::memory::MemStore;
use crate::{BranchNode, LeafNode};
use arc_swap::access::DynGuard;
use smallvec::SmallVec;
use test_case::test_case;
@@ -1184,11 +1181,10 @@ mod tests {
let node_store = NodeStore::new_empty_proposal(memstore.into());

// Check the empty header is written at the start of the ReadableStorage.
let mut header_bytes = node_store.storage.stream_from(0).unwrap();
let header: NodeStoreHeader = DefaultOptions::new()
.with_varint_encoding()
.deserialize_from(&mut header_bytes)
.unwrap();
let mut header = NodeStoreHeader::new();
let mut header_stream = node_store.storage.stream_from(0).unwrap();
let header_bytes = bytemuck::bytes_of_mut(&mut header);
header_stream.read_exact(header_bytes).unwrap();
assert_eq!(header.version, Version::new());
let empty_free_list: FreeLists = Default::default();
assert_eq!(header.free_lists, empty_free_list);
