From b1b034e330caf41b1921f69b16f9a8bf1d81aa04 Mon Sep 17 00:00:00 2001 From: Chen Chen Date: Thu, 8 Aug 2024 01:50:52 -0500 Subject: [PATCH] *: major refactor of thread implementation --- src/bench.rs | 55 +++++++------------ src/cmdline.rs | 11 +--- src/lib.rs | 52 ++---------------- src/server.rs | 117 ++++++++++++++++------------------------- src/stores.rs | 35 +++++++++--- src/stores/btreemap.rs | 4 +- src/stores/chashmap.rs | 2 +- src/stores/contrie.rs | 2 +- src/stores/dashmap.rs | 2 +- src/stores/flurry.rs | 2 +- src/stores/hashmap.rs | 4 +- src/stores/null.rs | 4 +- src/stores/papaya.rs | 2 +- src/stores/remote.rs | 2 +- src/stores/rocksdb.rs | 2 +- src/stores/scc.rs | 2 +- src/thread.rs | 24 ++++----- 17 files changed, 124 insertions(+), 198 deletions(-) diff --git a/src/bench.rs b/src/bench.rs index 8bf8170..68e7e99 100644 --- a/src/bench.rs +++ b/src/bench.rs @@ -130,7 +130,6 @@ //! post-process the output and make a smooth CDF plot out of it. use crate::stores::{BenchKVMap, BenchKVMapOpt}; -use crate::thread::{JoinHandle, Thread}; use crate::workload::{Workload, WorkloadOpt}; use crate::*; use figment::providers::{Env, Format, Toml}; @@ -301,7 +300,7 @@ impl BenchmarkOpt { /// The configuration of a benchmark, parsed from user's input. #[derive(Debug, PartialEq)] -pub struct Benchmark { +pub(crate) struct Benchmark { threads: usize, repeat: usize, qd: usize, @@ -470,7 +469,7 @@ struct BenchmarkGroupOpt { // {{{ bencher -pub fn init(text: &str) -> (BenchKVMap, Vec>) { +pub(crate) fn init(text: &str) -> (BenchKVMap, Vec>) { let opt: BenchmarkGroupOpt = Figment::new() .merge(Toml::string(text)) .merge(Env::raw()) @@ -779,11 +778,9 @@ impl RateLimiter { } } -fn bench_worker_regular( - map: Arc>, - context: WorkerContext, - thread: impl Thread, -) { +fn bench_worker_regular(map: Arc>, context: WorkerContext) { + let thread = map.thread(); + let WorkerContext { benchmark, since, @@ -894,11 +891,9 @@ fn bench_worker_regular( } } -fn bench_worker_async( - map: Arc>, - context: WorkerContext, - thread: impl Thread, -) { +fn bench_worker_async(map: Arc>, context: WorkerContext) { + let thread = map.thread(); + let WorkerContext { benchmark, since, @@ -1050,8 +1045,8 @@ fn bench_phase_regular( benchmark: Arc, phase: usize, since: Arc, - thread: &impl Thread, ) { + let thread = map.thread(); let barrier = Arc::new(Barrier::new(benchmark.threads.try_into().unwrap())); let measurements: Vec> = (0..benchmark.threads) .map(|_| Arc::new(Measurement::new(benchmark.repeat))) @@ -1070,10 +1065,9 @@ fn bench_phase_regular( since: *since, thread_info, }; - let worker_thread = thread.clone(); - let handle = thread.spawn(move || { - bench_worker_regular(map, context, worker_thread); - }); + let handle = thread.spawn(Box::new(move || { + bench_worker_regular(map, context); + })); handles.push(handle); } @@ -1090,8 +1084,8 @@ fn bench_phase_async( benchmark: Arc, phase: usize, since: Arc, - thread: &impl Thread, ) { + let thread = map.thread(); let barrier = Arc::new(Barrier::new((benchmark.threads).try_into().unwrap())); let measurements: Vec> = (0..benchmark.threads) .map(|_| Arc::new(Measurement::new(benchmark.repeat))) @@ -1110,10 +1104,9 @@ fn bench_phase_async( since: *since, thread_info, }; - let worker_thread = thread.clone(); - let handle = thread.spawn(move || { - bench_worker_async(map, context, worker_thread); - }); + let handle = thread.spawn(Box::new(move || { + bench_worker_async(map, context); + })); handles.push(handle); } @@ -1127,30 +1120,22 @@ fn bench_phase_async( /// The real benchmark function for [`KVMap`]. /// /// **You may not need to check this if it is OK to run benchmarks with [`std::thread`].** -pub fn bench_regular( - map: Arc>, - phases: &Vec>, - thread: impl Thread, -) { +pub(crate) fn bench_regular(map: Arc>, phases: &Vec>) { debug!("Running regular bencher"); let start = Arc::new(Instant::now()); for (i, p) in phases.iter().enumerate() { - bench_phase_regular(map.clone(), p.clone(), i, start.clone(), &thread); + bench_phase_regular(map.clone(), p.clone(), i, start.clone()); } } /// The real benchmark function for [`AsyncKVMap`]. /// /// **You may not need to check this if it is OK to run benchmarks with [`std::thread`].** -pub fn bench_async( - map: Arc>, - phases: &Vec>, - thread: impl Thread, -) { +pub(crate) fn bench_async(map: Arc>, phases: &Vec>) { debug!("Running async bencher"); let start = Arc::new(Instant::now()); for (i, p) in phases.iter().enumerate() { - bench_phase_async(map.clone(), p.clone(), i, start.clone(), &thread); + bench_phase_async(map.clone(), p.clone(), i, start.clone()); } } diff --git a/src/cmdline.rs b/src/cmdline.rs index 0d84871..3cf7f36 100644 --- a/src/cmdline.rs +++ b/src/cmdline.rs @@ -1,4 +1,4 @@ -use crate::stores::{BenchKVMap, Registry}; +use crate::stores::Registry; use clap::ValueHint::FilePath; use clap::{Args, Parser, Subcommand}; use log::debug; @@ -83,14 +83,7 @@ fn server_cli(args: &ServerArgs) { }) .expect("Error setting Ctrl-C handler for server"); - match map { - BenchKVMap::Regular(map) => { - map.server(&host, &port, nr_workers, stop_rx, grace_tx); - } - BenchKVMap::Async(map) => { - map.server(&host, &port, nr_workers, stop_rx, grace_tx); - } - } + map.server(&host, &port, nr_workers, stop_rx, grace_tx); assert!(grace_rx.recv().is_ok()); debug!("All server threads have been shut down gracefully, exit"); diff --git a/src/lib.rs b/src/lib.rs index a3d7a57..8f2587a 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -31,7 +31,6 @@ use serde::{Deserialize, Serialize}; use std::cell::RefCell; use std::rc::Rc; -use std::sync::mpsc::{Receiver, Sender}; use std::sync::Arc; /// A synchronous, thread-safe key-value store. @@ -44,34 +43,8 @@ pub trait KVMap: Send + Sync + 'static { /// For most stores, this can just be done using an Arc. fn handle(&self) -> Box; - /// The main bench method. - /// - /// Users usually don't need to manually implement this method unless the implementer needs - /// custom thread spawn-join functions. If one would like to do so, it is needed to explicitly - /// declare a new [`thread::Thread`] object and pass it to [`bench::bench_regular`]. - fn bench(self: Box, phases: &Vec>) { - let map = Arc::new(self); - let thread = crate::thread::DefaultThread; - crate::bench::bench_regular(map, phases, thread); - } - - /// Start the main loop of KV server while using this map as the backend. - /// - /// There is no need to manually implement this method unless the implementer needs custom - /// thread spawn-join functions. If one would like to manually implement this method, it is - /// needed to explicitly declare a new [`thread::Thread`] object and pass it to - /// [`server::server_regular`]. - fn server( - self: Box, - host: &str, - port: &str, - nr_workers: usize, - stop_rx: Receiver<()>, - grace_tx: Sender<()>, - ) { - let map = Arc::new(self); - let thread = crate::thread::DefaultThread; - crate::server::server_regular(map, host, port, nr_workers, stop_rx, grace_tx, thread); + fn thread(&self) -> Box { + Box::new(self::thread::DefaultThread) } } @@ -150,25 +123,8 @@ pub trait AsyncKVMap: Sync + Send + 'static { /// corresponds to a shared `responder` that implements [`AsyncResponder`]. fn handle(&self, responder: Rc) -> Box; - /// Similar to [`KVMap::bench`], but calls [`bench::bench_async`] instead. - fn bench(self: Box, phases: &Vec>) { - let map = Arc::new(self); - let thread = crate::thread::DefaultThread; - crate::bench::bench_async(map, phases, thread); - } - - /// Similar to [`KVMap::server`], but calls [`server::server_async`] instead. - fn server( - self: Box, - host: &str, - port: &str, - nr_workers: usize, - stop_rx: Receiver<()>, - grace_tx: Sender<()>, - ) { - let map = Arc::new(self); - let thread = crate::thread::DefaultThread; - crate::server::server_async(map, host, port, nr_workers, stop_rx, grace_tx, thread); + fn thread(&self) -> Box { + Box::new(self::thread::DefaultThread) } } diff --git a/src/server.rs b/src/server.rs index 2f5ede1..60d35fc 100644 --- a/src/server.rs +++ b/src/server.rs @@ -277,7 +277,7 @@ fn server_worker_regular_main( events: &mut Events, smap: &mut StreamMap, handle: &mut Box, - thread: &impl Thread, + thread: &Box, ) { for (_, connection) in smap.iter_mut() { assert!(connection.writer().flush().is_ok()); @@ -307,7 +307,7 @@ fn server_worker_async_main( poll: &mut Poll, events: &mut Events, smap: &mut StreamMap, - thread: &impl Thread, + thread: &Box, ) { for (_, connection) in smap.iter_mut() { let (writer, handle) = connection.handle(); @@ -385,8 +385,8 @@ fn server_worker_regular( txs: Vec>, nr_workers: usize, counter: Arc, - thread: impl Thread, ) { + let thread = map.thread(); thread.pin(worker_id); let (mut events, mut smap, mut poll) = server_worker_common(); @@ -433,8 +433,8 @@ fn server_worker_async( txs: Vec>, nr_workers: usize, counter: Arc, - thread: impl Thread, ) { + let thread = map.thread(); thread.pin(worker_id); let (mut events, mut smap, mut poll) = server_worker_common(); @@ -496,8 +496,8 @@ fn server_mainloop( stop_rx: Receiver<()>, grace_tx: Sender<()>, senders: Vec>, - mut handles: Vec, - thread: impl Thread, + mut handles: Vec>, + thread: Box, ) { loop { if let Ok(_) = stop_rx.try_recv() { @@ -515,18 +515,16 @@ fn server_mainloop( assert!(grace_tx.send(()).is_ok()); } -/// The real server function for [`KVMap`]. -/// -/// **You may not need to check this if it is OK to run benchmarks with [`std::thread`].** -pub fn server_regular( +pub(crate) fn server_regular( map: Arc>, host: &str, port: &str, nr_workers: usize, stop_rx: Receiver<()>, grace_tx: Sender<()>, - thread: impl Thread, ) { + let thread = map.thread(); + let (listener, senders, mut receivers, counter) = server_common(host, port, nr_workers); let mut handles = Vec::new(); @@ -537,37 +535,25 @@ pub fn server_regular( let rx = receivers.pop().unwrap(); // guaranteed to succeed let nr_workers = nr_workers.clone(); let counter = counter.clone(); - let worker_thread = thread.clone(); - let handle = thread.spawn(move || { - server_worker_regular( - map, - i, - listener, - rx, - txs, - nr_workers, - counter, - worker_thread, - ); - }); + let handle = thread.spawn(Box::new(move || { + server_worker_regular(map, i, listener, rx, txs, nr_workers, counter); + })); handles.push(handle); } server_mainloop(stop_rx, grace_tx, senders, handles, thread); } -/// The real server function for [`AsyncKVMap`]. -/// -/// **You may not need to check this if it is OK to run benchmarks with [`std::thread`].** -pub fn server_async( +pub(crate) fn server_async( map: Arc>, host: &str, port: &str, nr_workers: usize, stop_rx: Receiver<()>, grace_tx: Sender<()>, - thread: impl Thread, ) { + let thread = map.thread(); + let (listener, senders, mut receivers, counter) = server_common(host, port, nr_workers); let mut handles = Vec::new(); @@ -578,19 +564,9 @@ pub fn server_async( let rx = receivers.pop().unwrap(); // guaranteed to succeed let nr_workers = nr_workers.clone(); let counter = counter.clone(); - let worker_thread = thread.clone(); - let handle = thread.spawn(move || { - server_worker_async( - map, - i, - listener, - rx, - txs, - nr_workers, - counter, - worker_thread, - ); - }); + let handle = thread.spawn(Box::new(move || { + server_worker_async(map, i, listener, rx, txs, nr_workers, counter); + })); handles.push(handle); } @@ -718,7 +694,7 @@ struct ServerMapOpt { map: BenchKVMapOpt, } -pub fn init(text: &str) -> BenchKVMap { +pub(crate) fn init(text: &str) -> BenchKVMap { let opt: ServerMapOpt = Figment::new() .merge(Toml::string(&text)) .merge(Env::raw()) @@ -754,14 +730,9 @@ mod tests { let (host, port) = (host.to_string(), port.to_string()); let (stop_tx, stop_rx) = channel(); let (grace_tx, grace_rx) = channel(); - let _ = std::thread::spawn(move || match map { - BenchKVMap::Regular(map) => { - map.server(&host, &port, nr_workers, stop_rx, grace_tx); - } - BenchKVMap::Async(map) => { - map.server(&host, &port, nr_workers, stop_rx, grace_tx); - } - }); + let _ = std::thread::spawn(Box::new(move || { + map.server(&host, &port, nr_workers, stop_rx, grace_tx); + })); std::thread::sleep(Duration::from_millis(1000)); (stop_tx, grace_rx) } @@ -789,68 +760,68 @@ mod tests { #[test] fn simple_mutex_hashmap() { let opt = hashmap::MutexHashMapOpt { shards: 512 }; - let map = BenchKVMap::Regular(Box::new(hashmap::MutexHashMap::new(&opt))); + let map = BenchKVMap::Regular(Arc::new(Box::new(hashmap::MutexHashMap::new(&opt)))); simple(map); } #[test] fn simple_rwlock_hashmap() { let opt = hashmap::RwLockHashMapOpt { shards: 512 }; - let map = BenchKVMap::Regular(Box::new(hashmap::RwLockHashMap::new(&opt))); + let map = BenchKVMap::Regular(Arc::new(Box::new(hashmap::RwLockHashMap::new(&opt)))); simple(map); } #[test] #[cfg(feature = "dashmap")] fn simple_dashmap() { - let map = BenchKVMap::Regular(Box::new(dashmap::DashMap::new())); + let map = BenchKVMap::Regular(Arc::new(Box::new(dashmap::DashMap::new()))); simple(map); } #[test] #[cfg(feature = "contrie")] fn simple_contrie() { - let map = BenchKVMap::Regular(Box::new(contrie::Contrie::new())); + let map = BenchKVMap::Regular(Arc::new(Box::new(contrie::Contrie::new()))); simple(map); } #[test] #[cfg(feature = "chashmap")] fn simple_chashmap() { - let map = BenchKVMap::Regular(Box::new(chashmap::CHashMap::new())); + let map = BenchKVMap::Regular(Arc::new(Box::new(chashmap::CHashMap::new()))); simple(map); } #[test] #[cfg(feature = "scc")] fn simple_scchashmap() { - let map = BenchKVMap::Regular(Box::new(scc::SccHashMap::new())); + let map = BenchKVMap::Regular(Arc::new(Box::new(scc::SccHashMap::new()))); simple(map); } #[test] #[cfg(feature = "flurry")] fn simple_flurry() { - let map = BenchKVMap::Regular(Box::new(flurry::Flurry::new())); + let map = BenchKVMap::Regular(Arc::new(Box::new(flurry::Flurry::new()))); simple(map); } #[test] #[cfg(feature = "papaya")] fn simple_papaya() { - let map = BenchKVMap::Regular(Box::new(papaya::Papaya::new())); + let map = BenchKVMap::Regular(Arc::new(Box::new(papaya::Papaya::new()))); simple(map); } #[test] fn simple_mutex_btreemap() { - let map = BenchKVMap::Regular(Box::new(btreemap::MutexBTreeMap::new())); + let map = BenchKVMap::Regular(Arc::new(Box::new(btreemap::MutexBTreeMap::new()))); simple(map); } #[test] fn simple_rwlock_btreemap() { - let map = BenchKVMap::Regular(Box::new(btreemap::RwLockBTreeMap::new())); + let map = BenchKVMap::Regular(Arc::new(Box::new(btreemap::RwLockBTreeMap::new()))); simple(map); } @@ -861,7 +832,7 @@ mod tests { let opt = rocksdb::RocksDBOpt { path: tmp_dir.path().to_str().unwrap().to_string(), }; - let map = BenchKVMap::Regular(Box::new(rocksdb::RocksDB::new(&opt))); + let map = BenchKVMap::Regular(Arc::new(Box::new(rocksdb::RocksDB::new(&opt)))); simple(map); } @@ -954,68 +925,68 @@ mod tests { #[test] fn batch_mutex_hashmap() { let opt = hashmap::MutexHashMapOpt { shards: 512 }; - let map = BenchKVMap::Regular(Box::new(hashmap::MutexHashMap::new(&opt))); + let map = BenchKVMap::Regular(Arc::new(Box::new(hashmap::MutexHashMap::new(&opt)))); batch(map); } #[test] fn batch_rwlock_hashmap() { let opt = hashmap::RwLockHashMapOpt { shards: 512 }; - let map = BenchKVMap::Regular(Box::new(hashmap::RwLockHashMap::new(&opt))); + let map = BenchKVMap::Regular(Arc::new(Box::new(hashmap::RwLockHashMap::new(&opt)))); batch(map); } #[test] #[cfg(feature = "dashmap")] fn batch_dashmap() { - let map = BenchKVMap::Regular(Box::new(dashmap::DashMap::new())); + let map = BenchKVMap::Regular(Arc::new(Box::new(dashmap::DashMap::new()))); batch(map); } #[test] #[cfg(feature = "contrie")] fn batch_contrie() { - let map = BenchKVMap::Regular(Box::new(contrie::Contrie::new())); + let map = BenchKVMap::Regular(Arc::new(Box::new(contrie::Contrie::new()))); batch(map); } #[test] #[cfg(feature = "chashmap")] fn batch_chashmap() { - let map = BenchKVMap::Regular(Box::new(chashmap::CHashMap::new())); + let map = BenchKVMap::Regular(Arc::new(Box::new(chashmap::CHashMap::new()))); batch(map); } #[test] #[cfg(feature = "scc")] fn batch_scchashmap() { - let map = BenchKVMap::Regular(Box::new(scc::SccHashMap::new())); + let map = BenchKVMap::Regular(Arc::new(Box::new(scc::SccHashMap::new()))); batch(map); } #[test] #[cfg(feature = "flurry")] fn batch_flurry() { - let map = BenchKVMap::Regular(Box::new(flurry::Flurry::new())); + let map = BenchKVMap::Regular(Arc::new(Box::new(flurry::Flurry::new()))); batch(map); } #[test] #[cfg(feature = "papaya")] fn batch_papaya() { - let map = BenchKVMap::Regular(Box::new(papaya::Papaya::new())); + let map = BenchKVMap::Regular(Arc::new(Box::new(papaya::Papaya::new()))); batch(map); } #[test] fn batch_mutex_btreemap() { - let map = BenchKVMap::Regular(Box::new(btreemap::MutexBTreeMap::new())); + let map = BenchKVMap::Regular(Arc::new(Box::new(btreemap::MutexBTreeMap::new()))); batch(map); } #[test] fn batch_rwlock_btreemap() { - let map = BenchKVMap::Regular(Box::new(btreemap::RwLockBTreeMap::new())); + let map = BenchKVMap::Regular(Arc::new(Box::new(btreemap::RwLockBTreeMap::new()))); batch(map); } @@ -1026,7 +997,7 @@ mod tests { let opt = rocksdb::RocksDBOpt { path: tmp_dir.path().to_str().unwrap().to_string(), }; - let map = BenchKVMap::Regular(Box::new(rocksdb::RocksDB::new(&opt))); + let map = BenchKVMap::Regular(Arc::new(Box::new(rocksdb::RocksDB::new(&opt)))); batch(map); } } diff --git a/src/stores.rs b/src/stores.rs index ae8e057..96da556 100644 --- a/src/stores.rs +++ b/src/stores.rs @@ -36,30 +36,51 @@ //! //! The source code of all built-in stores provide good examples on this process. -use crate::bench::Benchmark; +use crate::bench::{bench_async, bench_regular, Benchmark}; +use crate::server::{server_async, server_regular}; +use crate::thread::Thread; use crate::*; use hashbrown::HashMap; use log::debug; +use std::sync::mpsc::{Receiver, Sender}; +use std::sync::Arc; use toml::Table; /// A unified enum for a created key-value store that is ready to run. pub enum BenchKVMap { - Regular(Box), - Async(Box), + Regular(Arc>), + Async(Arc>), } impl BenchKVMap { - /// Wraps the real `bench` function of the store. - pub fn bench(self, phases: &Vec>) { + pub(crate) fn bench(&self, phases: &Vec>) { match self { BenchKVMap::Regular(map) => { - KVMap::bench(map, phases); + bench_regular(map.clone(), phases); } BenchKVMap::Async(map) => { - AsyncKVMap::bench(map, phases); + bench_async(map.clone(), phases); } }; } + + pub(crate) fn server( + &self, + host: &str, + port: &str, + nr_workers: usize, + stop_rx: Receiver<()>, + grace_tx: Sender<()>, + ) { + match self { + BenchKVMap::Regular(map) => { + server_regular(map.clone(), host, port, nr_workers, stop_rx, grace_tx); + } + BenchKVMap::Async(map) => { + server_async(map.clone(), host, port, nr_workers, stop_rx, grace_tx); + } + } + } } /// The centralized registry that maps the name of newly added key-value store to its constructor diff --git a/src/stores/btreemap.rs b/src/stores/btreemap.rs index 828c5e1..e8481db 100644 --- a/src/stores/btreemap.rs +++ b/src/stores/btreemap.rs @@ -35,7 +35,7 @@ impl MutexBTreeMap { } pub fn new_benchkvmap(_opt: &toml::Table) -> BenchKVMap { - BenchKVMap::Regular(Box::new(Self::new())) + BenchKVMap::Regular(Arc::new(Box::new(Self::new()))) } } @@ -82,7 +82,7 @@ impl RwLockBTreeMap { } pub fn new_benchkvmap(_opt: &toml::Table) -> BenchKVMap { - BenchKVMap::Regular(Box::new(Self::new())) + BenchKVMap::Regular(Arc::new(Box::new(Self::new()))) } } diff --git a/src/stores/chashmap.rs b/src/stores/chashmap.rs index fbebac5..7c6613a 100644 --- a/src/stores/chashmap.rs +++ b/src/stores/chashmap.rs @@ -20,7 +20,7 @@ impl CHashMap { } pub fn new_benchkvmap(_opt: &toml::Table) -> BenchKVMap { - BenchKVMap::Regular(Box::new(Self::new())) + BenchKVMap::Regular(Arc::new(Box::new(Self::new()))) } } diff --git a/src/stores/contrie.rs b/src/stores/contrie.rs index c783e1b..72f94d4 100644 --- a/src/stores/contrie.rs +++ b/src/stores/contrie.rs @@ -21,7 +21,7 @@ impl Contrie { } pub fn new_benchkvmap(_opt: &toml::Table) -> BenchKVMap { - BenchKVMap::Regular(Box::new(Self::new())) + BenchKVMap::Regular(Arc::new(Box::new(Self::new()))) } } diff --git a/src/stores/dashmap.rs b/src/stores/dashmap.rs index a05cddc..46f6b14 100644 --- a/src/stores/dashmap.rs +++ b/src/stores/dashmap.rs @@ -21,7 +21,7 @@ impl DashMap { } pub fn new_benchkvmap(_opt: &toml::Table) -> BenchKVMap { - BenchKVMap::Regular(Box::new(Self::new())) + BenchKVMap::Regular(Arc::new(Box::new(Self::new()))) } } diff --git a/src/stores/flurry.rs b/src/stores/flurry.rs index 6a591ba..59fd396 100644 --- a/src/stores/flurry.rs +++ b/src/stores/flurry.rs @@ -21,7 +21,7 @@ impl Flurry { } pub fn new_benchkvmap(_opt: &toml::Table) -> BenchKVMap { - BenchKVMap::Regular(Box::new(Self::new())) + BenchKVMap::Regular(Arc::new(Box::new(Self::new()))) } } diff --git a/src/stores/hashmap.rs b/src/stores/hashmap.rs index 9f52e70..a93053f 100644 --- a/src/stores/hashmap.rs +++ b/src/stores/hashmap.rs @@ -71,7 +71,7 @@ impl MutexHashMap { pub fn new_benchkvmap(opt: &toml::Table) -> BenchKVMap { let opt: MutexHashMapOpt = opt.clone().try_into().unwrap(); - BenchKVMap::Regular(Box::new(Self::new(&opt))) + BenchKVMap::Regular(Arc::new(Box::new(Self::new(&opt)))) } } @@ -137,7 +137,7 @@ impl RwLockHashMap { pub fn new_benchkvmap(opt: &toml::Table) -> BenchKVMap { let opt: RwLockHashMapOpt = opt.clone().try_into().unwrap(); - BenchKVMap::Regular(Box::new(Self::new(&opt))) + BenchKVMap::Regular(Arc::new(Box::new(Self::new(&opt)))) } } diff --git a/src/stores/null.rs b/src/stores/null.rs index 95dba3e..3923080 100644 --- a/src/stores/null.rs +++ b/src/stores/null.rs @@ -32,11 +32,11 @@ impl NullMap { } pub fn new_benchkvmap(_opt: &toml::Table) -> BenchKVMap { - BenchKVMap::Regular(Box::new(Self::new())) + BenchKVMap::Regular(Arc::new(Box::new(Self::new()))) } pub fn new_benchkvmap_async(_opt: &toml::Table) -> BenchKVMap { - BenchKVMap::Async(Box::new(Self::new())) + BenchKVMap::Async(Arc::new(Box::new(Self::new()))) } } diff --git a/src/stores/papaya.rs b/src/stores/papaya.rs index f99d227..370d509 100644 --- a/src/stores/papaya.rs +++ b/src/stores/papaya.rs @@ -21,7 +21,7 @@ impl Papaya { } pub fn new_benchkvmap(_opt: &toml::Table) -> BenchKVMap { - BenchKVMap::Regular(Box::new(Self::new())) + BenchKVMap::Regular(Arc::new(Box::new(Self::new()))) } } diff --git a/src/stores/remote.rs b/src/stores/remote.rs index 15b24ac..6714e7e 100644 --- a/src/stores/remote.rs +++ b/src/stores/remote.rs @@ -43,7 +43,7 @@ impl RemoteMap { pub fn new_benchkvmap(opt: &toml::Table) -> BenchKVMap { let opt: RemoteMapOpt = opt.clone().try_into().unwrap(); - BenchKVMap::Async(Box::new(Self::new(&opt))) + BenchKVMap::Async(Arc::new(Box::new(Self::new(&opt)))) } } diff --git a/src/stores/rocksdb.rs b/src/stores/rocksdb.rs index 531038b..f63dd23 100644 --- a/src/stores/rocksdb.rs +++ b/src/stores/rocksdb.rs @@ -33,7 +33,7 @@ impl RocksDB { pub fn new_benchkvmap(opt: &toml::Table) -> BenchKVMap { let opt: RocksDBOpt = opt.clone().try_into().unwrap(); - BenchKVMap::Regular(Box::new(Self::new(&opt))) + BenchKVMap::Regular(Arc::new(Box::new(Self::new(&opt)))) } } diff --git a/src/stores/scc.rs b/src/stores/scc.rs index 484bebe..fb9246c 100644 --- a/src/stores/scc.rs +++ b/src/stores/scc.rs @@ -22,7 +22,7 @@ impl SccHashMap { } pub fn new_benchkvmap(_opt: &toml::Table) -> BenchKVMap { - BenchKVMap::Regular(Box::new(Self::new())) + BenchKVMap::Regular(Arc::new(Box::new(Self::new()))) } } diff --git a/src/thread.rs b/src/thread.rs index 4f398be..92fa562 100644 --- a/src/thread.rs +++ b/src/thread.rs @@ -12,13 +12,11 @@ //! benchmark code, which does not use any return values. pub trait JoinHandle { - fn join(self); + fn join(self: Box); } -pub trait Thread: Send + Clone + 'static { - type JoinHandle: JoinHandle; - - fn spawn(&self, f: impl FnOnce() + Send + 'static) -> Self::JoinHandle; +pub trait Thread { + fn spawn(&self, f: Box) -> Box; fn yield_now(&self); @@ -28,17 +26,19 @@ pub trait Thread: Send + Clone + 'static { #[derive(Clone)] pub(crate) struct DefaultThread; -impl JoinHandle for std::thread::JoinHandle<()> { - fn join(self) { - let _ = self.join(); +pub(crate) struct DefaultJoinHandle(std::thread::JoinHandle<()>); + +impl JoinHandle for DefaultJoinHandle { + fn join(self: Box) { + let handle = self.0; + assert!(handle.join().is_ok()); } } impl Thread for DefaultThread { - type JoinHandle = std::thread::JoinHandle<()>; - - fn spawn(&self, f: impl FnOnce() + Send + 'static) -> Self::JoinHandle { - std::thread::spawn(f) + fn spawn(&self, f: Box) -> Box { + let handle = std::thread::spawn(f); + Box::new(DefaultJoinHandle(handle)) } fn yield_now(&self) {