diff --git a/src/cache/mod.rs b/src/cache/mod.rs
index aadd0e6..f4ab0f1 100644
--- a/src/cache/mod.rs
+++ b/src/cache/mod.rs
@@ -14,7 +14,7 @@ use std::{
 
 use crate::{
     map::{Flags, Page, PageMut},
-    store::{Data, Store},
+    store::{Page as PageData, Store},
 };
 
 use super::map::PageMap;
@@ -264,14 +264,14 @@ impl Store for NullStore {
     async fn set(&mut self, _index: u32, _block: &[u8]) -> Result<()> {
         Ok(())
     }
-    async fn get(&self, _index: u32) -> Result<Option<Data<Self::Vec>>> {
+    async fn get(&self, _index: u32) -> Result<Option<PageData<Self::Vec>>> {
         Ok(None)
     }
 
     fn size(&self) -> ByteSize {
         ByteSize::b(u64::MAX)
     }
-    fn block_size(&self) -> usize {
+    fn page_size(&self) -> usize {
         0
     }
 }
diff --git a/src/lib.rs b/src/lib.rs
index f48daef..92a00e0 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -8,6 +8,12 @@ pub mod device;
 pub mod map;
 pub mod store;
 
+#[derive(thiserror::Error, Debug)]
+pub enum PolicyError {
+    #[error("stores are not the same size")]
+    StoresNotSameSize,
+}
+
 #[derive(thiserror::Error, Debug)]
 pub enum Error {
     #[error("size cannot be zero")]
@@ -50,6 +56,9 @@ pub enum Error {
     #[error("invalid meta data size")]
     InvalidMetaDataSize,
 
+    #[error("policy error: {0}")]
+    PolicyError(#[from] PolicyError),
+
     #[error("io error: {0}")]
     IO(#[from] IoError),
 }
diff --git a/src/main.rs b/src/main.rs
index eb77e01..1fe623f 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -4,7 +4,7 @@ use clap::{ArgAction, Parser};
 use nbd_async::Control;
 use qbd::{
     device::DeviceControl,
-    store::{ConcatStore, FileStore, Store},
+    store::{policy::Policy, FileStore, Store},
     *,
 };
 use std::{
@@ -106,7 +106,7 @@ async fn app(args: Args) -> anyhow::Result<()> {
         );
     }
 
-    let store = ConcatStore::new(stores)?;
+    let store = Policy::strip(stores)?;
     let disk_size = store.size();
 
     log::info!(
diff --git a/src/store/file.rs b/src/store/file.rs
index b21aba4..8627686 100644
--- a/src/store/file.rs
+++ b/src/store/file.rs
@@ -44,7 +44,7 @@ impl Store for FileStore {
         self.map.flush_page(index as usize)
     }
 
-    async fn get(&self, index: u32) -> Result<Option<Data<Self::Vec>>> {
+    async fn get(&self, index: u32) -> Result<Option<Page<Self::Vec>>> {
         // we access the map directly to avoid a borrow problem
         let header = self.map.header_at(index as usize);
         if !header.flag(Flags::Occupied) {
@@ -53,14 +53,14 @@ impl Store for FileStore {
 
         let data = self.map.data_at(index as usize);
 
-        Ok(Some(Data::Borrowed(data)))
+        Ok(Some(Page::Borrowed(data)))
     }
 
     fn size(&self) -> ByteSize {
         self.size
     }
 
-    fn block_size(&self) -> usize {
+    fn page_size(&self) -> usize {
         self.map.page_size()
     }
 }
diff --git a/src/store/mod.rs b/src/store/mod.rs
index 8a16e85..8626525 100644
--- a/src/store/mod.rs
+++ b/src/store/mod.rs
@@ -3,6 +3,7 @@ use std::io::Error as IoError;
 use std::ops::Deref;
 
 mod file;
+pub mod policy;
 mod sled_store;
 
 use crate::{Error, Result};
@@ -13,7 +14,7 @@ pub use sled_store::SledStore;
 /// Data is like built in Cow but read only
 /// this allow stores to return data with no copy
 /// if possible
-pub enum Data<'a, T>
+pub enum Page<'a, T>
 where
     T: Deref<Target = [u8]>,
 {
@@ -21,7 +22,7 @@ where
     Borrowed(&'a [u8]),
 }
 
-impl<'a, T> Deref for Data<'a, T>
+impl<'a, T> Deref for Page<'a, T>
 where
     T: Deref<Target = [u8]>,
 {
@@ -38,79 +39,17 @@ where
 pub trait Store: Send + Sync + 'static {
     type Vec: Deref<Target = [u8]>;
 
-    async fn set(&mut self, index: u32, block: &[u8]) -> Result<()>;
-    async fn get(&self, index: u32) -> Result<Option<Data<Self::Vec>>>;
-    fn size(&self) -> ByteSize;
-    fn block_size(&self) -> usize;
-}
-
-/// ConcatStore takes multiple stores and makes them
-/// act like a single big store where size = sum(sizes)
-pub struct ConcatStore<S> {
-    parts: Vec<S>,
-    bs: usize,
-}
-
-impl<S> ConcatStore<S>
-where
-    S: Store,
-{
-    pub fn new(parts: Vec<S>) -> Result<Self> {
-        if parts.is_empty() {
-            return Err(Error::ZeroSize);
-        }
-
-        let bs = parts[0].block_size();
-        if !parts.iter().all(|f| f.block_size() == bs) {
-            return Err(Error::InvalidPageSize);
-        }
-
-        Ok(Self { parts, bs })
-    }
-}
-
-#[async_trait::async_trait]
-impl<S> Store for ConcatStore<S>
-where
-    S: Store,
-{
-    type Vec = S::Vec;
-
-    async fn set(&mut self, index: u32, block: &[u8]) -> Result<()> {
-        let mut index = index as usize;
-        for store in self.parts.iter_mut() {
-            let bc = store.size().0 as usize / self.bs;
-            if index < bc {
-                return store.set(index as u32, block).await;
-            }
-
-            index -= bc;
-        }
-
-        Err(Error::PageIndexOutOfRange)
-    }
-
-    async fn get(&self, index: u32) -> Result<Option<Data<Self::Vec>>> {
-        let mut index = index as usize;
-        for store in self.parts.iter() {
-            let bc = store.size().0 as usize / self.bs;
-            if index < bc {
-                return store.get(index as u32).await;
-            }
+    /// set a page in the store
+    async fn set(&mut self, index: u32, page: &[u8]) -> Result<()>;
 
-            index -= bc;
-        }
-
-        Err(Error::PageIndexOutOfRange)
-    }
+    /// get a page from the store
+    async fn get(&self, index: u32) -> Result<Option<Page<Self::Vec>>>;
 
-    fn size(&self) -> ByteSize {
-        self.parts.iter().fold(ByteSize(0), |t, i| t + i.size())
-    }
+    /// size of the store
+    fn size(&self) -> ByteSize;
 
-    fn block_size(&self) -> usize {
-        self.bs
-    }
+    /// size of the page
+    fn page_size(&self) -> usize;
 }
 
 #[cfg(test)]
@@ -138,52 +77,21 @@ mod test {
     #[async_trait::async_trait]
     impl Store for InMemory {
         type Vec = Vec<u8>;
-        async fn set(&mut self, index: u32, block: &[u8]) -> Result<()> {
-            self.mem.insert(index, Vec::from(block));
+        async fn set(&mut self, index: u32, page: &[u8]) -> Result<()> {
+            self.mem.insert(index, Vec::from(page));
             Ok(())
         }
 
-        async fn get(&self, index: u32) -> Result<Option<Data<Self::Vec>>> {
-            Ok(self.mem.get(&index).map(|d| Data::Borrowed(&d)))
+        async fn get(&self, index: u32) -> Result<Option<Page<Self::Vec>>> {
+            Ok(self.mem.get(&index).map(|d| Page::Borrowed(&d)))
         }
 
         fn size(&self) -> ByteSize {
-            ByteSize((self.cap * self.block_size()) as u64)
+            ByteSize((self.cap * self.page_size()) as u64)
         }
 
-        fn block_size(&self) -> usize {
+        fn page_size(&self) -> usize {
             1024
         }
     }
-
-    #[tokio::test]
-    async fn test_concat() {
-        let mut store = ConcatStore::new(vec![InMemory::new(10), InMemory::new(10)]).unwrap();
-        assert_eq!(store.block_size(), 1024);
-        assert_eq!(store.size(), ByteSize(20 * 1024)); // 20 blocks each of 1024 bytes
-
-        let b0 = store.get(0).await.unwrap();
-        let b10 = store.get(10).await.unwrap();
-        let b19 = store.get(19).await.unwrap();
-
-        assert!(store.get(20).await.is_err());
-
-        assert!(b0.is_none());
-        assert!(b10.is_none());
-        assert!(b19.is_none());
-
-        let data: [u8; 1024] = [70; 1024];
-        assert!(store.set(10, &data).await.is_ok());
-
-        let b10 = store.get(10).await.unwrap();
-        assert!(b10.is_some());
-        let b10 = b10.unwrap();
-        b10.iter().all(|f| *f == 70);
-
-        let mem = &store.parts[1].mem;
-        assert_eq!(mem.len(), 1);
-        // because the concat store recalculate the offsets
-        // this then should be at index 0
-        assert!(mem.get(&0).is_some());
-    }
 }
diff --git a/src/store/policy/concat.rs b/src/store/policy/concat.rs
new file mode 100644
index 0000000..07422dd
--- /dev/null
+++ b/src/store/policy/concat.rs
@@ -0,0 +1,110 @@
+use crate::store::{Page, Store};
+use crate::{Error, Result};
+use bytesize::ByteSize;
+
+/// ConcatPolicy takes multiple stores and makes them
+/// act like a single big store where size = sum(sizes)
+pub struct ConcatPolicy<S> {
+    parts: Vec<S>,
+    ps: usize,
+}
+
+impl<S> ConcatPolicy<S>
+where
+    S: Store,
+{
+    pub fn new(parts: Vec<S>) -> Result<Self> {
+        if parts.is_empty() {
+            return Err(Error::ZeroSize);
+        }
+
+        let ps = parts[0].page_size();
+        if !parts.iter().all(|f| f.page_size() == ps) {
+            return Err(Error::InvalidPageSize);
+        }
+
+        Ok(Self { parts, ps })
+    }
+}
+
+#[async_trait::async_trait]
+impl<S> Store for ConcatPolicy<S>
+where
+    S: Store,
+{
+    type Vec = S::Vec;
+
+    async fn set(&mut self, index: u32, page: &[u8]) -> Result<()> {
+        let mut index = index as usize;
+        for store in self.parts.iter_mut() {
+            let bc = store.size().0 as usize / self.ps;
+            if index < bc {
+                return store.set(index as u32, page).await;
+            }
+
+            index -= bc;
+        }
+
+        Err(Error::PageIndexOutOfRange)
+    }
+
+    async fn get(&self, index: u32) -> Result<Option<Page<Self::Vec>>> {
+        let mut index = index as usize;
+        for store in self.parts.iter() {
+            let bc = store.size().0 as usize / self.ps;
+            if index < bc {
+                return store.get(index as u32).await;
+            }
+
+            index -= bc;
+        }
+
+        Err(Error::PageIndexOutOfRange)
+    }
+
+    fn size(&self) -> ByteSize {
+        self.parts.iter().fold(ByteSize(0), |t, i| t + i.size())
+    }
+
+    fn page_size(&self) -> usize {
+        self.ps
+    }
+}
+
+#[cfg(test)]
+mod test {
+
+    use super::*;
+    use crate::store::InMemory;
+
+    #[tokio::test]
+    async fn test_concat() {
+        let mut store = ConcatPolicy::new(vec![InMemory::new(10), InMemory::new(10)]).unwrap();
+        assert_eq!(store.page_size(), 1024);
+        assert_eq!(store.size(), ByteSize(20 * 1024)); // 20 pages each of 1024 bytes
+
+        let b0 = store.get(0).await.unwrap();
+        let b10 = store.get(10).await.unwrap();
+        let b19 = store.get(19).await.unwrap();
+
+        assert!(store.get(20).await.is_err());
+
+        assert!(b0.is_none());
+        assert!(b10.is_none());
+        assert!(b19.is_none());
+
+        let data: [u8; 1024] = [70; 1024];
+        assert!(store.set(10, &data).await.is_ok());
+
+        let b10 = store.get(10).await.unwrap();
+        assert!(b10.is_some());
+        let b10 = b10.unwrap();
+        assert!(b10.iter().all(|f| *f == 70));
+
+        let mem = &store.parts[1].mem;
+        assert_eq!(mem.len(), 1);
+        // because the concat policy recalculates the offsets
+        // this should now be at index 0
+        assert!(mem.get(&0).is_some());
+    }
+}
diff --git a/src/store/policy/mod.rs b/src/store/policy/mod.rs
new file mode 100644
index 0000000..bdd3352
--- /dev/null
+++ b/src/store/policy/mod.rs
@@ -0,0 +1,78 @@
+//! The policy module implements a set of special store types that do
+//! not store data on their own but wrap other stores and apply some
+//! policy on top of them.
+//!
+//! For example, a ConcatPolicy appends 2 or more stores together so that
+//! they appear as a single bigger store.
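+//!
+//! A minimal usage sketch (marked `ignore` since it needs real backing
+//! stores; `make_store` is a placeholder for any constructor that returns
+//! a type implementing [`Store`]):
+//!
+//! ```ignore
+//! use qbd::store::policy::Policy;
+//!
+//! // the backing stores must all share the same page size,
+//! // and for striping also the same total size
+//! let parts = vec![make_store()?, make_store()?];
+//!
+//! // stripe pages across the parts, RAID 0 style; `Policy::concat`
+//! // would instead append them back to back
+//! let store = Policy::strip(parts)?;
+//!
+//! // a `Policy` is itself a `Store`, so it can be used anywhere a
+//! // single store is expected
+//! let total = store.size();
+//! ```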
+mod concat;
+mod strip;
+
+use bytesize::ByteSize;
+pub use concat::ConcatPolicy;
+pub use strip::StripPolicy;
+
+use super::{Page, Store};
+use crate::Result;
+
+pub enum Policy<S>
+where
+    S: Store,
+{
+    Concat(ConcatPolicy<S>),
+    Strip(StripPolicy<S>),
+}
+
+impl<S> Policy<S>
+where
+    S: Store,
+{
+    /// build a new concat policy from parts
+    pub fn concat(parts: Vec<S>) -> Result<Self> {
+        Ok(Self::Concat(ConcatPolicy::new(parts)?))
+    }
+
+    /// build a new strip policy from parts
+    pub fn strip(parts: Vec<S>) -> Result<Self> {
+        Ok(Self::Strip(StripPolicy::new(parts)?))
+    }
+}
+
+#[async_trait::async_trait]
+impl<S> Store for Policy<S>
+where
+    S: Store,
+{
+    type Vec = S::Vec;
+
+    /// set a page in the store
+    async fn set(&mut self, index: u32, page: &[u8]) -> Result<()> {
+        match self {
+            Self::Concat(inner) => inner.set(index, page).await,
+            Self::Strip(inner) => inner.set(index, page).await,
+        }
+    }
+
+    /// get a page from the store
+    async fn get(&self, index: u32) -> Result<Option<Page<Self::Vec>>> {
+        match self {
+            Self::Concat(inner) => inner.get(index).await,
+            Self::Strip(inner) => inner.get(index).await,
+        }
+    }
+
+    /// size of the store
+    fn size(&self) -> ByteSize {
+        match self {
+            Self::Concat(inner) => inner.size(),
+            Self::Strip(inner) => inner.size(),
+        }
+    }
+
+    /// size of the page
+    fn page_size(&self) -> usize {
+        match self {
+            Self::Concat(inner) => inner.page_size(),
+            Self::Strip(inner) => inner.page_size(),
+        }
+    }
+}
diff --git a/src/store/policy/strip.rs b/src/store/policy/strip.rs
new file mode 100644
index 0000000..aacc454
--- /dev/null
+++ b/src/store/policy/strip.rs
@@ -0,0 +1,83 @@
+use crate::store::{Page, Store};
+use crate::{Error, PolicyError, Result};
+use bytesize::ByteSize;
+
+/// StripPolicy takes multiple stores and makes them
+/// act like a single big store where size = sum(sizes).
+/// The difference from the concat policy is that here
+/// the pages are striped over the multiple stores, like
+/// RAID 0.
+///
+/// WARNING: when using striping it's not possible to later
+/// add another store to the array, otherwise all offsets and
+/// locations will be wrong.
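+///
+/// Page indexes are mapped onto the parts round-robin: with `n` parts,
+/// page `index` lives in part `index % n` at inner index `index / n`.
+/// For example, with 3 parts, page 7 goes to part `7 % 3 = 1` at inner
+/// index `7 / 3 = 2`.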
+pub struct StripPolicy<S> {
+    parts: Vec<S>,
+    bs: usize,
+    size: ByteSize,
+}
+
+impl<S> StripPolicy<S>
+where
+    S: Store,
+{
+    pub fn new(parts: Vec<S>) -> Result<Self> {
+        if parts.is_empty() {
+            return Err(Error::ZeroSize);
+        }
+        let size = parts[0].size();
+        if !parts.iter().all(|f| f.size() == size) {
+            return Err(PolicyError::StoresNotSameSize.into());
+        }
+
+        let bs = parts[0].page_size();
+        if !parts.iter().all(|f| f.page_size() == bs) {
+            return Err(Error::InvalidPageSize);
+        }
+
+        let total_size = size.0 * parts.len() as u64;
+        Ok(Self {
+            parts,
+            bs,
+            size: ByteSize(total_size),
+        })
+    }
+}
+
+#[async_trait::async_trait]
+impl<S> Store for StripPolicy<S>
+where
+    S: Store,
+{
+    type Vec = S::Vec;
+
+    async fn set(&mut self, index: u32, page: &[u8]) -> Result<()> {
+        // index is a page index, so compare it against the total page count
+        if index as usize >= self.size.0 as usize / self.bs {
+            return Err(Error::PageIndexOutOfRange);
+        }
+
+        let outer = index as usize % self.parts.len();
+        let inner = index as usize / self.parts.len();
+
+        self.parts[outer].set(inner as u32, page).await
+    }
+
+    async fn get(&self, index: u32) -> Result<Option<Page<Self::Vec>>> {
+        // index is a page index, so compare it against the total page count
+        if index as usize >= self.size.0 as usize / self.bs {
+            return Err(Error::PageIndexOutOfRange);
+        }
+
+        let outer = index as usize % self.parts.len();
+        let inner = index as usize / self.parts.len();
+
+        self.parts[outer].get(inner as u32).await
+    }
+
+    fn size(&self) -> ByteSize {
+        self.size
+    }
+
+    fn page_size(&self) -> usize {
+        self.bs
+    }
+}
diff --git a/src/store/sled_store.rs b/src/store/sled_store.rs
index a796185..d47e546 100644
--- a/src/store/sled_store.rs
+++ b/src/store/sled_store.rs
@@ -44,11 +44,11 @@ impl Store for SledStore {
         Ok(())
     }
 
-    async fn get(&self, index: u32) -> Result<Option<Data<Self::Vec>>> {
+    async fn get(&self, index: u32) -> Result<Option<Page<Self::Vec>>> {
         if index >= self.bc {
             return Err(Error::PageIndexOutOfRange);
         }
-        let data = self.db.get(index.to_be_bytes())?.map(Data::Owned);
+        let data = self.db.get(index.to_be_bytes())?.map(Page::Owned);
         Ok(data)
     }
 
@@ -57,7 +57,7 @@ impl Store for SledStore {
         self.size
     }
 
-    fn block_size(&self) -> usize {
+    fn page_size(&self) -> usize {
         self.bs.0 as usize
     }
 }
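For illustration, a sketch of how the striping layout could be exercised, mirroring the existing `test_concat` and assuming it lives in a `#[cfg(test)]` module at the bottom of `strip.rs` (so the private `parts` and `mem` fields are reachable; `InMemory` is the in-memory test store already used by `test_concat`):

```rust
#[cfg(test)]
mod test {

    use super::*;
    use crate::store::InMemory;

    #[tokio::test]
    async fn test_strip() {
        // two in-memory stores of 10 pages, 1024 bytes each
        let mut store = StripPolicy::new(vec![InMemory::new(10), InMemory::new(10)]).unwrap();
        assert_eq!(store.page_size(), 1024);
        assert_eq!(store.size(), ByteSize(20 * 1024));

        // page 3 is striped round-robin: part = 3 % 2 = 1, inner index = 3 / 2 = 1
        let data: [u8; 1024] = [70; 1024];
        assert!(store.set(3, &data).await.is_ok());

        let page = store.get(3).await.unwrap();
        assert!(page.is_some());

        // the write landed on the second part at the recomputed inner index
        let mem = &store.parts[1].mem;
        assert_eq!(mem.len(), 1);
        assert!(mem.get(&1).is_some());
    }
}
```

With two parts, even page indexes land on the first part and odd ones on the second, each at inner index `index / 2`.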