Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Enhancement] use macro implement read operation of entry and header #3

Merged
merged 1 commit into from
Jun 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions lib/storage/src/entry/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,7 @@ pub enum Error {

#[error("prost decode")]
ProstDecode(#[from] prost::DecodeError),

#[error("kv not found")]
KVNotFound,
}
55 changes: 26 additions & 29 deletions lib/storage/src/entry/header.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
// Importing necessary modules and types
use super::Result;
use crate::util::copy_slice;
use bytes::{Buf, BufMut, Bytes};

use crate::util::copy_slice;

use super::Result;
use super::{util::copy_slice_with_multi_stage, util::customize_copy_slice_with_multi_stage};

// Defining a struct Header with key and value as Bytes
// It use length delimited encoding
#[derive(Clone)]
Expand All @@ -13,57 +15,52 @@ pub struct Header {

// Implementing methods for Header struct
impl Header {
// Constructor for Header
/// Constructor for Header
pub fn new(key: Bytes, value: Bytes) -> Self {
Self { key, value }
}

// Getter for key
/// Getter for key
pub fn key(&self) -> &Bytes {
&self.key
}

// Getter for value
/// Getter for value
pub fn value(&self) -> &Bytes {
&self.value
}

// read at a specific offset of Header's binary representation.
pub fn read_at(&self, buf: &mut [u8], offset: usize) -> usize {
/// read at a specific offset of Header's binary representation.
pub fn read_at(&self, buf: &mut [u8], mut offset: usize) -> usize {
let key_len = self.key.len();
let key_len_size = prost::length_delimiter_len(key_len);
let mut n = 0;
if offset < key_len_size {
let key_len_delimiter_getter = || {
let mut tmp_storage = Vec::with_capacity(key_len_size);
// There's enough capacity, so should never fail.
prost::encode_length_delimiter(key_len, &mut tmp_storage).unwrap();
n = copy_slice(&tmp_storage[offset..], &mut buf[..]);
if n == buf.len() {
return n;
}
}
if offset + n < key_len_size + key_len {
n += copy_slice(&self.key[offset + n - key_len_size..], &mut buf[n..]);
if n == buf.len() {
return n;
}
}
if offset + n < key_len_size + key_len + self.value.len() {
n += copy_slice(
&self.value[offset + n - key_len_size - key_len..],
&mut buf[n..],
);
}
tmp_storage
};

customize_copy_slice_with_multi_stage!(
copy_slice(&key_len_delimiter_getter(), &mut buf[n..]),
key_len_size,
buf,
offset,
n
);
copy_slice_with_multi_stage!(self.key, buf, offset, n);
copy_slice_with_multi_stage!(self.value, buf, offset, n);
n
}

// Method to get the binary size of the Header
/// Method to get the binary size of the Header
pub fn binary_size(&self) -> usize {
let key_len = self.key.len();
prost::length_delimiter_len(key_len) + self.key.len() + self.value.len()
}

// Method to encode the Header into a buffer
/// Method to encode the Header into a buffer
pub fn encode<B: BufMut>(&self, buf: &mut B) -> Result<()> {
// Get the length of the key
let length = self.key.len();
Expand All @@ -76,7 +73,7 @@ impl Header {
Ok(())
}

// Method to decode the Header from a buffer
/// Method to decode the Header from a buffer
pub fn decode<B: Buf>(mut buf: B) -> Result<Self> {
// Decode the length of the key from the buffer
let key_len = prost::decode_length_delimiter(&mut buf)?;
Expand Down
101 changes: 36 additions & 65 deletions lib/storage/src/entry/impls_v1.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
use bytes::{Buf, BufMut, Bytes};

use crate::util::copy_slice;

use super::Attr;
use super::Entry;
use super::Header;
use super::Magic;
use super::Result;
use bytes::{Buf, BufMut, Bytes};
use super::{util::copy_slice_with_multi_stage, util::customize_copy_slice_with_multi_stage};

// Magic 1
// Attr 4
Expand Down Expand Up @@ -75,11 +77,12 @@ impl BuilderV1 {
}

/// Method to build the Entry
pub fn build(self) -> impl Entry {
pub fn build(mut self) -> impl Entry {
self.headers
.push(self.kv.expect("missing kv field in entry"));
EntryV1 {
common_header: self.common_header,
headers: self.headers,
kv: self.kv.expect("missing kv field in entry"),
}
}

Expand Down Expand Up @@ -111,7 +114,6 @@ impl BuilderV1 {
pub struct EntryV1 {
pub common_header: [u8; COMMON_HEADER_BINARY_SIZE],
pub headers: Vec<Header>,
pub kv: Header,
}

impl Entry for EntryV1 {
Expand All @@ -136,15 +138,15 @@ impl Entry for EntryV1 {
}

fn key(&self) -> &Bytes {
self.kv.key()
self.headers.last().unwrap().key()
}

fn value(&self) -> &Bytes {
self.kv.value()
self.headers.last().unwrap().value()
}

fn headers(&self) -> &[Header] {
&self.headers
&self.headers[..self.headers.len() - 1]
}

fn binary_size(&self) -> usize {
Expand All @@ -154,9 +156,6 @@ impl Entry for EntryV1 {
size += prost::length_delimiter_len(header_size);
size += header_size;
}
let kv_size = self.kv.binary_size();
size += prost::length_delimiter_len(kv_size);
size += kv_size;
size
}

Expand All @@ -167,9 +166,6 @@ impl Entry for EntryV1 {
prost::encode_length_delimiter(size, &mut buf)?;
header.encode(&mut buf)?;
}
let size = self.kv.binary_size();
prost::encode_length_delimiter(size, &mut buf)?;
self.kv.encode(&mut buf)?;
Ok(())
}

Expand All @@ -187,69 +183,48 @@ impl Entry for EntryV1 {
headers.push(Header::decode(&mut header_buf)?);
buf = header_buf.into_inner();
}
let kv: Header = headers.pop().expect("missing kv field in entry");
if headers.is_empty() {
return Err(super::Error::KVNotFound);
}
Ok(Self {
common_header,
kv,
headers,
})
}

fn read_at(&self, buf: &mut [u8], mut offset: usize) -> usize {
let mut n = 0;
if offset < COMMON_HEADER_BINARY_SIZE {
let tmp_n = self.read_common_header_at_offset(buf, offset);
n += tmp_n;
if n == buf.len() {
return n;
}
offset += tmp_n;
}
offset -= COMMON_HEADER_BINARY_SIZE;

copy_slice_with_multi_stage!(self.common_header, buf, offset, n);

for header in &self.headers {
(offset, n) = Self::read_at_header(header, offset, buf, n);
if n == buf.len() {
return n;
}
let header_size = header.binary_size();
let header_size_delimiter_size = prost::length_delimiter_len(header_size);
let header_size_delimiter_getter = || -> Vec<u8> {
let mut tmp_storage = Vec::with_capacity(header_size_delimiter_size);
prost::encode_length_delimiter(header_size, &mut tmp_storage).unwrap();
tmp_storage
};
customize_copy_slice_with_multi_stage!(
copy_slice(&header_size_delimiter_getter(), &mut buf[n..]),
header_size_delimiter_size,
buf,
offset,
n
);
customize_copy_slice_with_multi_stage!(
header.read_at(&mut buf[n..], offset),
header_size,
buf,
offset,
n
);
}
(_, n) = Self::read_at_header(&self.kv, offset, buf, n);
n
}
}

impl EntryV1 {
fn read_at_header(
header: &Header,
mut offset: usize,
buf: &mut [u8],
mut n: usize,
) -> (usize, usize) {
let header_size = header.binary_size();
let size_of_header_size_delimiter = prost::length_delimiter_len(header_size);
if offset < size_of_header_size_delimiter {
let mut tmp_storage = Vec::with_capacity(header_size);
prost::encode_length_delimiter(header_size, &mut tmp_storage).unwrap();
let tmp_n = copy_slice(&tmp_storage[offset..], &mut buf[n..]);
n += tmp_n;
if n == buf.len() {
return (offset, n);
}
offset += tmp_n;
}
offset -= size_of_header_size_delimiter;

if offset < header_size {
let tmp_n = header.read_at(&mut buf[n..], offset);
n += tmp_n;
if n == buf.len() {
return (offset, n);
}
offset += tmp_n;
}
offset -= header_size;
(offset, n)
}

fn get_i64_from_common_header(&self, offset: usize) -> i64 {
let mut buf = [0; 8];
copy_slice(&self.common_header[offset..offset + 8], &mut buf);
Expand All @@ -261,8 +236,4 @@ impl EntryV1 {
copy_slice(&self.common_header[offset..offset + 4], &mut buf);
i32::from_le_bytes(buf)
}

fn read_common_header_at_offset(&self, buf: &mut [u8], offset: usize) -> usize {
copy_slice(&self.common_header[offset..], buf)
}
}
40 changes: 40 additions & 0 deletions lib/storage/src/entry/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,3 +42,43 @@ impl From<Magic> for u8 {
}
}
}

macro_rules! copy_slice_with_multi_stage {
($src:expr, $dst:expr, $stage_offset:expr, $dst_offset:expr) => {
if $dst_offset == $dst.len() {
return $dst_offset;
} else if $stage_offset < $src.len() {
let tmp_n = copy_slice(&$src[$stage_offset..], &mut $dst[$dst_offset..]);
$dst_offset += tmp_n;
if $dst_offset == $dst.len() {
return $dst_offset;
} else {
$stage_offset = $stage_offset + tmp_n - $src.len();
}
} else {
$stage_offset -= $src.len();
}
};
() => {};
}
pub(super) use copy_slice_with_multi_stage;

macro_rules! customize_copy_slice_with_multi_stage {
($custom_copy:expr, $src_len:expr, $dst:expr, $stage_offset:expr, $dst_offset:expr) => {
if $dst_offset == $dst.len() {
return $dst_offset;
} else if $stage_offset < $src_len {
let tmp_n = $custom_copy;
$dst_offset += tmp_n;
if $dst_offset == $dst.len() {
return $dst_offset;
} else {
$stage_offset = $stage_offset + tmp_n - $src_len;
}
} else {
$stage_offset -= $src_len;
}
};
() => {};
}
pub(super) use customize_copy_slice_with_multi_stage;