Skip to content

Commit

Permalink
dynamic creation of stores based on url
Browse files Browse the repository at this point in the history
  • Loading branch information
muhamadazmy committed Sep 21, 2023
1 parent 7b973e2 commit 8617e5e
Show file tree
Hide file tree
Showing 6 changed files with 126 additions and 73 deletions.
2 changes: 1 addition & 1 deletion src/cache/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use tokio::fs::{self, File, OpenOptions};
use tokio::io::{AsyncSeekExt, AsyncWriteExt};

/// Cache implements a caching layer on top of a block store
#[derive(Clone)]
//#[derive(Clone)]
pub struct Cache<S: Store> {
store: BlockStore<S>,
root: PathBuf,
Expand Down
22 changes: 17 additions & 5 deletions src/fs/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,24 +31,36 @@ const LRU_CAP: usize = 5; // Least Recently Used File Capacity
type FHash = [u8; 32];
type BlockSize = u64;

#[derive(Clone)]
pub struct Filesystem<S>
where
S: Store + Clone,
S: Store,
{
meta: Reader,
cache: cache::Cache<S>,
cache: Arc<cache::Cache<S>>,
lru: Arc<Mutex<lru::LruCache<FHash, (File, BlockSize)>>>,
}

impl<S> Clone for Filesystem<S>
where
S: Store,
{
fn clone(&self) -> Self {
Self {
meta: self.meta.clone(),
cache: Arc::clone(&self.cache),
lru: Arc::clone(&self.lru),
}
}
}

impl<S> Filesystem<S>
where
S: Store + Clone,
S: Store,
{
pub fn new(meta: Reader, cache: cache::Cache<S>) -> Self {
Filesystem {
meta,
cache,
cache: Arc::new(cache),
lru: Arc::new(Mutex::new(lru::LruCache::new(LRU_CAP))),
}
}
Expand Down
5 changes: 3 additions & 2 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use clap::{ArgAction, Parser};

use rfs::cache;
use rfs::fungi;
use rfs::store::{self, StoreFactory};
use rfs::store;

mod fs;
/// mount flists
Expand Down Expand Up @@ -142,7 +142,8 @@ async fn app(opts: Options) -> Result<()> {
.await
.context("failed to initialize metadata database")?;

let store = store::zdb::ZdbStoreFactory.build(&opts.storage_url).await?;
//let store = store::Router::new();
let store = store::zdb::make(&opts.storage_url).await?;

let cache = cache::Cache::new(opts.cache, store);

Expand Down
18 changes: 18 additions & 0 deletions src/store/dir.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,26 @@
use super::{Error, Result, Route, Store};
use futures::Future;
use std::io::ErrorKind;
use std::os::unix::prelude::OsStrExt;
use std::path::PathBuf;
use std::pin::Pin;
use tokio::fs;
use url;

const SCHEME: &str = "dir";

async fn make_inner(url: String) -> Result<Box<dyn Store>> {
let u = url::Url::parse(&url)?;
if u.scheme() != SCHEME {
return Err(Error::InvalidScheme(u.scheme().into(), SCHEME.into()));
}

Ok(Box::new(DirStore::new(u.path()).await?))
}

pub fn make(url: &str) -> Pin<Box<dyn Future<Output = Result<Box<dyn Store>>>>> {
Box::pin(make_inner(url.into()))
}

/// DirStore is a simple store that store blobs on the filesystem
/// and is mainly used for testing
Expand Down
41 changes: 34 additions & 7 deletions src/store/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,24 @@ pub mod dir;
mod router;
pub mod zdb;

use std::{collections::HashMap, pin::Pin};

pub use bs::BlockStore;
use futures::Future;

lazy_static::lazy_static! {
pub static ref STORES: HashMap<String, Factory> = register_stores();
}

/// register_stores is used to register the stores built in types
/// so they can be created with a url
fn register_stores() -> HashMap<String, Factory> {
let mut m: HashMap<String, Factory> = HashMap::default();
m.insert("dir".into(), dir::make);
m.insert("zdb".into(), zdb::make);

m
}

#[derive(thiserror::Error, Debug)]
pub enum Error {
Expand Down Expand Up @@ -33,6 +50,8 @@ pub enum Error {

#[error("url parse error: {0}")]
Url(#[from] url::ParseError),
#[error("invalid schema '{0}' expected '{1}'")]
InvalidScheme(String, String),
#[error("other: {0}")]
Other(#[from] anyhow::Error),
}
Expand Down Expand Up @@ -64,15 +83,10 @@ pub trait Store: Send + Sync + 'static {
fn routes(&self) -> Vec<Route>;
}

/// The store factory trait works as a factory for a specific store
/// The store factory works as a factory for a specific store
/// this is only needed to be able dynamically create different types
/// of stores based only on scheme of the store url.
#[async_trait::async_trait]
pub trait StoreFactory {
type Store: Store;

async fn build<U: AsRef<str> + Send>(&self, url: U) -> anyhow::Result<Self::Store>;
}
pub type Factory = fn(u: &str) -> Pin<Box<dyn Future<Output = Result<Box<dyn Store>>>>>;

/// Router holds a set of shards (stores) where each store can be configured to serve
/// a range of hashes.
Expand Down Expand Up @@ -140,3 +154,16 @@ impl Store for Router {
routes
}
}

#[async_trait::async_trait]
impl Store for Box<dyn Store> {
async fn get(&self, key: &[u8]) -> Result<Vec<u8>> {
self.as_ref().get(key).await
}
async fn set(&self, key: &[u8], blob: &[u8]) -> Result<()> {
self.as_ref().set(key, blob).await
}
fn routes(&self) -> Vec<Route> {
self.as_ref().routes()
}
}
111 changes: 53 additions & 58 deletions src/store/zdb.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
use super::{Error, Result, Route, Store, StoreFactory};
use std::pin::Pin;

use super::{Error, Result, Route, Store};
use anyhow::Context;

use bb8_redis::{
Expand All @@ -9,6 +11,7 @@ use bb8_redis::{
},
RedisConnectionManager,
};
use futures::Future;

#[derive(Debug)]
struct WithNamespace {
Expand Down Expand Up @@ -40,63 +43,61 @@ impl CustomizeConnection<Connection, RedisError> for WithNamespace {

pub struct ZdbStoreFactory;

impl ZdbStoreFactory {
fn get_connection_info<U: AsRef<str>>(&self, u: U) -> Result<(ConnectionInfo, Option<String>)> {
let u = url::Url::parse(u.as_ref())?;

let (address, namespace) = match u.host() {
Some(host) => {
let addr = ConnectionAddr::Tcp(host.to_string(), u.port().unwrap_or(9900));
let ns: Option<String> = u
.path_segments()
.and_then(|s| s.last().map(|s| s.to_owned()));
(addr, ns)
}
None => (ConnectionAddr::Unix(u.path().into()), None),
};

Ok((
ConnectionInfo {
addr: address,
redis: RedisConnectionInfo {
db: 0,
username: if u.username().is_empty() {
None
} else {
Some(u.username().into())
},
password: u.password().map(|s| s.into()),
fn get_connection_info<U: AsRef<str>>(u: U) -> Result<(ConnectionInfo, Option<String>)> {
let u = url::Url::parse(u.as_ref())?;

let (address, namespace) = match u.host() {
Some(host) => {
let addr = ConnectionAddr::Tcp(host.to_string(), u.port().unwrap_or(9900));
let ns: Option<String> = u
.path_segments()
.and_then(|s| s.last().map(|s| s.to_owned()));
(addr, ns)
}
None => (ConnectionAddr::Unix(u.path().into()), None),
};

Ok((
ConnectionInfo {
addr: address,
redis: RedisConnectionInfo {
db: 0,
username: if u.username().is_empty() {
None
} else {
Some(u.username().into())
},
password: u.password().map(|s| s.into()),
},
namespace,
))
}
},
namespace,
))
}

#[async_trait::async_trait]
impl StoreFactory for ZdbStoreFactory {
type Store = ZdbStore;
async fn make_inner(url: String) -> Result<Box<dyn Store>> {
let (mut info, namespace) = get_connection_info(&url)?;

async fn build<U: AsRef<str> + Send>(&self, u: U) -> anyhow::Result<Self::Store> {
let url = u.as_ref().to_owned();
let (mut info, namespace) = self.get_connection_info(u)?;
let namespace = WithNamespace {
namespace,
password: info.redis.password.take(),
};

let namespace = WithNamespace {
namespace,
password: info.redis.password.take(),
};
log::debug!("switching namespace to: {:?}", namespace.namespace);
let mgr =
RedisConnectionManager::new(info).context("failed to create redis connection manager")?;

log::debug!("switching namespace to: {:?}", namespace.namespace);
let mgr = RedisConnectionManager::new(info)?;
let pool = Pool::builder()
.max_size(20)
.connection_customizer(Box::new(namespace))
.build(mgr)
.await
.context("failed to create connection pool")?;

let pool = Pool::builder()
.max_size(20)
.connection_customizer(Box::new(namespace))
.build(mgr)
.await?;
Ok(Box::from(ZdbStore { url, pool }))
}

Ok(ZdbStore { url, pool })
}
pub fn make(url: &str) -> Pin<Box<dyn Future<Output = Result<Box<dyn Store>>>>> {
Box::pin(make_inner(url.into()))
}

#[derive(Clone)]
Expand Down Expand Up @@ -141,28 +142,22 @@ mod test {

#[test]
fn test_connection_info_simple() {
let (info, ns) = ZdbStoreFactory
.get_connection_info("zdb://hub.grid.tf:9900")
.unwrap();
let (info, ns) = get_connection_info("zdb://hub.grid.tf:9900").unwrap();
assert_eq!(ns, None);
assert_eq!(info.addr, ConnectionAddr::Tcp("hub.grid.tf".into(), 9900));
}

#[test]
fn test_connection_info_ns() {
let (info, ns) = ZdbStoreFactory
.get_connection_info("zdb://[email protected]/custom")
.unwrap();
let (info, ns) = get_connection_info("zdb://[email protected]/custom").unwrap();
assert_eq!(ns, Some("custom".into()));
assert_eq!(info.addr, ConnectionAddr::Tcp("hub.grid.tf".into(), 9900));
assert_eq!(info.redis.username, Some("username".into()));
}

#[test]
fn test_connection_info_unix() {
let (info, ns) = ZdbStoreFactory
.get_connection_info("zdb:///path/to/socket")
.unwrap();
let (info, ns) = get_connection_info("zdb:///path/to/socket").unwrap();
assert_eq!(ns, None);
assert_eq!(info.addr, ConnectionAddr::Unix("/path/to/socket".into()));
}
Expand Down

0 comments on commit 8617e5e

Please sign in to comment.