I have a complex use case for crossbeam_utils::sync::WaitGroup. I have a thread-safe structure that builds SSTables for databases, and it is very large in-memory, so it can't easily be cloned. As part of that, the table builder needs to be able to compress and encrypt the blocks when they're complete, but that work is done asynchronously. I want to use WaitGroup to provide the final sync point before I flush the SSTable to disk. Here's my current MVCE:
use bytes::Bytes;
use crossbeam_deque::Worker;
use crossbeam_utils::sync::WaitGroup;
use parking_lot::Mutex;
use std::{sync::Arc, thread, thread::available_parallelism};

/// Block on disk
struct Block {
    data: Bytes, // several megabytes
}

/// Thread-safe SSTable Builder
struct Builder {
    wg: Arc<WaitGroup>,
    done: Arc<Mutex<bool>>,
    blocks: Arc<Mutex<Vec<Block>>>, // could be 100s of MiBs until it's flushed
    work_queue: Arc<Mutex<Worker<usize>>>,
}

impl Builder {
    /// Returns an Arc<Builder> to ensure that cloning doesn't clone hundreds
    /// of megabytes
    pub fn new() -> Arc<Self> {
        let f = Arc::new(Builder {
            wg: Arc::new(WaitGroup::new()),
            done: Arc::new(Mutex::new(false)),
            blocks: Arc::new(Mutex::new(vec![])),
            work_queue: Arc::new(Mutex::new(Worker::<usize>::new_lifo())),
        });
        // spin up the internal data workers
        let p_count = available_parallelism().unwrap().get();
        for _ in 0..=p_count {
            let f_alias = f.clone();
            thread::spawn(move || {
                f_alias.worker();
            });
        }
        f
    }

    pub fn add(&self, _key: Bytes, _value: Bytes) {
        // add to block
    }

    pub fn complete(&self) {
        // inform workers there's no more work
        {
            let mut done = self.done.lock();
            *done = true;
        }

        // ensure the work is complete
        self.wg.wait();

        // flush blocks to disk
    }

    // a worker thread
    fn worker(&self) {
        let wg = self.wg.clone();
        while !*self.done.lock() {
            let stealer = self.work_queue.lock().stealer();
            let idx = stealer.steal().success().unwrap();
            let mut block_list_ref = self.blocks.lock();
            let _block = &mut block_list_ref[idx];
            // compress and encrypt the block stolen from the queue
            // this modifies the block vec in-place
        }
        drop(wg);
    }
}

fn main() {
    // we need a new sstable
    let f = Builder::new();

    // some other threads will call this
    // f.add(key, value);

    // sstable is complete
    // finalize all blocks
    // write to disk
    f.complete();
}
Because of the size of the data in the blocks, and the overall builder size, moving the data around isn't very feasible, so I mutate and operate on it in-place as much as possible. I pass pointers around, and everything is behind Arc, and most are also locked with mutexes. However, when I tried to introduce WaitGroup, I got this error:
   Compiling playground v0.0.1 (/playground)
error[E0507]: cannot move out of an `Arc`
   --> src/lib.rs:53:9
    |
53  |         self.wg.wait();
    |         ^^^^^^^ ------ value moved due to this method call
    |         |
    |         move occurs because value has type `WaitGroup`, which does not implement the `Copy` trait
    |
note: `WaitGroup::wait` takes ownership of the receiver `self`, which moves value
   --> /playground/.cargo/registry/src/index.crates.io-6f17d22bba15001f/crossbeam-utils-0.8.16/src/sync/wait_group.rs:103:17
    |
103 |     pub fn wait(self) {
    |                 ^^^^
help: you can `clone` the value and consume it, but this might not be your desired behavior
    |
53  |         self.wg.clone().wait();
    |                ++++++++

For more information about this error, try `rustc --explain E0507`.
error: could not compile `playground` (lib) due to previous error
I've tried several different approaches with pointers, smart pointers, and mutexes, but I cannot seem to get WaitGroup to meet my needs. It would be great if WaitGroup could support this kind of use case.
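One possible workaround, sketched under assumptions: since `WaitGroup::wait` takes `self` by value, the group could be stored as `Mutex<Option<WaitGroup>>` and moved out with `take()` inside `complete(&self)`. Each worker would hold its own clone of the group itself rather than a clone of an `Arc` around it, because cloning the `Arc` only bumps the `Arc` reference count and does not register a new participant. To keep the sketch free of dependencies, the `WaitGroup` below is a std-only stand-in that mimics the clone/drop/wait behavior of the crossbeam type. The trimmed-down `Builder`, its `processed` counter, and `run_demo` are hypothetical names, not part of the MVCE.

```rust
use std::sync::{Arc, Condvar, Mutex};
use std::thread;

/// Std-only stand-in for `crossbeam_utils::sync::WaitGroup` (an assumption
/// made to avoid dependencies). Like the real type, `wait` takes `self`.
struct WaitGroup {
    inner: Arc<(Mutex<usize>, Condvar)>,
}

impl WaitGroup {
    fn new() -> Self {
        WaitGroup { inner: Arc::new((Mutex::new(1), Condvar::new())) }
    }

    /// Consumes this handle, then blocks until every other handle is dropped.
    fn wait(self) {
        let inner = Arc::clone(&self.inner);
        drop(self); // this handle no longer counts as a participant
        let (count, cvar) = &*inner;
        let mut n = count.lock().unwrap();
        while *n > 0 {
            n = cvar.wait(n).unwrap();
        }
    }
}

impl Clone for WaitGroup {
    fn clone(&self) -> Self {
        *self.inner.0.lock().unwrap() += 1; // register one more participant
        WaitGroup { inner: Arc::clone(&self.inner) }
    }
}

impl Drop for WaitGroup {
    fn drop(&mut self) {
        let (count, cvar) = &*self.inner;
        let mut n = count.lock().unwrap();
        *n -= 1;
        if *n == 0 {
            cvar.notify_all();
        }
    }
}

/// Hypothetical, trimmed-down builder: the group sits in an `Option` behind
/// a mutex so that `complete(&self)` can move it out with `take()`.
struct Builder {
    wg: Mutex<Option<WaitGroup>>,
    processed: Mutex<usize>, // stands in for the block list
}

impl Builder {
    fn new(workers: usize) -> Arc<Self> {
        let wg = WaitGroup::new();
        let b = Arc::new(Builder {
            wg: Mutex::new(Some(wg.clone())),
            processed: Mutex::new(0),
        });
        for _ in 0..workers {
            let b2 = Arc::clone(&b);
            let handle = wg.clone(); // clone the group itself, not an Arc around it
            thread::spawn(move || {
                *b2.processed.lock().unwrap() += 1; // placeholder for compress + encrypt
                drop(handle); // signal that this worker is finished
            });
        }
        b // the local `wg` handle is dropped here
    }

    fn complete(&self) -> usize {
        // Move the group out from behind `&self`; a repeated call finds `None`.
        let wg = self.wg.lock().unwrap().take();
        if let Some(wg) = wg {
            wg.wait(); // the mutex guard is already released at this point
        }
        *self.processed.lock().unwrap()
    }
}

pub fn run_demo() -> usize {
    Builder::new(4).complete()
}
```

Calling `run_demo()` from `main` exercises the whole path. With the real crate, the stand-in type would presumably be replaced by `crossbeam_utils::sync::WaitGroup` while the `Mutex<Option<...>>` and `take()` pattern stays the same.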
I guess you want a variant of barrier that does not need to know the number of threads at construction, right?
Yes, and one that is potentially scalable as well. WaitGroup allows users to add or remove threads as needed, which makes it useful as a "dumb barrier" of sorts. I can likely use a barrier as a drop-in replacement for now (haven't tried yet), but I wanted to surface this type of use case.
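For comparison, here is a rough sketch of the barrier approach mentioned above, using `std::sync::Barrier`. Its `wait` takes `&self`, so it works behind an `Arc` without moving anything, but the participant count has to be fixed at construction (here `workers + 1`, counting the finalizing thread). The function name `finish_with_barrier` and the `processed` counter are made up for illustration.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Barrier};
use std::thread;

/// Hypothetical helper: spawns `workers` threads and returns how many of
/// them finished their (placeholder) work before the final sync point.
pub fn finish_with_barrier(workers: usize) -> usize {
    // One slot per worker plus one for the thread that flushes to disk.
    let barrier = Arc::new(Barrier::new(workers + 1));
    let processed = Arc::new(AtomicUsize::new(0));

    for _ in 0..workers {
        let barrier = Arc::clone(&barrier);
        let processed = Arc::clone(&processed);
        thread::spawn(move || {
            // Placeholder for compressing and encrypting a block.
            processed.fetch_add(1, Ordering::SeqCst);
            // `Barrier::wait` borrows the barrier, so no move out of the Arc.
            barrier.wait();
        });
    }

    // Final sync point: returns once every worker has reached the barrier.
    barrier.wait();
    processed.load(Ordering::SeqCst)
}
```

The trade-off is the one raised in the thread: a worker cannot join or leave after the barrier is built, whereas a wait group lets the number of participants change over time.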