Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add named pools #45

Merged
merged 5 commits into from
Sep 30, 2021
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 26 additions & 2 deletions lib/task.ml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,15 @@ let do_task f p =
| TasksActive -> raise e
| _ -> ()

let setup_pool ~num_additional_domains =
let named_pools = Hashtbl.create 8

let named_pool_mutex = Mutex.create ()

let setup_pool ?name ~num_additional_domains () =
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Excuse my ignorance, but why do we need to add the unit () arg to the end of this one?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider the case when we don't include the unit value argument:

# let setup_pool ?name ~num_additional_domains = ();;
Warning 16 [unerasable-optional-argument]: this optional argument cannot be erased.
val setup_pool : ?name:'a -> num_additional_domains:'b -> unit = <fun>
# setup_pool ~num_additional_domains:5;;
- : ?name:'a -> unit = <fun>

vs when we have it:

# let setup_pool ?name ~num_additional_domains () = ();;
val setup_pool : ?name:'a -> num_additional_domains:'b -> unit -> unit =
  <fun>
# setup_pool ~num_additional_domains:5 ();;
- : unit = ()

https://stackoverflow.com/a/1667891 has more information on this issue.

if num_additional_domains < 0 then
raise (Invalid_argument
"Task.setup_pool: num_additional_domains must be at least 0")
else
let task_chan = Multi_channel.make (num_additional_domains+1) in
let rec worker () =
match Multi_channel.recv task_chan with
Expand All @@ -32,7 +40,15 @@ let setup_pool ~num_additional_domains =
worker ()
in
let domains = Array.init num_additional_domains (fun _ -> Domain.spawn worker) in
{domains; task_chan}
let p = {domains; task_chan} in
let _ = match name with
| Some x ->
Mutex.lock named_pool_mutex;
Hashtbl.add named_pools x p;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Meta-comment: I started thinking about the possibility of exceptions from Hashtbl.add. While Hashtbl.add does not raise any synchronous exceptions, there is of course the possibility of asynchronous exceptions. For example, add performs an allocation, which triggers a GC, which runs a finaliser which raises an exception. Because of the exception, the mutex would not be unlocked.

It would be useful to have a with_mutex primitive in Mutex module:

let with_mutex m f v =
  Mutex.lock m;
  begin try f v with e -> 
    Mutex.unlock m;
    raise e
  end;
  Mutex.unlock m

which will guard against any exceptional behaviour. CC @ctk21 @sadiqj.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

that seems not unreasonable to me

Mutex.unlock named_pool_mutex
| None -> ()
in
p

let async pool task =
let p = Atomic.make None in
Expand Down Expand Up @@ -61,6 +77,14 @@ let teardown_pool pool =
Multi_channel.clear_local_state ();
Array.iter Domain.join pool.domains

let lookup_pool name =
Mutex.lock named_pool_mutex;
let p = Hashtbl.find_opt named_pools name in
Mutex.unlock named_pool_mutex;
p

let get_num_domains pool = (Array.length pool.domains + 1)

let parallel_for_reduce ?(chunk_size=0) ~start ~finish ~body pool reduce_fun init =
let chunk_size = if chunk_size > 0 then chunk_size
else begin
Expand Down
14 changes: 12 additions & 2 deletions lib/task.mli
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,26 @@ type 'a promise
type pool
(** Type of task pool *)

val setup_pool : num_additional_domains:int -> pool
val setup_pool : ?name:string -> num_additional_domains:int -> unit -> pool
(** Sets up a task execution pool with [num_additional_domains + 1] domains
* including the current domain *)
* including the current domain. If [name] is provided, the pool is mapped to
* [name] which can be looked up later with [lookup_pool name].
* Raises [Invalid_argumet] when [num_additional_domains] is less than 0. *)

exception TasksActive

val teardown_pool : pool -> unit
(** Tears down the task execution pool.
* Raises [TasksActive] exception if any tasks are currently active. *)

val lookup_pool : string -> pool option
(** [lookup_pool name] returns [Some pool] if [pool] is associated to [name] or
* returns [None] if no value is associated to it. *)

val get_num_domains : pool -> int
(** [get_num_domains pool] returns the total number of domains in [pool]
* including the parent domain. *)

val async : pool -> 'a task -> 'a promise
(** [async p t] runs the task [t] asynchronously in the pool [p]. The function
* returns a promise [r] in which the result of the task [t] will be stored.
Expand Down
2 changes: 1 addition & 1 deletion test/LU_decomposition_multicore.ml
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ let lup pool (a0 : float array) =
a

let () =
let pool = T.setup_pool ~num_additional_domains:(num_domains - 1) in
let pool = T.setup_pool ~num_additional_domains:(num_domains - 1) () in
let a = parallel_create pool
(fun _ _ -> (Random.State.float (Domain.DLS.get k) 100.0) +. 1.0 ) in
let lu = lup pool a in
Expand Down
2 changes: 1 addition & 1 deletion test/enumerate_par.ml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ let n = try int_of_string Sys.argv.(2) with _ -> 100
module T = Domainslib.Task

let _ =
let p = T.setup_pool ~num_additional_domains:(num_domains - 1) in
let p = T.setup_pool ~num_additional_domains:(num_domains - 1) () in
T.parallel_for p ~start:0 ~finish:(n-1) ~chunk_size:16 ~body:(fun i ->
print_string @@ Printf.sprintf "[%d] %d\n%!" (Domain.self () :> int) i);
T.teardown_pool p
2 changes: 1 addition & 1 deletion test/fib_par.ml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ let rec fib_par pool n =
T.await pool a + T.await pool b

let main =
let pool = T.setup_pool ~num_additional_domains:(num_domains - 1) in
let pool = T.setup_pool ~num_additional_domains:(num_domains - 1) () in
let res = fib_par pool n in
T.teardown_pool pool;
Printf.printf "fib(%d) = %d\n" n res
2 changes: 1 addition & 1 deletion test/game_of_life_multicore.ml
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ let rec repeat pool n =
| _-> next pool; repeat pool (n-1)

let ()=
let pool = T.setup_pool ~num_additional_domains:(num_domains - 1) in
let pool = T.setup_pool ~num_additional_domains:(num_domains - 1) () in
print !rg;
repeat pool n_times;
print !rg;
Expand Down
2 changes: 1 addition & 1 deletion test/prefix_sum.ml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ let gen n = Array.make n 1 (*(fun _ -> Random.int n)*)
let prefix_sum pool = T.parallel_scan pool (+)

let _ =
let pool = T.setup_pool ~num_additional_domains:(num_domains - 1) in
let pool = T.setup_pool ~num_additional_domains:(num_domains - 1) () in
let arr = gen n in
let t = Unix.gettimeofday() in
let _ = prefix_sum pool arr in
Expand Down
2 changes: 1 addition & 1 deletion test/spectralnorm2_multicore.ml
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ let eval_AtA_times_u pool u v =
eval_A_times_u pool u w; eval_At_times_u pool w v

let () =
let pool = T.setup_pool ~num_additional_domains:(num_domains - 1) in
let pool = T.setup_pool ~num_additional_domains:(num_domains - 1) () in
let u = Array.make n 1.0 and v = Array.make n 0.0 in
for _i = 0 to 9 do
eval_AtA_times_u pool u v; eval_AtA_times_u pool v u
Expand Down
6 changes: 3 additions & 3 deletions test/sum_par.ml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ module T = Domainslib.Task

let _ =
(* use parallel_for_reduce *)
let p = T.setup_pool ~num_additional_domains:(num_domains - 1) in
let p = T.setup_pool ~num_additional_domains:(num_domains - 1) () in
let sum =
T.parallel_for_reduce p (+) 0 ~chunk_size:(n/(4*num_domains)) ~start:0
~finish:(n-1) ~body:(fun _i -> 1)
Expand All @@ -16,7 +16,7 @@ let _ =

let _ =
(* explictly use empty pool and default chunk_size *)
let p = T.setup_pool ~num_additional_domains:0 in
let p = T.setup_pool ~num_additional_domains:0 () in
let sum = Atomic.make 0 in
T.parallel_for p ~start:0 ~finish:(n-1)
~body:(fun _i -> ignore (Atomic.fetch_and_add sum 1));
Expand All @@ -27,7 +27,7 @@ let _ =

let _ =
(* configured num_domains and default chunk_size *)
let p = T.setup_pool ~num_additional_domains:(num_domains - 1) in
let p = T.setup_pool ~num_additional_domains:(num_domains - 1) () in
let sum = Atomic.make 0 in
T.parallel_for p ~start:0 ~finish:(n-1)
~body:(fun _i -> ignore (Atomic.fetch_and_add sum 1));
Expand Down
2 changes: 1 addition & 1 deletion test/summed_area_table.ml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ let calc_table pool mat =
let _ =
let m = Array.make_matrix size size 1 (*Array.init size (fun _ -> Array.init size (fun _ -> Random.int size))*)
in
let pool = T.setup_pool ~num_additional_domains:(num_domains - 1) in
let pool = T.setup_pool ~num_additional_domains:(num_domains - 1) () in
let _ = calc_table pool m in

(* for i = 0 to size-1 do
Expand Down
2 changes: 1 addition & 1 deletion test/task_exn.ml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ module T = Domainslib.Task
exception E

let _ =
let pool = T.setup_pool ~num_additional_domains:3 in
let pool = T.setup_pool ~num_additional_domains:3 () in

let p1 = T.async pool (fun () ->
let p2 = T.async pool (fun () -> raise E) in
Expand Down
2 changes: 1 addition & 1 deletion test/task_throughput.ml
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ end
let _ =
Printf.printf "n_iterations: %d n_units: %d n_domains: %d\n"
n_iterations n_tasks n_domains;
let pool = T.setup_pool ~num_additional_domains:(n_domains - 1) in
let pool = T.setup_pool ~num_additional_domains:(n_domains - 1) () in

let hist = TimingHist.make 5 25 in
for _ = 1 to n_iterations do
Expand Down
8 changes: 6 additions & 2 deletions test/test_task.ml
Original file line number Diff line number Diff line change
Expand Up @@ -37,20 +37,24 @@ let prefix_sum pool = fun () ->


let () =
let pool = Task.setup_pool ~num_additional_domains:3 in
let pool = Task.setup_pool ~num_additional_domains:3 ~name:"pool1" () in
Sudha247 marked this conversation as resolved.
Show resolved Hide resolved
modify_arr pool 0 ();
modify_arr pool 25 ();
modify_arr pool 100 ();
inc_ctr pool 0 ();
inc_ctr pool 16 ();
inc_ctr pool 32 ();
inc_ctr pool 1000 ();
sum_sequence pool 0 0 ();
let p2 = Option.get @@ Task.lookup_pool "pool1" in
sum_sequence p2 0 0 ();
sum_sequence pool 10 10 ();
sum_sequence pool 1 0 ();
sum_sequence pool 1 10 ();
sum_sequence pool 100 10 ();
sum_sequence pool 100 100 ();
prefix_sum pool ();
Task.teardown_pool pool;
try
let _ = Task.setup_pool ~num_additional_domains:(-1) () in ()
with Invalid_argument _ -> ();
print_endline "ok"