diff --git a/lib/storage/src/entry/builder.rs b/lib/storage/src/entry/builder.rs deleted file mode 100644 index caf59dd..0000000 --- a/lib/storage/src/entry/builder.rs +++ /dev/null @@ -1,73 +0,0 @@ -use super::impls_v1::EntryV1; -use super::Attr; -use super::Entry; -use super::Header; -use super::Magic; -use bytes::Bytes; - -/// The `EntryBuilder` struct provides a way to construct a new `Entry`. -#[derive(Default)] -pub struct Builder { - log_id: Option, - entry_id: Option, - attr: Attr, - last_confirm: Option, - kv: Option
, - headers: Vec
, -} - -impl Builder { - // Method to set the attr of the EntryBuilder - pub fn attr(mut self, attr: Attr) -> Self { - self.attr = attr; - self - } - - // Method to set the log_id of the EntryBuilder - pub fn log_id(mut self, log_id: i64) -> Self { - self.log_id = Some(log_id); - self - } - - // Method to set the entry_id of the EntryBuilder - pub fn entry_id(mut self, entry_id: i64) -> Self { - self.entry_id = Some(entry_id); - self - } - - // Method to set the last_confirm of the EntryBuilder - pub fn last_confirm(mut self, last_confirm: i64) -> Self { - self.last_confirm = Some(last_confirm); - self - } - - // Method to set the kv of the EntryBuilder - pub fn kv(mut self, key: Bytes, value: Bytes) -> Self { - self.kv = Some(Header::new(key, value)); - self - } - - // Method to set the header of the EntryBuilder - pub fn header(mut self, header: Header) -> Self { - self.headers.push(header); - self - } - - // Method to build the Entry - pub fn build(self, magic: Magic) -> impl Entry { - match magic { - Magic::V1 => self.build_v1(), - } - } - - fn build_v1(self) -> impl Entry { - EntryV1 { - log_id: self.log_id.unwrap(), - entry_id: self.entry_id.unwrap(), - attr: self.attr, - last_confirm: self.last_confirm.unwrap(), - kv: self.kv.unwrap(), - headers: self.headers, - } - } -} diff --git a/lib/storage/src/entry/header.rs b/lib/storage/src/entry/header.rs index 471b923..a4a8de3 100644 --- a/lib/storage/src/entry/header.rs +++ b/lib/storage/src/entry/header.rs @@ -170,4 +170,31 @@ mod tests { assert_eq!(n, header.binary_size()); assert_eq!(buf, b"\x03keyvalue"); } + + #[test] + fn test_read_at_all() { + for i in 1..10 { + read_all(i); + } + } + + fn read_all(step: usize) { + let key = Bytes::from_static(b"key"); + let value = Bytes::from_static(b"value"); + let header = Header::new(key.clone(), value.clone()); + + let mut buf = vec![0; step]; + let mut all = vec![]; + let mut offset = 0; + loop { + let n = header.read_at(&mut buf, offset); + offset += n; + all.extend_from_slice(&buf[..n]); + if n == 0 { + break; + } + } + assert_eq!(all, b"\x03keyvalue"); + assert_eq!(offset, header.binary_size()); + } } diff --git a/lib/storage/src/entry/impls_v1.rs b/lib/storage/src/entry/impls_v1.rs index c670055..a0a51ce 100644 --- a/lib/storage/src/entry/impls_v1.rs +++ b/lib/storage/src/entry/impls_v1.rs @@ -7,6 +7,97 @@ use super::Magic; use super::Result; use bytes::{Buf, BufMut, Bytes}; +// Magic 1 +// Attr 4 +// log_id 8 +// entry_id 8 +// last_confirm_id 8 = 29 +const COMMON_HEADER_BINARY_SIZE: usize = 29; +const COMMON_HEADER_MAGIC_OFFSET: usize = 0; +const COMMON_HEADER_ATTR_OFFSET: usize = 1; +const COMMON_HEADER_LOG_ID_OFFSET: usize = 5; +const COMMON_HEADER_ENTRY_ID_OFFSET: usize = 13; +const COMMON_HEADER_LAC_ID_OFFSET: usize = 21; + +/// The `EntryBuilder` struct provides a way to construct a new `Entry`. +pub struct BuilderV1 { + common_header: [u8; COMMON_HEADER_BINARY_SIZE], + kv: Option
, + headers: Vec
, +} + +impl BuilderV1 { + /// Constructor for BuilderV1 + pub fn new() -> Self { + let mut b = BuilderV1 { + common_header: [0; COMMON_HEADER_BINARY_SIZE], + kv: None, + headers: Vec::new(), + }; + b.common_header[COMMON_HEADER_MAGIC_OFFSET] = Magic::V1.into(); + b + } + + /// Method to set the attr of the Entry + pub fn attr(mut self, attr: Attr) -> Self { + self.put_i32_to_common_header(COMMON_HEADER_ATTR_OFFSET, attr.into()); + self + } + + /// Method to set the log_id of the Entry + pub fn log_id(mut self, log_id: i64) -> Self { + self.put_i64_to_common_header(COMMON_HEADER_LOG_ID_OFFSET, log_id); + self + } + + /// Method to set the entry_id of the Entry + pub fn entry_id(mut self, entry_id: i64) -> Self { + self.put_i64_to_common_header(COMMON_HEADER_ENTRY_ID_OFFSET, entry_id); + self + } + + /// Method to set the last_confirm id of the Entry + pub fn last_confirm_id(mut self, last_confirm_id: i64) -> Self { + self.put_i64_to_common_header(COMMON_HEADER_LAC_ID_OFFSET, last_confirm_id); + self + } + + /// Method to set the kv of the EntryBuilder + pub fn kv(mut self, key: Bytes, value: Bytes) -> Self { + self.kv = Some(Header::new(key, value)); + self + } + + /// Method to set the header of the EntryBuilder + pub fn header(mut self, header: Header) -> Self { + self.headers.push(header); + self + } + + /// Method to build the Entry + pub fn build(self) -> impl Entry { + EntryV1 { + common_header: self.common_header, + headers: self.headers, + kv: self.kv.expect("missing kv field in entry"), + } + } + + fn put_i64_to_common_header(&mut self, offset: usize, value: i64) { + copy_slice( + &value.to_le_bytes(), + &mut self.common_header[offset..offset + 8], + ); + } + + fn put_i32_to_common_header(&mut self, offset: usize, value: i32) { + copy_slice( + &value.to_le_bytes(), + &mut self.common_header[offset..offset + 4], + ); + } +} + /// The `Entry` struct represents a log entry in the system. /// /// # Fields @@ -18,33 +109,30 @@ use bytes::{Buf, BufMut, Bytes}; /// * `key` - A `Bytes` instance that represents the keys of the entry. /// * `value` - A `Bytes` instance that represents the values of the entry. pub struct EntryV1 { - pub attr: Attr, - pub log_id: i64, - pub entry_id: i64, - pub last_confirm: i64, + pub common_header: [u8; COMMON_HEADER_BINARY_SIZE], pub headers: Vec
, pub kv: Header, } impl Entry for EntryV1 { fn magic(&self) -> Magic { - Magic::V1 + Magic::try_from(self.common_header[COMMON_HEADER_MAGIC_OFFSET]).expect("invalid magic") } fn attr(&self) -> Attr { - self.attr + Attr::from(self.get_i32_from_common_header(COMMON_HEADER_ATTR_OFFSET)) } fn log_id(&self) -> i64 { - self.log_id + self.get_i64_from_common_header(COMMON_HEADER_LOG_ID_OFFSET) } fn entry_id(&self) -> i64 { - self.entry_id + self.get_i64_from_common_header(COMMON_HEADER_ENTRY_ID_OFFSET) } - fn last_confirm(&self) -> i64 { - self.last_confirm + fn last_confirm_id(&self) -> i64 { + self.get_i64_from_common_header(COMMON_HEADER_LAC_ID_OFFSET) } fn key(&self) -> &Bytes { @@ -60,7 +148,7 @@ impl Entry for EntryV1 { } fn binary_size(&self) -> usize { - let mut size = Self::common_header_binary_size(); + let mut size = COMMON_HEADER_BINARY_SIZE; for header in &self.headers { let header_size = header.binary_size(); size += prost::length_delimiter_len(header_size); @@ -73,16 +161,7 @@ impl Entry for EntryV1 { } fn encode(&self, mut buf: B) -> Result<()> { - // Write the magic to the buffer - buf.put_u8(Magic::V1.into()); - // Write the attr to the buffer - buf.put_i32(self.attr.into()); - // Write the log id to the buffer - buf.put_i64(self.log_id); - // Write the entry id to the buffer - buf.put_i64(self.entry_id); - // Write the last confirm to the buffer - buf.put_i64(self.last_confirm); + buf.put_slice(&self.common_header); for header in &self.headers { let size = header.binary_size(); prost::encode_length_delimiter(size, &mut buf)?; @@ -94,15 +173,11 @@ impl Entry for EntryV1 { Ok(()) } - fn decode_without_magic(mut buf: B) -> Result { - // Read the attr from the buffer - let attr = Attr::from(buf.get_i32()); - // Read the log id from the buffer - let log_id = buf.get_i64(); - // Read the entry id from the buffer - let entry_id = buf.get_i64(); - // Read the last confirm from the buffer - let last_confirm = buf.get_i64(); + fn decode_without_magic(magic: Magic, mut buf: B) -> Result { + let mut common_header = [0; COMMON_HEADER_BINARY_SIZE]; + common_header[0] = magic.into(); + buf.copy_to_slice(&mut common_header[1..]); + // Read the value from the buffer let mut headers = Vec::new(); while buf.has_remaining() { @@ -114,24 +189,23 @@ impl Entry for EntryV1 { } let kv: Header = headers.pop().expect("missing kv field in entry"); Ok(Self { - attr, - log_id, - entry_id, - last_confirm, + common_header, kv, headers, }) } - fn read_at(&self, buf: &mut [u8], offset: usize) -> usize { + fn read_at(&self, buf: &mut [u8], mut offset: usize) -> usize { let mut n = 0; - if offset < Self::common_header_binary_size() { - n += self.read_common_header_at_offset(buf, offset); + if offset < COMMON_HEADER_BINARY_SIZE { + let tmp_n = self.read_common_header_at_offset(buf, offset); + n += tmp_n; if n == buf.len() { return n; } + offset += tmp_n; } - let mut offset = offset - Self::common_header_binary_size(); + offset -= COMMON_HEADER_BINARY_SIZE; for header in &self.headers { (offset, n) = Self::read_at_header(header, offset, buf, n); if n == buf.len() { @@ -155,66 +229,40 @@ impl EntryV1 { if offset < size_of_header_size_delimiter { let mut tmp_storage = Vec::with_capacity(header_size); prost::encode_length_delimiter(header_size, &mut tmp_storage).unwrap(); - n += copy_slice(&tmp_storage[offset..], &mut buf[n..]); + let tmp_n = copy_slice(&tmp_storage[offset..], &mut buf[n..]); + n += tmp_n; if n == buf.len() { return (offset, n); } + offset += tmp_n; } offset -= size_of_header_size_delimiter; if offset < header_size { - n += header.read_at(buf, offset); + let tmp_n = header.read_at(&mut buf[n..], offset); + n += tmp_n; if n == buf.len() { return (offset, n); } + offset += tmp_n; } offset -= header_size; (offset, n) } - /// Get the size of the common header. - fn common_header_binary_size() -> usize { - // 1 + - // 4 + - // 8 + - // 8 + - // 8 = 29 - 29 + fn get_i64_from_common_header(&self, offset: usize) -> i64 { + let mut buf = [0; 8]; + copy_slice(&self.common_header[offset..offset + 8], &mut buf); + i64::from_le_bytes(buf) + } + + fn get_i32_from_common_header(&self, offset: usize) -> i32 { + let mut buf = [0; 4]; + copy_slice(&self.common_header[offset..offset + 4], &mut buf); + i32::from_le_bytes(buf) } fn read_common_header_at_offset(&self, buf: &mut [u8], offset: usize) -> usize { - let mut n = 0; - if offset < 1 && !buf.is_empty() { - buf[0] = Magic::V1.into(); - n += 1; - if n == buf.len() { - return n; - } - } - if offset < 5 { - let attr: i32 = self.attr.into(); - let src = attr.to_le_bytes(); - n += copy_slice(&src[offset - 1..], &mut buf[n..]); - if n == buf.len() { - return n; - } - } - if offset < 13 { - let src = self.log_id.to_le_bytes(); - n += copy_slice(&src[offset - 5..], &mut buf[n..]); - if n == buf.len() { - return n; - } - } - if offset < 21 { - let src = self.entry_id.to_le_bytes(); - n += copy_slice(&src[offset - 13..], &mut buf[n..]); - if n == buf.len() { - return n; - } - } - let src = self.last_confirm.to_le_bytes(); - n += copy_slice(&src[offset - 21..], &mut buf[n..]); - n + copy_slice(&self.common_header[offset..], buf) } } diff --git a/lib/storage/src/entry/mod.rs b/lib/storage/src/entry/mod.rs index 3e25085..cf5274b 100644 --- a/lib/storage/src/entry/mod.rs +++ b/lib/storage/src/entry/mod.rs @@ -1,23 +1,21 @@ -mod builder; mod error; mod header; mod impls_v1; mod util; -pub use builder::Builder; use bytes::{Buf, BufMut}; pub use error::Error; pub use header::Header; pub use util::{Attr, Magic}; -use self::impls_v1::EntryV1; +use self::impls_v1::{BuilderV1, EntryV1}; pub type Result = std::result::Result; /// decode an entry from a buffer. pub fn decode(mut buf: B) -> Result { let magic = Magic::try_from(buf.get_u8())?; match magic { - Magic::V1 => EntryV1::decode_without_magic(buf), + Magic::V1 => EntryV1::decode_without_magic(magic, buf), } } @@ -34,8 +32,8 @@ pub trait Entry { /// Returns the entry ID of the entry. fn entry_id(&self) -> i64; - /// Returns the last confirm of the entry. - fn last_confirm(&self) -> i64; + /// Returns the last confirm id of the entry. + fn last_confirm_id(&self) -> i64; /// Returns the key of the entry. fn key(&self) -> &bytes::Bytes; @@ -53,7 +51,7 @@ pub trait Entry { fn encode(&self, buf: B) -> Result<()>; /// Decodes the buffer into an entry. - fn decode_without_magic(buf: B) -> Result + fn decode_without_magic(magic: Magic, buf: B) -> Result where Self: Sized; @@ -72,20 +70,20 @@ mod tests { let value = Bytes::from_static(b"value"); let header = Header::new(key.clone(), value.clone()); - let entry = Builder::default() + let entry = BuilderV1::new() .log_id(1) .entry_id(2) .attr(Attr::default()) - .last_confirm(3) + .last_confirm_id(3) .kv(key.clone(), value.clone()) .header(header.clone()) - .build(Magic::V1); + .build(); assert_eq!(entry.magic(), Magic::V1); assert_eq!(entry.attr(), Attr::default()); assert_eq!(entry.log_id(), 1); assert_eq!(entry.entry_id(), 2); - assert_eq!(entry.last_confirm(), 3); + assert_eq!(entry.last_confirm_id(), 3); assert_eq!(entry.key(), &key); assert_eq!(entry.value(), &value); assert_eq!(entry.headers().len(), 1); @@ -94,10 +92,10 @@ mod tests { } #[test] - #[should_panic(expected = "called `Option::unwrap()` on a `None` value")] + #[should_panic(expected = "missing kv field in entry")] fn test_entry_builder_build_unwrap_none() { - let builder = Builder::default(); - builder.build(Magic::V1); // This should panic because we didn't set any values + let builder = BuilderV1::new(); + builder.build(); // This should panic because we didn't set any values } #[test] @@ -106,15 +104,15 @@ mod tests { let value = Bytes::from_static(b"value"); let header = Header::new(key.clone(), value.clone()); - let builder = Builder::default() + let builder = BuilderV1::new() .log_id(1) .entry_id(2) .attr(Attr::default()) - .last_confirm(3) + .last_confirm_id(3) .kv(key.clone(), value.clone()) .header(header.clone()); - let entry = builder.build(Magic::V1); + let entry = builder.build(); // Encode the entry into a buffer let mut buf = BytesMut::new(); @@ -128,7 +126,7 @@ mod tests { assert_eq!(decoded_entry.log_id(), entry.log_id()); assert_eq!(decoded_entry.entry_id(), entry.entry_id()); assert_eq!(decoded_entry.attr(), entry.attr()); - assert_eq!(decoded_entry.last_confirm(), entry.last_confirm()); + assert_eq!(decoded_entry.last_confirm_id(), entry.last_confirm_id()); assert_eq!(decoded_entry.key(), entry.key()); assert_eq!(decoded_entry.value(), entry.value()); assert_eq!(decoded_entry.headers().len(), entry.headers().len()); @@ -147,15 +145,15 @@ mod tests { let value = Bytes::from_static(b"value"); let header = Header::new(key.clone(), value.clone()); - let builder = Builder::default() + let builder = BuilderV1::new() .log_id(1) .entry_id(2) .attr(Attr::default()) - .last_confirm(3) + .last_confirm_id(3) .kv(key.clone(), value.clone()) .header(header.clone()); - let entry = builder.build(Magic::V1); + let entry = builder.build(); // Calculate the binary size let binary_size = entry.binary_size(); @@ -174,21 +172,27 @@ mod tests { #[test] fn test_read_at() { + for i in 1..16 { + read_all(i); + } + } + + fn read_all(step: usize) { let key = Bytes::from_static(b"key"); let value = Bytes::from_static(b"value"); let header = Header::new(key.clone(), value.clone()); - let entry = Builder::default() + let entry = BuilderV1::new() .log_id(1) .entry_id(2) .attr(Attr::default()) - .last_confirm(3) + .last_confirm_id(3) .kv(key.clone(), value.clone()) .header(header.clone()) .header(header) - .build(Magic::V1); + .build(); - let mut buf: [u8; 1] = [0; 1]; + let mut buf = vec![0; step]; let mut finial_decoded = Vec::new(); let mut n = 0; loop { @@ -197,7 +201,7 @@ mod tests { break; } n += k; - finial_decoded.put_slice(&buf[..]); + finial_decoded.extend_from_slice(&buf[..k]); } assert_eq!(finial_decoded.len(), entry.binary_size()); @@ -216,7 +220,7 @@ mod tests { assert_eq!(decoded_entry.log_id(), entry.log_id()); assert_eq!(decoded_entry.entry_id(), entry.entry_id()); assert_eq!(decoded_entry.attr(), entry.attr()); - assert_eq!(decoded_entry.last_confirm(), entry.last_confirm()); + assert_eq!(decoded_entry.last_confirm_id(), entry.last_confirm_id()); assert_eq!(decoded_entry.key(), entry.key()); assert_eq!(decoded_entry.value(), entry.value()); assert_eq!(decoded_entry.headers().len(), entry.headers().len());