-
Notifications
You must be signed in to change notification settings - Fork 30
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add named pools #45
Add named pools #45
Changes from 3 commits
03d8d1a
384c181
b269930
cd062f9
340773a
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -22,7 +22,15 @@ let do_task f p = | |
| TasksActive -> raise e | ||
| _ -> () | ||
|
||
let setup_pool ~num_additional_domains = | ||
let named_pools = Hashtbl.create 8 | ||
|
||
let named_pool_mutex = Mutex.create () | ||
|
||
let setup_pool ?name ~num_additional_domains () = | ||
if num_additional_domains < 0 then | ||
raise (Invalid_argument | ||
"Task.setup_pool: num_additional_domains must be at least 0") | ||
else | ||
let task_chan = Multi_channel.make (num_additional_domains+1) in | ||
let rec worker () = | ||
match Multi_channel.recv task_chan with | ||
|
@@ -32,7 +40,15 @@ let setup_pool ~num_additional_domains = | |
worker () | ||
in | ||
let domains = Array.init num_additional_domains (fun _ -> Domain.spawn worker) in | ||
{domains; task_chan} | ||
let p = {domains; task_chan} in | ||
let _ = match name with | ||
| Some x -> | ||
Mutex.lock named_pool_mutex; | ||
Hashtbl.add named_pools x p; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Meta-comment: I started thinking about the possibility of exceptions from It would be useful to have a let with_mutex m f v =
Mutex.lock m;
begin try f v with e ->
Mutex.unlock m;
raise e
end;
Mutex.unlock m which will guard against any exceptional behaviour. CC @ctk21 @sadiqj. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. that seems not unreasonable to me |
||
Mutex.unlock named_pool_mutex | ||
| None -> () | ||
in | ||
p | ||
|
||
let async pool task = | ||
let p = Atomic.make None in | ||
|
@@ -61,6 +77,14 @@ let teardown_pool pool = | |
Multi_channel.clear_local_state (); | ||
Array.iter Domain.join pool.domains | ||
|
||
let lookup_pool name = | ||
Mutex.lock named_pool_mutex; | ||
let p = Hashtbl.find_opt named_pools name in | ||
Mutex.unlock named_pool_mutex; | ||
p | ||
|
||
let get_num_domains pool = (Array.length pool.domains + 1) | ||
|
||
let parallel_for_reduce ?(chunk_size=0) ~start ~finish ~body pool reduce_fun init = | ||
let chunk_size = if chunk_size > 0 then chunk_size | ||
else begin | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Excuse my ignorance, but why do we need to add the unit
()
arg to the end of this one?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Consider the case when we don't include the unit value argument:
vs when we have it:
https://stackoverflow.com/a/1667891 has more information on this issue.