From 4c09a7384434318595ad5a2c01915efc9b9405ab Mon Sep 17 00:00:00 2001 From: IHEII Date: Fri, 19 Jan 2024 20:21:18 +0800 Subject: [PATCH] [Feat] YCSB stat (#91) * [Feat] ycsb stat * [Fix] cargo sort * [Fix] format * [Fix] review * [Fix] review --- Cargo.lock | 1 + ycsb-rs/Cargo.toml | 1 + ycsb-rs/src/main.rs | 85 +++++++++++++++++++++++++--- ycsb-rs/src/properties.rs | 15 +++++ ycsb-rs/workloads/workload_obkv.toml | 6 ++ 5 files changed, 100 insertions(+), 8 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 7f6eca1..7f8e5db 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1451,6 +1451,7 @@ dependencies = [ "sql-builder", "sqlite", "structopt", + "tokio", "toml", ] diff --git a/ycsb-rs/Cargo.toml b/ycsb-rs/Cargo.toml index 810dc0d..4832d36 100644 --- a/ycsb-rs/Cargo.toml +++ b/ycsb-rs/Cargo.toml @@ -15,4 +15,5 @@ serde = { version = "1.0.130", features = ["derive"] } sql-builder = "3.1" sqlite = "0.26.0" structopt = "0.3.23" +tokio = { workspace = true } toml = { workspace = true } diff --git a/ycsb-rs/src/main.rs b/ycsb-rs/src/main.rs index 55c7a17..f905c3b 100644 --- a/ycsb-rs/src/main.rs +++ b/ycsb-rs/src/main.rs @@ -1,3 +1,5 @@ +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::time::Duration; use std::{ cell::RefCell, fs, @@ -12,6 +14,7 @@ use obkv::dump_metrics; use properties::Properties; use rand::{rngs::SmallRng, SeedableRng}; use structopt::StructOpt; +use tokio::time as TokioTime; use workload::CoreWorkload; use crate::{ @@ -53,9 +56,15 @@ fn run(wl: Arc, db: Rc, rng: Rc>, operat } } -async fn load_ob(wl: Arc, db: Arc, operation_count: usize) { +async fn load_ob( + wl: Arc, + db: Arc, + operation_count: usize, + counter: Arc, +) { for _ in 0..operation_count { wl.ob_insert(db.clone()).await; + counter.fetch_add(1, Ordering::Relaxed); } } @@ -64,9 +73,11 @@ async fn run_ob( db: Arc, rng: Arc>, operation_count: usize, + counter: Arc, ) { for _ in 0..operation_count { wl.ob_transaction(rng.clone(), db.clone()).await; + counter.fetch_add(1, 
Ordering::Relaxed); } } @@ -88,12 +99,22 @@ fn main() -> Result<()> { } let database = opt.database.clone(); + let total_ops_count = props.operation_count; let thread_operation_count = props.operation_count as usize / opt.threads; let actual_client_count = opt.threads / props.obkv_client_reuse; + + // verify count of operations + assert_eq!( + props.operation_count, + (actual_client_count * thread_operation_count * props.obkv_client_reuse) as u64, + " 'operationcount' should be an exact multiple of the 'threads', and 'threads' should be an exact multiple of the 'obkv_client_reuse'" + ); + for cmd in opt.commands { let start = Instant::now(); let mut tasks = vec![]; let mut threads = vec![]; + let mut db_counters = vec![]; println!( "Database: {database}, Command: {cmd}, Counts Per Threads: {thread_operation_count}" ); @@ -103,25 +124,69 @@ fn main() -> Result<()> { ); if database.eq_ignore_ascii_case("obkv") { let runtimes = runtime::build_ycsb_runtimes(props.clone()); - for _client_idx in 0..actual_client_count { + for _ in 0..actual_client_count { let database = database.clone(); let db = db::create_ob(&database, config.clone()).unwrap(); + // count the ops per client + let counter = Arc::new(AtomicUsize::new(0)); + db_counters.push(counter.clone()); for _ in 0..props.obkv_client_reuse { let db = db.clone(); let wl = wl.clone(); let cmd = cmd.clone(); let runtime = runtimes.default_runtime.clone(); + let counter_clone = counter.clone(); tasks.push(runtime.spawn(async move { let rng = Arc::new(Mutex::new(SmallRng::from_entropy())); db.init().unwrap(); - match &cmd[..] 
{ - "load" => load_ob(wl.clone(), db, thread_operation_count).await, - "run" => run_ob(wl.clone(), db, rng, thread_operation_count).await, - cmd => panic!("invalid command: {cmd}"), + match cmd.as_str() { + "load" => { + load_ob(wl.clone(), db, thread_operation_count, counter_clone).await + } + "run" => { + run_ob(wl.clone(), db, rng, thread_operation_count, counter_clone) + .await + } + _ => panic!("invalid command: {cmd}"), }; })); } } + // show progress + let stat_duration_sec = props.show_progress_duration; + tasks.push(runtimes.default_runtime.spawn(async move { + let mut interval = TokioTime::interval(Duration::from_secs(stat_duration_sec)); + let mut prev_count = 0; + loop { + interval.tick().await; + let completed_operations: usize = db_counters + .iter() + .map(|arc| arc.load(Ordering::Relaxed)) + .sum(); + let ops_per_second = + (completed_operations - prev_count) as f64 / stat_duration_sec as f64; + prev_count = completed_operations; + // estimate remaining time + let remaining_operations = total_ops_count - completed_operations as u64; + let estimated_remaining_time = if ops_per_second > 0.0 { + Duration::from_secs_f64(remaining_operations as f64 / ops_per_second) + } else { + Duration::from_secs(0) + }; + println!("\n-------------------------------"); + println!( + "Throughput(ops/sec) in previous period: {:.2}", + ops_per_second + ); + println!("Runtime: {:?}", start.elapsed()); + println!("Estimate remaining time: {:?}", estimated_remaining_time); + + if completed_operations >= total_ops_count as usize { + println!("All is done"); + break; + } + } + })); runtimes.block_runtime.block_on(async move { for task in tasks { task.await.expect("task failed"); @@ -142,7 +207,7 @@ fn main() -> Result<()> { match &cmd[..] 
{ "load" => load(wl.clone(), db, thread_operation_count), "run" => run(wl.clone(), db, rng, thread_operation_count), - cmd => panic!("invalid command: {cmd}"), + _ => panic!("invalid command: {cmd}"), }; })); } @@ -151,13 +216,17 @@ fn main() -> Result<()> { } } let runtime = start.elapsed().as_millis(); + println!("****************************"); println!("[OVERALL], ThreadCount, {}", opt.threads); println!("[OVERALL], RunTime(ms), {runtime}"); let throughput = props.operation_count as f64 / (runtime as f64 / 1000.0); println!("[OVERALL], Throughput(ops/sec), {throughput}\n"); + println!("****************************"); } - println!("{}", dump_metrics().expect("dump metrics failed")); + if props.show_prometheus { + println!("{}", dump_metrics().expect("dump metrics failed")); + } Ok(()) } diff --git a/ycsb-rs/src/properties.rs b/ycsb-rs/src/properties.rs index 1796e98..711b1e1 100644 --- a/ycsb-rs/src/properties.rs +++ b/ycsb-rs/src/properties.rs @@ -4,6 +4,14 @@ fn zero_u64() -> u64 { 0 } +fn show_prometheus_default() -> bool { + false +} + +fn show_progress_duration_default() -> u64 { + 60 +} + fn thread_count_default() -> u64 { 200 } @@ -159,6 +167,13 @@ pub struct Properties { pub field_length: u64, #[serde(default = "batch_count_default", rename = "batchcount")] pub batch_count: u64, + #[serde(default = "show_prometheus_default", rename = "showprometheus")] + pub show_prometheus: bool, + #[serde( + default = "show_progress_duration_default", + rename = "show_progress_duration_sec" + )] + pub show_progress_duration: u64, // read, update, insert, scan, read-modify-write #[serde(default = "read_proportion_default", rename = "readproportion")] diff --git a/ycsb-rs/workloads/workload_obkv.toml b/ycsb-rs/workloads/workload_obkv.toml index d98acde..ba0b7db 100644 --- a/ycsb-rs/workloads/workload_obkv.toml +++ b/ycsb-rs/workloads/workload_obkv.toml @@ -27,6 +27,12 @@ readallfields = true # The number of operation in one batch batchcount = 100 +# Show params from 
prometheus or not +showprometheus = false + +# Interval, in seconds, between progress reports +show_progress_duration_sec = 30 + insertproportion = 0 readproportion = 1.0 scanproportion = 0