Skip to content

Commit

Permalink
Separation of store policy and store backend
Browse files Browse the repository at this point in the history
A policy is just a wrapper on top of a set of stores
that is responsible for distributing the data pages
across the underlying stores.

Currently only 2 policies are available
- concat
- strip
  • Loading branch information
muhamadazmy committed Nov 17, 2023
1 parent ada6fb5 commit 651c36d
Show file tree
Hide file tree
Showing 9 changed files with 308 additions and 120 deletions.
6 changes: 3 additions & 3 deletions src/cache/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use std::{

use crate::{
map::{Flags, Page, PageMut},
store::{Data, Store},
store::{Page as PageData, Store},
};

use super::map::PageMap;
Expand Down Expand Up @@ -264,14 +264,14 @@ impl Store for NullStore {
async fn set(&mut self, _index: u32, _block: &[u8]) -> Result<()> {
Ok(())
}
async fn get(&self, _index: u32) -> Result<Option<Data<Self::Vec>>> {
async fn get(&self, _index: u32) -> Result<Option<PageData<Self::Vec>>> {
Ok(None)
}
fn size(&self) -> ByteSize {
ByteSize::b(u64::MAX)
}

fn block_size(&self) -> usize {
fn page_size(&self) -> usize {
0
}
}
Expand Down
9 changes: 9 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,12 @@ pub mod device;
pub mod map;
pub mod store;

/// Errors raised by store distribution policies
/// (the wrappers that spread pages across multiple stores).
#[derive(thiserror::Error, Debug)]
pub enum PolicyError {
    // NOTE(review): presumably returned by policies that require all
    // underlying stores to be equally sized (e.g. strip) — confirm
    #[error("stores not same size")]
    StoresNotSameSize,
}

#[derive(thiserror::Error, Debug)]
pub enum Error {
#[error("size cannot be zero")]
Expand Down Expand Up @@ -50,6 +56,9 @@ pub enum Error {
#[error("invalid meta data size")]
InvalidMetaDataSize,

#[error("policy error: {0}")]
PolicyError(#[from] PolicyError),

#[error("io error: {0}")]
IO(#[from] IoError),
}
Expand Down
4 changes: 2 additions & 2 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use clap::{ArgAction, Parser};
use nbd_async::Control;
use qbd::{
device::DeviceControl,
store::{ConcatStore, FileStore, Store},
store::{policy::Policy, FileStore, Store},
*,
};
use std::{
Expand Down Expand Up @@ -106,7 +106,7 @@ async fn app(args: Args) -> anyhow::Result<()> {
);
}

let store = ConcatStore::new(stores)?;
let store = Policy::strip(stores)?;

let disk_size = store.size();
log::info!(
Expand Down
6 changes: 3 additions & 3 deletions src/store/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ impl Store for FileStore {
self.map.flush_page(index as usize)
}

async fn get(&self, index: u32) -> Result<Option<Data<Self::Vec>>> {
async fn get(&self, index: u32) -> Result<Option<Page<Self::Vec>>> {
// we access the map directly to avoid a borrow problem
let header = self.map.header_at(index as usize);
if !header.flag(Flags::Occupied) {
Expand All @@ -53,14 +53,14 @@ impl Store for FileStore {

let data = self.map.data_at(index as usize);

Ok(Some(Data::Borrowed(data)))
Ok(Some(Page::Borrowed(data)))
}

fn size(&self) -> ByteSize {
self.size
}

fn block_size(&self) -> usize {
fn page_size(&self) -> usize {
self.map.page_size()
}
}
126 changes: 17 additions & 109 deletions src/store/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use std::io::Error as IoError;
use std::ops::Deref;

mod file;
pub mod policy;
mod sled_store;

use crate::{Error, Result};
Expand All @@ -13,15 +14,15 @@ pub use sled_store::SledStore;
/// Data is like built in Cow but read only
/// this allow stores to return data with no copy
/// if possible
pub enum Data<'a, T>
pub enum Page<'a, T>
where
T: Deref<Target = [u8]>,
{
Owned(T),
Borrowed(&'a [u8]),
}

impl<'a, T> Deref for Data<'a, T>
impl<'a, T> Deref for Page<'a, T>
where
T: Deref<Target = [u8]>,
{
Expand All @@ -38,79 +39,17 @@ where
pub trait Store: Send + Sync + 'static {
type Vec: Deref<Target = [u8]>;

async fn set(&mut self, index: u32, block: &[u8]) -> Result<()>;
async fn get(&self, index: u32) -> Result<Option<Data<Self::Vec>>>;
fn size(&self) -> ByteSize;
fn block_size(&self) -> usize;
}

/// ConcatStore takes multiple stores and makes them
/// act like a single big store where size = sum(sizes)
pub struct ConcatStore<S> {
parts: Vec<S>,
bs: usize,
}

impl<S> ConcatStore<S>
where
S: Store,
{
pub fn new(parts: Vec<S>) -> Result<Self> {
if parts.is_empty() {
return Err(Error::ZeroSize);
}

let bs = parts[0].block_size();
if !parts.iter().all(|f| f.block_size() == bs) {
return Err(Error::InvalidPageSize);
}

Ok(Self { parts, bs })
}
}

#[async_trait::async_trait]
impl<S> Store for ConcatStore<S>
where
S: Store,
{
type Vec = S::Vec;

async fn set(&mut self, index: u32, block: &[u8]) -> Result<()> {
let mut index = index as usize;
for store in self.parts.iter_mut() {
let bc = store.size().0 as usize / self.bs;
if index < bc {
return store.set(index as u32, block).await;
}

index -= bc;
}

Err(Error::PageIndexOutOfRange)
}

async fn get(&self, index: u32) -> Result<Option<Data<Self::Vec>>> {
let mut index = index as usize;
for store in self.parts.iter() {
let bc = store.size().0 as usize / self.bs;
if index < bc {
return store.get(index as u32).await;
}
/// set a page in the store
async fn set(&mut self, index: u32, page: &[u8]) -> Result<()>;

index -= bc;
}

Err(Error::PageIndexOutOfRange)
}
/// get a page from the store
async fn get(&self, index: u32) -> Result<Option<Page<Self::Vec>>>;

fn size(&self) -> ByteSize {
self.parts.iter().fold(ByteSize(0), |t, i| t + i.size())
}
/// size of the store
fn size(&self) -> ByteSize;

fn block_size(&self) -> usize {
self.bs
}
/// size of the page
fn page_size(&self) -> usize;
}

#[cfg(test)]
Expand Down Expand Up @@ -138,52 +77,21 @@ mod test {
#[async_trait::async_trait]
impl Store for InMemory {
type Vec = Vec<u8>;
async fn set(&mut self, index: u32, block: &[u8]) -> Result<()> {
self.mem.insert(index, Vec::from(block));
async fn set(&mut self, index: u32, page: &[u8]) -> Result<()> {
self.mem.insert(index, Vec::from(page));
Ok(())
}

async fn get(&self, index: u32) -> Result<Option<Data<Self::Vec>>> {
Ok(self.mem.get(&index).map(|d| Data::Borrowed(&d)))
async fn get(&self, index: u32) -> Result<Option<Page<Self::Vec>>> {
Ok(self.mem.get(&index).map(|d| Page::Borrowed(&d)))
}

fn size(&self) -> ByteSize {
ByteSize((self.cap * self.block_size()) as u64)
ByteSize((self.cap * self.page_size()) as u64)
}

fn block_size(&self) -> usize {
fn page_size(&self) -> usize {
1024
}
}

#[tokio::test]
async fn test_concat() {
let mut store = ConcatStore::new(vec![InMemory::new(10), InMemory::new(10)]).unwrap();
assert_eq!(store.block_size(), 1024);
assert_eq!(store.size(), ByteSize(20 * 1024)); // 20 blocks each of 1024 bytes

let b0 = store.get(0).await.unwrap();
let b10 = store.get(10).await.unwrap();
let b19 = store.get(19).await.unwrap();

assert!(store.get(20).await.is_err());

assert!(b0.is_none());
assert!(b10.is_none());
assert!(b19.is_none());

let data: [u8; 1024] = [70; 1024];
assert!(store.set(10, &data).await.is_ok());

let b10 = store.get(10).await.unwrap();
assert!(b10.is_some());
let b10 = b10.unwrap();
b10.iter().all(|f| *f == 70);

let mem = &store.parts[1].mem;
assert_eq!(mem.len(), 1);
// because the concat store recalculate the offsets
// this then should be at index 0
assert!(mem.get(&0).is_some());
}
}
110 changes: 110 additions & 0 deletions src/store/policy/concat.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
use crate::store::{Page, Store};
use crate::{Error, Result};
use bytesize::ByteSize;

/// ConcatPolicy takes multiple stores and makes them
/// act like a single big store where size = sum(sizes).
/// Page indexes map onto the stores sequentially: the first
/// store's pages come first, then the second's, and so on.
pub struct ConcatPolicy<S> {
    // the underlying stores, in concatenation order
    parts: Vec<S>,
    // page size shared by every store in `parts` (bytes)
    ps: usize,
}

impl<S> ConcatPolicy<S>
where
S: Store,
{
pub fn new(parts: Vec<S>) -> Result<Self> {
if parts.is_empty() {
return Err(Error::ZeroSize);
}

let ps = parts[0].page_size();
if !parts.iter().all(|f| f.page_size() == ps) {
return Err(Error::InvalidPageSize);
}

Ok(Self { parts, ps })
}
}

#[async_trait::async_trait]
impl<S> Store for ConcatPolicy<S>
where
    S: Store,
{
    type Vec = S::Vec;

    /// Write a page, routing the global index to the store that owns it.
    async fn set(&mut self, index: u32, page: &[u8]) -> Result<()> {
        let mut remaining = index as usize;
        for part in self.parts.iter_mut() {
            // number of pages this store can hold
            let pages = part.size().0 as usize / self.ps;
            if remaining >= pages {
                // index lies past this store; skip its pages
                remaining -= pages;
                continue;
            }
            return part.set(remaining as u32, page).await;
        }

        Err(Error::PageIndexOutOfRange)
    }

    /// Read a page, routing the global index to the store that owns it.
    async fn get(&self, index: u32) -> Result<Option<Page<Self::Vec>>> {
        let mut remaining = index as usize;
        for part in self.parts.iter() {
            // number of pages this store can hold
            let pages = part.size().0 as usize / self.ps;
            if remaining >= pages {
                remaining -= pages;
                continue;
            }
            return part.get(remaining as u32).await;
        }

        Err(Error::PageIndexOutOfRange)
    }

    /// Total size is the sum of all underlying store sizes.
    fn size(&self) -> ByteSize {
        self.parts
            .iter()
            .map(|part| part.size())
            .fold(ByteSize(0), |total, size| total + size)
    }

    /// Shared page size of the underlying stores.
    fn page_size(&self) -> usize {
        self.ps
    }
}

#[cfg(test)]
mod test {

    use super::*;
    use crate::store::InMemory;

    #[tokio::test]
    async fn test_concat() {
        let mut store = ConcatPolicy::new(vec![InMemory::new(10), InMemory::new(10)]).unwrap();
        assert_eq!(store.page_size(), 1024);
        assert_eq!(store.size(), ByteSize(20 * 1024)); // 20 pages each of 1024 bytes

        let b0 = store.get(0).await.unwrap();
        let b10 = store.get(10).await.unwrap();
        let b19 = store.get(19).await.unwrap();

        // index 20 is one page past the end of the combined store
        assert!(store.get(20).await.is_err());

        // nothing has been written yet
        assert!(b0.is_none());
        assert!(b10.is_none());
        assert!(b19.is_none());

        let data: [u8; 1024] = [70; 1024];
        assert!(store.set(10, &data).await.is_ok());

        let b10 = store.get(10).await.unwrap();
        assert!(b10.is_some());
        let b10 = b10.unwrap();
        // fix: the original computed `all(..)` but dropped the boolean,
        // so the page-content check never actually asserted anything
        assert!(b10.iter().all(|f| *f == 70));

        let mem = &store.parts[1].mem;
        assert_eq!(mem.len(), 1);
        // because the concat store recalculates the offsets
        // this then should be at index 0 of the second store
        assert!(mem.get(&0).is_some());
    }
}
Loading

0 comments on commit 651c36d

Please sign in to comment.