Skip to content

Commit

Permalink
[Feat] YCSB stat (#91)
Browse files Browse the repository at this point in the history
* [Feat] ycsb stat

* [Fix] cargo sort

* [Fix] format

* [Fix] review

* [Fix] review
  • Loading branch information
IHEII authored Jan 19, 2024
1 parent fb17059 commit 4c09a73
Show file tree
Hide file tree
Showing 5 changed files with 100 additions and 8 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions ycsb-rs/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,5 @@ serde = { version = "1.0.130", features = ["derive"] }
sql-builder = "3.1"
sqlite = "0.26.0"
structopt = "0.3.23"
tokio = { workspace = true }
toml = { workspace = true }
85 changes: 77 additions & 8 deletions ycsb-rs/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::sync::atomic::{AtomicUsize, Ordering};
use std::time::Duration;
use std::{
cell::RefCell,
fs,
Expand All @@ -12,6 +14,7 @@ use obkv::dump_metrics;
use properties::Properties;
use rand::{rngs::SmallRng, SeedableRng};
use structopt::StructOpt;
use tokio::time as TokioTime;
use workload::CoreWorkload;

use crate::{
Expand Down Expand Up @@ -53,9 +56,15 @@ fn run(wl: Arc<CoreWorkload>, db: Rc<dyn DB>, rng: Rc<RefCell<SmallRng>>, operat
}
}

async fn load_ob(wl: Arc<CoreWorkload>, db: Arc<OBKVClient>, operation_count: usize) {
async fn load_ob(
wl: Arc<CoreWorkload>,
db: Arc<OBKVClient>,
operation_count: usize,
counter: Arc<AtomicUsize>,
) {
for _ in 0..operation_count {
wl.ob_insert(db.clone()).await;
counter.fetch_add(1, Ordering::Relaxed);
}
}

Expand All @@ -64,9 +73,11 @@ async fn run_ob(
db: Arc<OBKVClient>,
rng: Arc<Mutex<SmallRng>>,
operation_count: usize,
counter: Arc<AtomicUsize>,
) {
for _ in 0..operation_count {
wl.ob_transaction(rng.clone(), db.clone()).await;
counter.fetch_add(1, Ordering::Relaxed);
}
}

Expand All @@ -88,12 +99,22 @@ fn main() -> Result<()> {
}

let database = opt.database.clone();
let total_ops_count = props.operation_count;
let thread_operation_count = props.operation_count as usize / opt.threads;
let actual_client_count = opt.threads / props.obkv_client_reuse;

// verify count of operations
assert_eq!(
props.operation_count,
(actual_client_count * thread_operation_count * props.obkv_client_reuse) as u64,
" 'operationcount' should be an exact multiple of the 'threads', and 'threads' should be an exact multiple of the 'obkv_client_reuse'"
);

for cmd in opt.commands {
let start = Instant::now();
let mut tasks = vec![];
let mut threads = vec![];
let mut db_counters = vec![];
println!(
"Database: {database}, Command: {cmd}, Counts Per Threads: {thread_operation_count}"
);
Expand All @@ -103,25 +124,69 @@ fn main() -> Result<()> {
);
if database.eq_ignore_ascii_case("obkv") {
let runtimes = runtime::build_ycsb_runtimes(props.clone());
for _client_idx in 0..actual_client_count {
for _ in 0..actual_client_count {
let database = database.clone();
let db = db::create_ob(&database, config.clone()).unwrap();
// count the ops per client
let counter = Arc::new(AtomicUsize::new(0));
db_counters.push(counter.clone());
for _ in 0..props.obkv_client_reuse {
let db = db.clone();
let wl = wl.clone();
let cmd = cmd.clone();
let runtime = runtimes.default_runtime.clone();
let counter_clone = counter.clone();
tasks.push(runtime.spawn(async move {
let rng = Arc::new(Mutex::new(SmallRng::from_entropy()));
db.init().unwrap();
match &cmd[..] {
"load" => load_ob(wl.clone(), db, thread_operation_count).await,
"run" => run_ob(wl.clone(), db, rng, thread_operation_count).await,
cmd => panic!("invalid command: {cmd}"),
match cmd.as_str() {
"load" => {
load_ob(wl.clone(), db, thread_operation_count, counter_clone).await
}
"run" => {
run_ob(wl.clone(), db, rng, thread_operation_count, counter_clone)
.await
}
_ => panic!("invalid command: {cmd}"),
};
}));
}
}
// show progress
let stat_duration_sec = props.show_progress_duration;
tasks.push(runtimes.default_runtime.spawn(async move {
let mut interval = TokioTime::interval(Duration::from_secs(stat_duration_sec));
let mut prev_count = 0;
loop {
interval.tick().await;
let completed_operations: usize = db_counters
.iter()
.map(|arc| arc.load(Ordering::Relaxed))
.sum();
let ops_per_second =
(completed_operations - prev_count) as f64 / stat_duration_sec as f64;
prev_count = completed_operations;
// estimate remaining time
let remaining_operations = total_ops_count - completed_operations as u64;
let estimated_remaining_time = if ops_per_second > 0.0 {
Duration::from_secs_f64(remaining_operations as f64 / ops_per_second)
} else {
Duration::from_secs(0)
};
println!("\n-------------------------------");
println!(
"Throughput(ops/sec) in previous period: {:.2}",
ops_per_second
);
println!("Runtime: {:?}", start.elapsed());
println!("Estimate remaining time: {:?}", estimated_remaining_time);

if completed_operations >= total_ops_count as usize {
println!("All is done");
break;
}
}
}));
runtimes.block_runtime.block_on(async move {
for task in tasks {
task.await.expect("task failed");
Expand All @@ -142,7 +207,7 @@ fn main() -> Result<()> {
match &cmd[..] {
"load" => load(wl.clone(), db, thread_operation_count),
"run" => run(wl.clone(), db, rng, thread_operation_count),
cmd => panic!("invalid command: {cmd}"),
_ => panic!("invalid command: {cmd}"),
};
}));
}
Expand All @@ -151,13 +216,17 @@ fn main() -> Result<()> {
}
}
let runtime = start.elapsed().as_millis();
println!("****************************");
println!("[OVERALL], ThreadCount, {}", opt.threads);
println!("[OVERALL], RunTime(ms), {runtime}");
let throughput = props.operation_count as f64 / (runtime as f64 / 1000.0);
println!("[OVERALL], Throughput(ops/sec), {throughput}\n");
println!("****************************");
}

println!("{}", dump_metrics().expect("dump metrics failed"));
if props.show_prometheus {
println!("{}", dump_metrics().expect("dump metrics failed"));
}

Ok(())
}
15 changes: 15 additions & 0 deletions ycsb-rs/src/properties.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,14 @@ fn zero_u64() -> u64 {
0
}

fn show_prometheus_default() -> bool {
false
}

fn show_progress_duration_default() -> u64 {
60
}

fn thread_count_default() -> u64 {
200
}
Expand Down Expand Up @@ -159,6 +167,13 @@ pub struct Properties {
pub field_length: u64,
#[serde(default = "batch_count_default", rename = "batchcount")]
pub batch_count: u64,
#[serde(default = "show_prometheus_default", rename = "showprometheus")]
pub show_prometheus: bool,
#[serde(
default = "show_progress_duration_default",
rename = "show_progress_duration_sec"
)]
pub show_progress_duration: u64,

// read, update, insert, scan, read-modify-write
#[serde(default = "read_proportion_default", rename = "readproportion")]
Expand Down
6 changes: 6 additions & 0 deletions ycsb-rs/workloads/workload_obkv.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,12 @@ readallfields = true
# The number of operation in one batch
batchcount = 100

# Show params from prometheus or not
showprometheus = false

# Duration of showing progress
show_progress_duration_sec = 30

insertproportion = 0
readproportion = 1.0
scanproportion = 0
Expand Down

0 comments on commit 4c09a73

Please sign in to comment.