diff --git a/docker2fl/src/main.rs b/docker2fl/src/main.rs
index 39a4daa..14ad8a4 100644
--- a/docker2fl/src/main.rs
+++ b/docker2fl/src/main.rs
@@ -84,7 +84,7 @@ async fn main() -> Result<()> {
     });
 
     let fl_name = docker_image.replace([':', '/'], "-") + ".fl";
-    let meta = fungi::Writer::new(&fl_name).await?;
+    let meta = fungi::Writer::new(&fl_name, true).await?;
 
     let store = parse_router(&opts.store).await?;
     let res = docker2fl::convert(meta, store, &docker_image, credentials).await;
diff --git a/docs/README.md b/docs/README.md
index 55ce36e..a3447ad 100644
--- a/docs/README.md
+++ b/docs/README.md
@@ -64,10 +64,10 @@ the `block` table is used to associate data file blocks with files. An `id` fiel
 the route table holds routing information for the blobs. It basically describe where to find `blobs` with certain `ids`. The routing is done as following:
 
-> Note routing table is loaded one time when `rfs` is started and
+> Note: the routing table is loaded one time when `rfs` is started.
 
 - We use the first byte of the blob `id` as the `route key`
-- The `route key`` is then consulted against the routing table
+- The `route key` is then consulted against the routing table
 - While building an `FL` all matching stores are updated with the new blob. This is how the system does replication
 - On `getting` an object, the list of matching routes are tried in random order the first one to return a value is used
 - Note that same range and overlapping ranges are allowed, this is how shards and replications are done.
diff --git a/rfs/README.md b/rfs/README.md
index 45f529e..796f520 100644
--- a/rfs/README.md
+++ b/rfs/README.md
@@ -48,7 +48,7 @@ If the `start-end` range is not provided a `00-FF` range is assume basically a c
 This is only useful because `rfs` can accept multiple stores
 on the command line with different and/or overlapping ranges.
 
-For example `-s 00-80=dir:///tmp/store0 -s 81-ff=dir://tmp/store1` means all keys that has prefix byte in range `[00-80]` will be written to /tmp/store0 all other keys `00-ff` will be written to store1.
+For example `-s 00-80=dir:///tmp/store0 -s 81-ff=dir:///tmp/store1` means all keys that have a prefix byte in range `[00-80]` will be written to /tmp/store0, while all other keys `[81-ff]` will be written to /tmp/store1.
 
 The same range can appear multiple times, which means the blob will be replicated to all the stores that matches its key prefix.
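The `remove` flag threaded through `fungi::Writer::new` above is the pivot of this change: pack-style callers recreate the flist from scratch, while the new config path (below) opens it in place. A minimal sketch of the two modes, assuming a placeholder path; `anyhow` is already a dependency of both binaries:

```rust
use rfs::fungi;

async fn demo() -> anyhow::Result<()> {
    // remove = true: delete any stale file at the path first, so pack-style
    // callers (docker2fl, `rfs pack`) always start from an empty database.
    let _fresh = fungi::Writer::new("/tmp/example.fl", true).await?;

    // remove = false: open the existing flist in place, so tags and routes
    // can be edited without discarding the packed content.
    let _existing = fungi::Writer::new("/tmp/example.fl", false).await?;

    Ok(())
}
```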
diff --git a/rfs/src/config.rs b/rfs/src/config.rs
new file mode 100644
index 0000000..62eaf4d
--- /dev/null
+++ b/rfs/src/config.rs
@@ -0,0 +1,72 @@
+use crate::{
+    fungi::{meta::Tag, Reader, Result, Writer},
+    store::{self, Store},
+};
+
+pub async fn tag_list(reader: Reader) -> Result<()> {
+    let tags = reader.tags().await?;
+    if !tags.is_empty() {
+        println!("tags:");
+    }
+    for (key, value) in tags {
+        println!("\t{}={}", key, value);
+    }
+    Ok(())
+}
+
+pub async fn tag_add(writer: Writer, tags: Vec<(String, String)>) -> Result<()> {
+    for (key, value) in tags {
+        writer.tag(Tag::Custom(key.as_str()), value).await?;
+    }
+    Ok(())
+}
+
+pub async fn tag_delete(writer: Writer, keys: Vec<String>, all: bool) -> Result<()> {
+    if all {
+        writer.delete_tags().await?;
+        return Ok(());
+    }
+    for key in keys {
+        writer.delete_tag(Tag::Custom(key.as_str())).await?;
+    }
+    Ok(())
+}
+
+pub async fn store_list(reader: Reader) -> Result<()> {
+    let routes = reader.routes().await?;
+    if !routes.is_empty() {
+        println!("routes:")
+    }
+    for route in routes {
+        println!(
+            "\trange:[{}-{}] store:{}",
+            route.start, route.end, route.url
+        );
+    }
+    Ok(())
+}
+
+pub async fn store_add(writer: Writer, stores: Vec<String>) -> Result<()> {
+    let store = store::parse_router(stores.as_slice()).await?;
+    for route in store.routes() {
+        writer
+            .route(
+                route.start.unwrap_or(u8::MIN),
+                route.end.unwrap_or(u8::MAX),
+                route.url,
+            )
+            .await?;
+    }
+    Ok(())
+}
+
+pub async fn store_delete(writer: Writer, stores: Vec<String>, all: bool) -> Result<()> {
+    if all {
+        writer.delete_routes().await?;
+        return Ok(());
+    }
+    for store in stores {
+        writer.delete_route(store).await?;
+    }
+    Ok(())
+}
diff --git a/rfs/src/fungi/meta.rs b/rfs/src/fungi/meta.rs
index 8e13789..bc251cf 100644
--- a/rfs/src/fungi/meta.rs
+++ b/rfs/src/fungi/meta.rs
@@ -277,6 +277,14 @@ impl Reader {
         Ok(value.map(|v| v.0))
     }
 
+    pub async fn tags(&self) -> Result<Vec<(String, String)>> {
+        let tags: Vec<(String, String)> = sqlx::query_as("select key, value from tag;")
+            .fetch_all(&self.pool)
+            .await?;
+
+        Ok(tags)
+    }
+
     pub async fn routes(&self) -> Result<Vec<Route>> {
         let results: Vec<Route> = sqlx::query_as("select start, end, url from route;")
             .fetch_all(&self.pool)
@@ -340,8 +348,10 @@ pub struct Writer {
 
 impl Writer {
     /// create a new mkondo writer
-    pub async fn new<P: AsRef<Path>>(path: P) -> Result<Self> {
-        let _ = tokio::fs::remove_file(&path).await;
+    pub async fn new<P: AsRef<Path>>(path: P, remove: bool) -> Result<Self> {
+        if remove {
+            let _ = tokio::fs::remove_file(&path).await;
+        }
 
         let opts = SqliteConnectOptions::new()
             .create_if_missing(true)
@@ -409,13 +419,39 @@ impl Writer {
     }
 
     pub async fn tag<V: AsRef<str>>(&self, tag: Tag<'_>, value: V) -> Result<()> {
-        sqlx::query("insert into tag (key, value) values (?, ?);")
+        sqlx::query("insert or replace into tag (key, value) values (?, ?);")
             .bind(tag.key())
             .bind(value.as_ref())
             .execute(&self.pool)
             .await?;
         Ok(())
    }
+
+    pub async fn delete_tag(&self, tag: Tag<'_>) -> Result<()> {
+        sqlx::query("delete from tag where key = ?;")
+            .bind(tag.key())
+            .execute(&self.pool)
+            .await?;
+        Ok(())
+    }
+
+    pub async fn delete_route<U: AsRef<str>>(&self, url: U) -> Result<()> {
+        sqlx::query("delete from route where url = ?;")
+            .bind(url.as_ref())
+            .execute(&self.pool)
+            .await?;
+        Ok(())
+    }
+
+    pub async fn delete_tags(&self) -> Result<()> {
+        sqlx::query("delete from tag;").execute(&self.pool).await?;
+        Ok(())
+    }
+    pub async fn delete_routes(&self) -> Result<()> {
+        sqlx::query("delete from route;")
+            .execute(&self.pool)
+            .await?;
+        Ok(())
+    }
 }
 
 #[cfg(test)]
 mod test {
     use super::*;
 
     #[tokio::test]
     async fn test_inode() {
         const PATH: &str = "/tmp/inode.fl";
-        let meta = Writer::new(PATH).await.unwrap();
+        let meta = Writer::new(PATH, true).await.unwrap();
 
         let ino = meta
             .inode(Inode {
@@ -449,7 +485,7 @@ mod test {
     #[tokio::test]
     async fn test_get_children() {
         const PATH: &str = "/tmp/children.fl";
-        let meta = Writer::new(PATH).await.unwrap();
+        let meta = Writer::new(PATH, true).await.unwrap();
 
         let ino = meta
             .inode(Inode {
@@ -486,7 +522,7 @@ mod test {
     #[tokio::test]
     async fn test_get_block() {
         const PATH: &str = "/tmp/block.fl";
-        let meta = Writer::new(PATH).await.unwrap();
+        let meta = Writer::new(PATH, true).await.unwrap();
 
         let hash: [u8; ID_LEN] = [
             1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23,
             24, 25, 26, 27, 28, 29, 30, 31, 32,
@@ -509,7 +545,7 @@ mod test {
     #[tokio::test]
     async fn test_get_tag() {
         const PATH: &str = "/tmp/tag.fl";
-        let meta = Writer::new(PATH).await.unwrap();
+        let meta = Writer::new(PATH, true).await.unwrap();
         meta.tag(Tag::Version, "0.1").await.unwrap();
         meta.tag(Tag::Author, "azmy").await.unwrap();
         meta.tag(Tag::Custom("custom"), "value").await.unwrap();
@@ -535,7 +571,7 @@ mod test {
     #[tokio::test]
     async fn test_get_routes() {
         const PATH: &str = "/tmp/route.fl";
-        let meta = Writer::new(PATH).await.unwrap();
+        let meta = Writer::new(PATH, true).await.unwrap();
 
         meta.route(0, 128, "zdb://hub1.grid.tf").await.unwrap();
         meta.route(129, 255, "zdb://hub2.grid.tf").await.unwrap();
@@ -560,7 +596,7 @@ mod test {
     #[tokio::test]
     async fn test_walk() {
         const PATH: &str = "/tmp/walk.fl";
-        let meta = Writer::new(PATH).await.unwrap();
+        let meta = Writer::new(PATH, true).await.unwrap();
 
         let parent = meta
             .inode(Inode {
diff --git a/rfs/src/lib.rs b/rfs/src/lib.rs
index a1f855c..77f8e30 100644
--- a/rfs/src/lib.rs
+++ b/rfs/src/lib.rs
@@ -9,6 +9,7 @@ mod pack;
 pub use pack::pack;
 mod unpack;
 pub use unpack::unpack;
+pub mod config;
 
 const PARALLEL_UPLOAD: usize = 10; // number of files we can upload in parallel
 
@@ -53,7 +54,7 @@ mod test {
         }
 
         println!("file generation complete");
-        let writer = meta::Writer::new(root.join("meta.fl")).await.unwrap();
+        let writer = meta::Writer::new(root.join("meta.fl"), true).await.unwrap();
 
         // while we at it we can already create 2 stores and create a router store on top
         // of that.
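Two behavioral details above are easy to miss: `tag` now issues `insert or replace`, so re-adding an existing key overwrites its value instead of tripping the unique constraint, and `Writer::new(path, false)` opens an existing database without truncating it. A short sketch of how the new tag APIs compose, assuming a placeholder path:

```rust
use rfs::fungi::{meta::Tag, Reader, Result, Writer};

async fn demo() -> Result<()> {
    // Open the flist without wiping it: remove = false.
    let writer = Writer::new("/tmp/example.fl", false).await?;

    // `insert or replace` makes tagging idempotent: the second call
    // silently overwrites the first value for the same key.
    writer.tag(Tag::Custom("backend"), "zdb").await?;
    writer.tag(Tag::Custom("backend"), "dir").await?;

    // Read side: `tags()` returns every (key, value) pair in the tag table.
    let reader = Reader::new("/tmp/example.fl").await?;
    for (key, value) in reader.tags().await? {
        println!("{}={}", key, value); // prints: backend=dir
    }

    // Delete one key, or clear the whole table.
    writer.delete_tag(Tag::Custom("backend")).await?;
    writer.delete_tags().await?;
    Ok(())
}
```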
diff --git a/rfs/src/main.rs b/rfs/src/main.rs
index 647191a..9389f16 100644
--- a/rfs/src/main.rs
+++ b/rfs/src/main.rs
@@ -2,14 +2,15 @@
 extern crate log;
 
 use nix::sys::signal::{self, Signal};
 use nix::unistd::Pid;
+use std::error::Error;
 use std::io::Read;
 
 use anyhow::{Context, Result};
 use clap::{ArgAction, Args, Parser, Subcommand};
-use rfs::cache;
 use rfs::fungi;
 use rfs::store::{self, Router, Stores};
+use rfs::{cache, config};
 
 mod fs;
 /// mount flists
@@ -32,6 +33,8 @@ enum Commands {
     Pack(PackOptions),
     /// unpack (downloads) content of an FL the provided location
     Unpack(UnpackOptions),
+    /// list or modify FL metadata and stores
+    Config(ConfigOptions),
 }
 
 #[derive(Args, Debug)]
@@ -90,10 +93,91 @@ struct UnpackOptions {
     #[clap(short, long, default_value_t = false)]
     preserve_ownership: bool,
 
-    /// target directory to upload
+    /// target directory for unpacking
     target: String,
 }
 
+#[derive(Args, Debug)]
+struct ConfigOptions {
+    /// path to metadata file (flist)
+    #[clap(short, long)]
+    meta: String,
+
+    #[command(subcommand)]
+    command: ConfigCommands,
+}
+
+#[derive(Subcommand, Debug)]
+enum ConfigCommands {
+    #[command(subcommand)]
+    Tag(TagOperation),
+    #[command(subcommand)]
+    Store(StoreOperation),
+}
+
+#[derive(Subcommand, Debug)]
+enum TagOperation {
+    List,
+    Add(TagAddOptions),
+    Delete(TagDeleteOptions),
+}
+
+#[derive(Args, Debug)]
+struct TagAddOptions {
+    /// pair of key-values separated with '='
+    #[clap(short, long, value_parser = parse_key_val::<String, String>, number_of_values = 1)]
+    tag: Vec<(String, String)>,
+}
+
+#[derive(Args, Debug)]
+struct TagDeleteOptions {
+    /// key to remove
+    #[clap(short, long, action=ArgAction::Append)]
+    key: Vec<String>,
+    /// remove all tags
+    #[clap(short, long, default_value_t = false)]
+    all: bool,
+}
+
+#[derive(Subcommand, Debug)]
+enum StoreOperation {
+    List,
+    Add(StoreAddOptions),
+    Delete(StoreDeleteOptions),
+}
+
+#[derive(Args, Debug)]
+struct StoreAddOptions {
+    /// store url in the format [xx-xx=]<url>. the range xx-xx is optional and used for
+    /// sharding. the URL is per store type, please check docs for more information
+    #[clap(short, long, action=ArgAction::Append)]
+    store: Vec<String>,
+}
+
+#[derive(Args, Debug)]
+struct StoreDeleteOptions {
+    /// store to remove
+    #[clap(short, long, action=ArgAction::Append)]
+    store: Vec<String>,
+    /// remove all stores
+    #[clap(short, long, default_value_t = false)]
+    all: bool,
+}
+
+/// Parse a single key-value pair
+fn parse_key_val<T, U>(s: &str) -> Result<(T, U), Box<dyn Error + Send + Sync + 'static>>
+where
+    T: std::str::FromStr,
+    T::Err: Error + Send + Sync + 'static,
+    U: std::str::FromStr,
+    U::Err: Error + Send + Sync + 'static,
+{
+    let pos = s
+        .find('=')
+        .ok_or_else(|| format!("invalid KEY=value: no `=` found in `{s}`"))?;
+    Ok((s[..pos].parse()?, s[pos + 1..].parse()?))
+}
+
 fn main() -> Result<()> {
     let opts = Options::parse();
 
@@ -115,6 +199,7 @@ fn main() -> Result<()> {
         Commands::Mount(opts) => mount(opts),
         Commands::Pack(opts) => pack(opts),
         Commands::Unpack(opts) => unpack(opts),
+        Commands::Config(opts) => config(opts),
     }
 }
 
@@ -123,7 +208,7 @@ fn pack(opts: PackOptions) -> Result<()> {
 
     rt.block_on(async move {
        let store = store::parse_router(opts.store.as_slice()).await?;
-        let meta = fungi::Writer::new(opts.meta).await?;
+        let meta = fungi::Writer::new(opts.meta, true).await?;
         rfs::pack(meta, store, opts.target, !opts.no_strip_password).await?;
 
         Ok(())
@@ -240,3 +325,36 @@ async fn get_router(meta: &fungi::Reader) -> Result<Router<Stores>> {
 
     Ok(router)
 }
+
+fn config(opts: ConfigOptions) -> Result<()> {
+    let rt = tokio::runtime::Runtime::new()?;
+
+    rt.block_on(async move {
+        let writer = fungi::Writer::new(opts.meta.clone(), false)
+            .await
+            .context("failed to initialize metadata database")?;
+
+        let reader = fungi::Reader::new(opts.meta)
+            .await
+            .context("failed to initialize metadata database")?;
+
+        match opts.command {
+            ConfigCommands::Tag(opts) => match opts {
+                TagOperation::List => config::tag_list(reader).await?,
+                TagOperation::Add(opts) => config::tag_add(writer, opts.tag).await?,
+                TagOperation::Delete(opts) => {
+                    config::tag_delete(writer, opts.key, opts.all).await?
+                }
+            },
+            ConfigCommands::Store(opts) => match opts {
+                StoreOperation::List => config::store_list(reader).await?,
+                StoreOperation::Add(opts) => config::store_add(writer, opts.store).await?,
+                StoreOperation::Delete(opts) => {
+                    config::store_delete(writer, opts.store, opts.all).await?
+                }
+            },
+        }
+
+        Ok(())
+    })
+}
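With `Config` wired into the command table, flist metadata can be inspected and edited after pack time. The invocations below are hypothetical, derived from the clap definitions above; the flist name and store URLs are placeholders, and `store delete` matches the exact URL recorded in the route table (as printed by `store list`):

```bash
# tags
rfs config --meta example.fl tag list
rfs config --meta example.fl tag add --tag backend=zdb --tag version=0.2
rfs config --meta example.fl tag delete --key backend
rfs config --meta example.fl tag delete --all

# stores (routes)
rfs config --meta example.fl store list
rfs config --meta example.fl store add -s 00-80=dir:///tmp/store0 -s 81-ff=dir:///tmp/store1
rfs config --meta example.fl store delete --store dir:///tmp/store0
rfs config --meta example.fl store delete --all
```

Note that one `config` run opens the same file twice, as a `Writer` with `remove = false` and as a `Reader`, which is why `--meta` lives on `ConfigOptions` rather than on each individual operation.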