Skip to content

Commit

Permalink
*: major refactor of thread implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
nerdroychan committed Aug 8, 2024
1 parent 5e87553 commit 5ad3730
Show file tree
Hide file tree
Showing 17 changed files with 129 additions and 204 deletions.
59 changes: 22 additions & 37 deletions src/bench.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,6 @@
//! post-process the output and make a smooth CDF plot out of it.

use crate::stores::{BenchKVMap, BenchKVMapOpt};
use crate::thread::{JoinHandle, Thread};
use crate::workload::{Workload, WorkloadOpt};
use crate::*;
use figment::providers::{Env, Format, Toml};
Expand Down Expand Up @@ -301,7 +300,7 @@ impl BenchmarkOpt {

/// The configuration of a benchmark, parsed from user's input.
#[derive(Debug, PartialEq)]
pub struct Benchmark {
pub(crate) struct Benchmark {
threads: usize,
repeat: usize,
qd: usize,
Expand Down Expand Up @@ -470,7 +469,7 @@ struct BenchmarkGroupOpt {

// {{{ bencher

pub fn init(text: &str) -> (BenchKVMap, Vec<Arc<Benchmark>>) {
pub(crate) fn init(text: &str) -> (BenchKVMap, Vec<Arc<Benchmark>>) {
let opt: BenchmarkGroupOpt = Figment::new()
.merge(Toml::string(text))
.merge(Env::raw())
Expand Down Expand Up @@ -779,11 +778,9 @@ impl RateLimiter {
}
}

fn bench_worker_regular(
map: Arc<Box<impl KVMap + ?Sized>>,
context: WorkerContext,
thread: impl Thread,
) {
fn bench_worker_regular(map: Arc<Box<dyn KVMap>>, context: WorkerContext) {
let thread = map.thread();

let WorkerContext {
benchmark,
since,
Expand Down Expand Up @@ -894,11 +891,9 @@ fn bench_worker_regular(
}
}

fn bench_worker_async(
map: Arc<Box<impl AsyncKVMap + ?Sized>>,
context: WorkerContext,
thread: impl Thread,
) {
fn bench_worker_async(map: Arc<Box<dyn AsyncKVMap>>, context: WorkerContext) {
let thread = map.thread();

let WorkerContext {
benchmark,
since,
Expand Down Expand Up @@ -1046,12 +1041,12 @@ fn bench_worker_async(
}

fn bench_phase_regular(
map: Arc<Box<impl KVMap + ?Sized>>,
map: Arc<Box<dyn KVMap>>,
benchmark: Arc<Benchmark>,
phase: usize,
since: Arc<Instant>,
thread: &impl Thread,
) {
let thread = map.thread();
let barrier = Arc::new(Barrier::new(benchmark.threads.try_into().unwrap()));
let measurements: Vec<Arc<Measurement>> = (0..benchmark.threads)
.map(|_| Arc::new(Measurement::new(benchmark.repeat)))
Expand All @@ -1070,10 +1065,9 @@ fn bench_phase_regular(
since: *since,
thread_info,
};
let worker_thread = thread.clone();
let handle = thread.spawn(move || {
bench_worker_regular(map, context, worker_thread);
});
let handle = thread.spawn(Box::new(move || {
bench_worker_regular(map, context);
}));
handles.push(handle);
}

Expand All @@ -1086,12 +1080,12 @@ fn bench_phase_regular(
}

fn bench_phase_async(
map: Arc<Box<impl AsyncKVMap + ?Sized>>,
map: Arc<Box<dyn AsyncKVMap>>,
benchmark: Arc<Benchmark>,
phase: usize,
since: Arc<Instant>,
thread: &impl Thread,
) {
let thread = map.thread();
let barrier = Arc::new(Barrier::new((benchmark.threads).try_into().unwrap()));
let measurements: Vec<Arc<Measurement>> = (0..benchmark.threads)
.map(|_| Arc::new(Measurement::new(benchmark.repeat)))
Expand All @@ -1110,10 +1104,9 @@ fn bench_phase_async(
since: *since,
thread_info,
};
let worker_thread = thread.clone();
let handle = thread.spawn(move || {
bench_worker_async(map, context, worker_thread);
});
let handle = thread.spawn(Box::new(move || {
bench_worker_async(map, context);
}));
handles.push(handle);
}

Expand All @@ -1127,30 +1120,22 @@ fn bench_phase_async(
/// The real benchmark function for [`KVMap`].
///
/// **You may not need to check this if it is OK to run benchmarks with [`std::thread`].**
pub fn bench_regular(
map: Arc<Box<impl KVMap + ?Sized>>,
phases: &Vec<Arc<Benchmark>>,
thread: impl Thread,
) {
pub(crate) fn bench_regular(map: Arc<Box<dyn KVMap>>, phases: &Vec<Arc<Benchmark>>) {
debug!("Running regular bencher");
let start = Arc::new(Instant::now());
for (i, p) in phases.iter().enumerate() {
bench_phase_regular(map.clone(), p.clone(), i, start.clone(), &thread);
bench_phase_regular(map.clone(), p.clone(), i, start.clone());
}
}

/// The real benchmark function for [`AsyncKVMap`].
///
/// **You may not need to check this if it is OK to run benchmarks with [`std::thread`].**
pub fn bench_async(
map: Arc<Box<impl AsyncKVMap + ?Sized>>,
phases: &Vec<Arc<Benchmark>>,
thread: impl Thread,
) {
pub(crate) fn bench_async(map: Arc<Box<dyn AsyncKVMap>>, phases: &Vec<Arc<Benchmark>>) {
debug!("Running async bencher");
let start = Arc::new(Instant::now());
for (i, p) in phases.iter().enumerate() {
bench_phase_async(map.clone(), p.clone(), i, start.clone(), &thread);
bench_phase_async(map.clone(), p.clone(), i, start.clone());
}
}

Expand Down
11 changes: 2 additions & 9 deletions src/cmdline.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::stores::{BenchKVMap, Registry};
use crate::stores::Registry;
use clap::ValueHint::FilePath;
use clap::{Args, Parser, Subcommand};
use log::debug;
Expand Down Expand Up @@ -83,14 +83,7 @@ fn server_cli(args: &ServerArgs) {
})
.expect("Error setting Ctrl-C handler for server");

match map {
BenchKVMap::Regular(map) => {
map.server(&host, &port, nr_workers, stop_rx, grace_tx);
}
BenchKVMap::Async(map) => {
map.server(&host, &port, nr_workers, stop_rx, grace_tx);
}
}
map.server(&host, &port, nr_workers, stop_rx, grace_tx);

assert!(grace_rx.recv().is_ok());
debug!("All server threads have been shut down gracefully, exit");
Expand Down
52 changes: 4 additions & 48 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
use serde::{Deserialize, Serialize};
use std::cell::RefCell;
use std::rc::Rc;
use std::sync::mpsc::{Receiver, Sender};
use std::sync::Arc;

/// A synchronous, thread-safe key-value store.
Expand All @@ -44,34 +43,8 @@ pub trait KVMap: Send + Sync + 'static {
/// For most stores, this can just be done using an Arc.
fn handle(&self) -> Box<dyn KVMapHandle>;

/// The main bench method.
///
/// Users usually don't need to manually implement this method unless the implementer needs
/// custom thread spawn-join functions. If one would like to do so, it is needed to explicitly
/// declare a new [`thread::Thread`] object and pass it to [`bench::bench_regular`].
fn bench(self: Box<Self>, phases: &Vec<Arc<crate::bench::Benchmark>>) {
let map = Arc::new(self);
let thread = crate::thread::DefaultThread;
crate::bench::bench_regular(map, phases, thread);
}

/// Start the main loop of KV server while using this map as the backend.
///
/// There is no need to manually implement this method unless the implementer needs custom
/// thread spawn-join functions. If one would like to manually implement this method, it is
/// needed to explicitly declare a new [`thread::Thread`] object and pass it to
/// [`server::server_regular`].
fn server(
self: Box<Self>,
host: &str,
port: &str,
nr_workers: usize,
stop_rx: Receiver<()>,
grace_tx: Sender<()>,
) {
let map = Arc::new(self);
let thread = crate::thread::DefaultThread;
crate::server::server_regular(map, host, port, nr_workers, stop_rx, grace_tx, thread);
fn thread(&self) -> Box<dyn crate::thread::Thread> {
Box::new(self::thread::DefaultThread)
}
}

Expand Down Expand Up @@ -150,25 +123,8 @@ pub trait AsyncKVMap: Sync + Send + 'static {
/// corresponds to a shared `responder` that implements [`AsyncResponder`].
fn handle(&self, responder: Rc<dyn AsyncResponder>) -> Box<dyn AsyncKVMapHandle>;

/// Similar to [`KVMap::bench`], but calls [`bench::bench_async`] instead.
fn bench(self: Box<Self>, phases: &Vec<Arc<crate::bench::Benchmark>>) {
let map = Arc::new(self);
let thread = crate::thread::DefaultThread;
crate::bench::bench_async(map, phases, thread);
}

/// Similar to [`KVMap::server`], but calls [`server::server_async`] instead.
fn server(
self: Box<Self>,
host: &str,
port: &str,
nr_workers: usize,
stop_rx: Receiver<()>,
grace_tx: Sender<()>,
) {
let map = Arc::new(self);
let thread = crate::thread::DefaultThread;
crate::server::server_async(map, host, port, nr_workers, stop_rx, grace_tx, thread);
fn thread(&self) -> Box<dyn crate::thread::Thread> {
Box::new(self::thread::DefaultThread)
}
}

Expand Down
Loading

0 comments on commit 5ad3730

Please sign in to comment.