Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add named pools #45

Merged
merged 5 commits into from
Sep 30, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
77 changes: 63 additions & 14 deletions lib/task.ml
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,13 @@ type task_msg =
Task : 'a task * 'a promise -> task_msg
| Quit : task_msg

type pool =
{domains : unit Domain.t array;
task_chan : task_msg Multi_channel.t}
type pool_data = {
domains : unit Domain.t array;
task_chan : task_msg Multi_channel.t;
name: string option
}

type pool = pool_data option Atomic.t

let do_task f p =
try
Expand All @@ -22,7 +26,15 @@ let do_task f p =
| TasksActive -> raise e
| _ -> ()

let setup_pool ~num_additional_domains =
let named_pools = Hashtbl.create 8

let named_pools_mutex = Mutex.create ()

let setup_pool ?name ~num_additional_domains () =
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Excuse my ignorance, but why do we need to add the unit () arg to the end of this one?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider the case when we don't include the unit value argument:

# let setup_pool ?name ~num_additional_domains = ();;
Warning 16 [unerasable-optional-argument]: this optional argument cannot be erased.
val setup_pool : ?name:'a -> num_additional_domains:'b -> unit = <fun>
# setup_pool ~num_additional_domains:5;;
- : ?name:'a -> unit = <fun>

vs when we have it:

# let setup_pool ?name ~num_additional_domains () = ();;
val setup_pool : ?name:'a -> num_additional_domains:'b -> unit -> unit =
  <fun>
# setup_pool ~num_additional_domains:5 ();;
- : unit = ()

https://stackoverflow.com/a/1667891 has more information on this issue.

if num_additional_domains < 0 then
raise (Invalid_argument
"Task.setup_pool: num_additional_domains must be at least 0")
else
let task_chan = Multi_channel.make (num_additional_domains+1) in
let rec worker () =
match Multi_channel.recv task_chan with
Expand All @@ -32,19 +44,34 @@ let setup_pool ~num_additional_domains =
worker ()
in
let domains = Array.init num_additional_domains (fun _ -> Domain.spawn worker) in
{domains; task_chan}
let p = Atomic.make (Some {domains; task_chan; name}) in
begin match name with
| None -> ()
| Some x ->
Mutex.lock named_pools_mutex;
Hashtbl.add named_pools x p;
Mutex.unlock named_pools_mutex
end;
p

(** [get_pool_data p] reads the atomic cell [p] and returns the value it
    currently holds.
    @raise Invalid_argument with message ["pool already torn down"] if the
    cell holds [None], which is the state [teardown_pool] leaves it in. *)
let get_pool_data p =
  match Atomic.get p with
  | Some data -> data
  | None -> invalid_arg "pool already torn down"

let async pool task =
let pd = get_pool_data pool in
let p = Atomic.make None in
Multi_channel.send pool.task_chan (Task(task,p));
Multi_channel.send pd.task_chan (Task(task,p));
p

let rec await pool promise =
let pd = get_pool_data pool in
match Atomic.get promise with
| None ->
begin
try
match Multi_channel.recv_poll pool.task_chan with
match Multi_channel.recv_poll pd.task_chan with
| Task (t, p) -> do_task t p
| Quit -> raise TasksActive
with
Expand All @@ -55,16 +82,37 @@ let rec await pool promise =
| Some (Error e) -> raise e

let teardown_pool pool =
for _i=1 to Array.length pool.domains do
Multi_channel.send pool.task_chan Quit
let pd = get_pool_data pool in
for _i=1 to Array.length pd.domains do
Multi_channel.send pd.task_chan Quit
done;
Multi_channel.clear_local_state ();
Array.iter Domain.join pool.domains
Array.iter Domain.join pd.domains;
(* Remove the pool from the table *)
begin match pd.name with
| None -> ()
| Some n ->
Mutex.lock named_pools_mutex;
Hashtbl.remove named_pools n;
Mutex.unlock named_pools_mutex
end;
Atomic.set pool None

(** [lookup_pool name] returns [Some pool] if a pool is currently registered
    under [name] in the [named_pools] table, or [None] otherwise.

    The lookup runs while holding [named_pools_mutex]. *)
let lookup_pool name =
  Mutex.lock named_pools_mutex;
  (* Release the mutex on every exit path: if the lookup raises, a plain
     lock/unlock pair would leave the mutex locked and block every later
     caller that needs it. *)
  Fun.protect
    ~finally:(fun () -> Mutex.unlock named_pools_mutex)
    (fun () -> Hashtbl.find_opt named_pools name)

(** [get_num_domains pool] returns the number of domains in [pool]: the
    length of its [domains] array plus one.
    @raise Invalid_argument if [pool] has already been torn down (raised by
    [get_pool_data]). *)
let get_num_domains pool =
  let { domains; _ } = get_pool_data pool in
  1 + Array.length domains

let parallel_for_reduce ?(chunk_size=0) ~start ~finish ~body pool reduce_fun init =
let pd = get_pool_data pool in
let chunk_size = if chunk_size > 0 then chunk_size
else begin
let n_domains = (Array.length pool.domains) + 1 in
let n_domains = (Array.length pd.domains) + 1 in
let n_tasks = finish - start + 1 in
if n_domains = 1 then n_tasks
else max 1 (n_tasks/(8*n_domains))
Expand All @@ -88,9 +136,10 @@ let parallel_for_reduce ?(chunk_size=0) ~start ~finish ~body pool reduce_fun ini
reduce_fun init (work start finish)

let parallel_for ?(chunk_size=0) ~start ~finish ~body pool =
let pd = get_pool_data pool in
let chunk_size = if chunk_size > 0 then chunk_size
else begin
let n_domains = (Array.length pool.domains) + 1 in
let n_domains = (Array.length pd.domains) + 1 in
let n_tasks = finish - start + 1 in
if n_domains = 1 then n_tasks
else max 1 (n_tasks/(8*n_domains))
Expand All @@ -109,7 +158,7 @@ let parallel_for ?(chunk_size=0) ~start ~finish ~body pool =
work pool body start finish

let parallel_scan pool op elements =

let pd = get_pool_data pool in
let scan_part op elements prefix_sum start finish =
assert (Array.length elements > (finish - start));
for i = (start + 1) to finish do
Expand All @@ -123,7 +172,7 @@ let parallel_scan pool op elements =
done
in
let n = Array.length elements in
let p = (Array.length pool.domains) + 1 in
let p = (Array.length pd.domains) + 1 in
let prefix_s = Array.copy elements in

parallel_for pool ~chunk_size:1 ~start:0 ~finish:(p - 1)
Expand Down
14 changes: 12 additions & 2 deletions lib/task.mli
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,26 @@ type 'a promise
type pool
(** Type of task pool *)

val setup_pool : num_additional_domains:int -> pool
val setup_pool : ?name:string -> num_additional_domains:int -> unit -> pool
(** Sets up a task execution pool with [num_additional_domains + 1] domains
* including the current domain *)
* including the current domain. If [name] is provided, the pool is mapped to
* [name] which can be looked up later with [lookup_pool name].
* Raises [Invalid_argument] when [num_additional_domains] is less than 0. *)

exception TasksActive

val teardown_pool : pool -> unit
(** Tears down the task execution pool.
* Raises [TasksActive] exception if any tasks are currently active. *)

val lookup_pool : string -> pool option
(** [lookup_pool name] returns [Some pool] if [pool] is associated to [name] or
* returns [None] if no value is associated to it. *)

val get_num_domains : pool -> int
(** [get_num_domains pool] returns the total number of domains in [pool]
* including the parent domain. *)

val async : pool -> 'a task -> 'a promise
(** [async p t] runs the task [t] asynchronously in the pool [p]. The function
* returns a promise [r] in which the result of the task [t] will be stored.
Expand Down
2 changes: 1 addition & 1 deletion test/LU_decomposition_multicore.ml
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ let lup pool (a0 : float array) =
a

let () =
let pool = T.setup_pool ~num_additional_domains:(num_domains - 1) in
let pool = T.setup_pool ~num_additional_domains:(num_domains - 1) () in
let a = parallel_create pool
(fun _ _ -> (Random.State.float (Domain.DLS.get k) 100.0) +. 1.0 ) in
let lu = lup pool a in
Expand Down
2 changes: 1 addition & 1 deletion test/enumerate_par.ml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ let n = try int_of_string Sys.argv.(2) with _ -> 100
module T = Domainslib.Task

let _ =
let p = T.setup_pool ~num_additional_domains:(num_domains - 1) in
let p = T.setup_pool ~num_additional_domains:(num_domains - 1) () in
T.parallel_for p ~start:0 ~finish:(n-1) ~chunk_size:16 ~body:(fun i ->
print_string @@ Printf.sprintf "[%d] %d\n%!" (Domain.self () :> int) i);
T.teardown_pool p
2 changes: 1 addition & 1 deletion test/fib_par.ml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ let rec fib_par pool n =
T.await pool a + T.await pool b

let main =
let pool = T.setup_pool ~num_additional_domains:(num_domains - 1) in
let pool = T.setup_pool ~num_additional_domains:(num_domains - 1) () in
let res = fib_par pool n in
T.teardown_pool pool;
Printf.printf "fib(%d) = %d\n" n res
2 changes: 1 addition & 1 deletion test/game_of_life_multicore.ml
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ let rec repeat pool n =
| _-> next pool; repeat pool (n-1)

let ()=
let pool = T.setup_pool ~num_additional_domains:(num_domains - 1) in
let pool = T.setup_pool ~num_additional_domains:(num_domains - 1) () in
print !rg;
repeat pool n_times;
print !rg;
Expand Down
2 changes: 1 addition & 1 deletion test/prefix_sum.ml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ let gen n = Array.make n 1 (*(fun _ -> Random.int n)*)
let prefix_sum pool = T.parallel_scan pool (+)

let _ =
let pool = T.setup_pool ~num_additional_domains:(num_domains - 1) in
let pool = T.setup_pool ~num_additional_domains:(num_domains - 1) () in
let arr = gen n in
let t = Unix.gettimeofday() in
let _ = prefix_sum pool arr in
Expand Down
2 changes: 1 addition & 1 deletion test/spectralnorm2_multicore.ml
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ let eval_AtA_times_u pool u v =
eval_A_times_u pool u w; eval_At_times_u pool w v

let () =
let pool = T.setup_pool ~num_additional_domains:(num_domains - 1) in
let pool = T.setup_pool ~num_additional_domains:(num_domains - 1) () in
let u = Array.make n 1.0 and v = Array.make n 0.0 in
for _i = 0 to 9 do
eval_AtA_times_u pool u v; eval_AtA_times_u pool v u
Expand Down
6 changes: 3 additions & 3 deletions test/sum_par.ml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ module T = Domainslib.Task

let _ =
(* use parallel_for_reduce *)
let p = T.setup_pool ~num_additional_domains:(num_domains - 1) in
let p = T.setup_pool ~num_additional_domains:(num_domains - 1) () in
let sum =
T.parallel_for_reduce p (+) 0 ~chunk_size:(n/(4*num_domains)) ~start:0
~finish:(n-1) ~body:(fun _i -> 1)
Expand All @@ -16,7 +16,7 @@ let _ =

let _ =
(* explicitly use empty pool and default chunk_size *)
let p = T.setup_pool ~num_additional_domains:0 in
let p = T.setup_pool ~num_additional_domains:0 () in
let sum = Atomic.make 0 in
T.parallel_for p ~start:0 ~finish:(n-1)
~body:(fun _i -> ignore (Atomic.fetch_and_add sum 1));
Expand All @@ -27,7 +27,7 @@ let _ =

let _ =
(* configured num_domains and default chunk_size *)
let p = T.setup_pool ~num_additional_domains:(num_domains - 1) in
let p = T.setup_pool ~num_additional_domains:(num_domains - 1) () in
let sum = Atomic.make 0 in
T.parallel_for p ~start:0 ~finish:(n-1)
~body:(fun _i -> ignore (Atomic.fetch_and_add sum 1));
Expand Down
2 changes: 1 addition & 1 deletion test/summed_area_table.ml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ let calc_table pool mat =
let _ =
let m = Array.make_matrix size size 1 (*Array.init size (fun _ -> Array.init size (fun _ -> Random.int size))*)
in
let pool = T.setup_pool ~num_additional_domains:(num_domains - 1) in
let pool = T.setup_pool ~num_additional_domains:(num_domains - 1) () in
let _ = calc_table pool m in

(* for i = 0 to size-1 do
Expand Down
2 changes: 1 addition & 1 deletion test/task_exn.ml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ module T = Domainslib.Task
exception E

let _ =
let pool = T.setup_pool ~num_additional_domains:3 in
let pool = T.setup_pool ~num_additional_domains:3 () in

let p1 = T.async pool (fun () ->
let p2 = T.async pool (fun () -> raise E) in
Expand Down
2 changes: 1 addition & 1 deletion test/task_throughput.ml
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ end
let _ =
Printf.printf "n_iterations: %d n_units: %d n_domains: %d\n"
n_iterations n_tasks n_domains;
let pool = T.setup_pool ~num_additional_domains:(n_domains - 1) in
let pool = T.setup_pool ~num_additional_domains:(n_domains - 1) () in

let hist = TimingHist.make 5 25 in
for _ = 1 to n_iterations do
Expand Down
49 changes: 32 additions & 17 deletions test/test_task.ml
Original file line number Diff line number Diff line change
Expand Up @@ -37,20 +37,35 @@ let prefix_sum pool = fun () ->


let () =
let pool = Task.setup_pool ~num_additional_domains:3 in
modify_arr pool 0 ();
modify_arr pool 25 ();
modify_arr pool 100 ();
inc_ctr pool 0 ();
inc_ctr pool 16 ();
inc_ctr pool 32 ();
inc_ctr pool 1000 ();
sum_sequence pool 0 0 ();
sum_sequence pool 10 10 ();
sum_sequence pool 1 0 ();
sum_sequence pool 1 10 ();
sum_sequence pool 100 10 ();
sum_sequence pool 100 100 ();
prefix_sum pool ();
Task.teardown_pool pool;
print_endline "ok"
let pool1 = Task.setup_pool ~num_additional_domains:2 ~name:"pool1" () in
let pool2 = Task.setup_pool ~num_additional_domains:2 ~name:"pool2" () in
let p1 = Option.get @@ Task.lookup_pool "pool1" in
modify_arr pool1 0 ();
modify_arr pool1 25 ();
modify_arr pool1 100 ();
inc_ctr p1 0 ();
inc_ctr p1 16 ();
inc_ctr p1 32 ();
inc_ctr p1 1000 ();
let p2 = Option.get @@ Task.lookup_pool "pool2" in
sum_sequence pool2 0 0 ();
sum_sequence pool2 10 10 ();
sum_sequence pool2 1 0 ();
sum_sequence p2 1 10 ();
sum_sequence p2 100 10 ();
sum_sequence p2 100 100 ();
prefix_sum p2 ();
Task.teardown_pool pool1;
Task.teardown_pool pool2;

try
sum_sequence pool2 0 0 ();
assert false
with Invalid_argument _ -> ();

assert (Task.lookup_pool "pool1" = None);

try
let _ = Task.setup_pool ~num_additional_domains:(-1) () in ()
with Invalid_argument _ -> ();
print_endline "ok"