Skip to content

Commit

Permalink
Merge pull request #78 from favonia/fix-setup-pool
Browse files Browse the repository at this point in the history
Rename `num_additional_domains` to `num_domains` for `setup_pool`
  • Loading branch information
Sudha247 authored Jul 27, 2022
2 parents 15f04f3 + 1c78308 commit 94abdda
Show file tree
Hide file tree
Showing 17 changed files with 57 additions and 52 deletions.
10 changes: 5 additions & 5 deletions lib/task.ml
Original file line number Diff line number Diff line change
Expand Up @@ -106,13 +106,13 @@ let run (type a) pool (f : unit -> a) : a =
let named_pools = Hashtbl.create 8
let named_pools_mutex = Mutex.create ()

let setup_pool ?name ~num_additional_domains () =
if num_additional_domains < 0 then
let setup_pool ?name ~num_domains () =
if num_domains < 0 then
raise (Invalid_argument
"Task.setup_pool: num_additional_domains must be at least 0")
"Task.setup_pool: num_domains must be at least 0")
else
let task_chan = Multi_channel.make (num_additional_domains+1) in
let domains = Array.init num_additional_domains (fun _ ->
let task_chan = Multi_channel.make (num_domains+1) in
let domains = Array.init num_domains (fun _ ->
Domain.spawn (fun _ -> worker task_chan))
in
let p = Atomic.make (Some {domains; task_chan; name}) in
Expand Down
59 changes: 32 additions & 27 deletions lib/task.mli
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,12 @@ type !'a promise
type pool
(** Type of task pool *)

val setup_pool : ?name:string -> num_additional_domains:int -> unit -> pool
(** Sets up a task execution pool with [num_additional_domains + 1] domains
including the current domain. If [name] is provided, the pool is mapped to
[name] which can be looked up later with [lookup_pool name].
val setup_pool : ?name:string -> num_domains:int -> unit -> pool
(** Sets up a task execution pool with [num_domains] new domains. If [name] is
provided, the pool is mapped to [name] which can be looked up later with
[lookup_pool name].
Raises {!Invalid_argument} when [num_additional_domains] is less than 0. *)
Raises {!Invalid_argument} when [num_domains] is less than 0. *)

val teardown_pool : pool -> unit
(** Tears down the task execution pool. *)
Expand All @@ -26,9 +26,9 @@ val get_num_domains : pool -> int
including the parent domain. *)

val run : pool -> 'a task -> 'a
(** [run p t] runs the task [t] synchronously in the pool [p]. If the task [t]
blocks on a promise, then tasks from the pool [p] are executed until the
promise blocking [t] is resolved.
(** [run p t] runs the task [t] synchronously with the calling domain and the
domains in the pool [p]. If the task [t] blocks on a promise, then tasks
from the pool [p] are executed until the promise blocking [t] is resolved.
This function should be used at the top level to enclose the calls to other
functions that may await on promises. This includes {!await},
Expand All @@ -41,23 +41,26 @@ val async : pool -> 'a task -> 'a promise
*)

val await : pool -> 'a promise -> 'a
(** [await p r] waits for the promise to be resolved. If the task associated
with the promise had completed sucessfully, then the result of the task
will be returned. If the task had raised an exception, then [await] raises
the same exception.
(** [await p r] waits for the promise [r] to be resolved. During the resolution,
other tasks in the pool [p] might be run using the calling domain and/or the
domains in the pool [p]. If the task associated with the promise have
completed successfully, then the result of the task will be returned. If the
task have raised an exception, then [await] raises the same exception.
Must be called with a call to {!run} in the dynamic scope. *)
Must be called with a call to {!run} in the dynamic scope to handle the
internal algebraic effects for task synchronization. *)

val parallel_for : ?chunk_size:int -> start:int -> finish:int ->
body:(int -> unit) -> pool -> unit
(** [parallel_for c s f b p] behaves similar to [for i=s to f do b i done], but
runs the for loop in parallel. The chunk size [c] determines the number of
body applications done in one task; this will default to [max(1,
(finish-start + 1) / (8 * num_domains))]. Individual iterations may be run in
any order. Tasks are distributed to workers using a divide-and-conquer
scheme.
runs the for loop in parallel with the calling domain and/or the domains in
the pool [p]. The chunk size [c] determines the number of body applications
done in one task; this will default to [max(1, (finish-start + 1) / (8 *
num_domains))]. Individual iterations may be run in any order. Tasks are
distributed to the participating domains using a divide-and-conquer scheme.
Must be called with a call to {!run} in the dynamic scope. *)
Must be called with a call to {!run} in the dynamic scope to handle the
internal algebraic effects for task synchronization. *)

val parallel_for_reduce : ?chunk_size:int -> start:int -> finish:int ->
body:(int -> 'a) -> pool -> ('a -> 'a -> 'a) -> 'a -> 'a
Expand All @@ -67,14 +70,16 @@ val parallel_for_reduce : ?chunk_size:int -> start:int -> finish:int ->
the reduce function needs to be commutative and associative in order to
obtain a deterministic result.
Must be called with a call to {!run} in the dynamic scope. *)
Must be called with a call to {!run} in the dynamic scope to handle the
internal algebraic effects for task synchronization. *)

val parallel_scan : pool -> ('a -> 'a -> 'a) -> 'a array -> 'a array
(** [parallel_scan p op a] computes the scan of the array [a] in parallel with
binary operator [op] and returns the result array. Scan is similar to
[Array.fold_left] but returns an array of reduced intermediate values. The
reduce operations are performed in an arbitrary order and the reduce
function needs to be commutative and associative in order to obtain a
deterministic result.
Must be called with a call to {!run} in the dynamic scope. *)
binary operator [op] and returns the result array, using the calling domain
and/or the domains in the pool [p]. Scan is similar to [Array.fold_left]
but returns an array of reduced intermediate values. The reduce operations
are performed in an arbitrary order and the reduce function needs to be
commutative and associative in order to obtain a deterministic result.
Must be called with a call to {!run} in the dynamic scope to handle the
internal algebraic effects for task synchronization. *)
2 changes: 1 addition & 1 deletion test/LU_decomposition_multicore.ml
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ let lup pool (a0 : float array) =
a

let () =
let pool = T.setup_pool ~num_additional_domains:(num_domains - 1) () in
let pool = T.setup_pool ~num_domains:(num_domains - 1) () in
T.run pool (fun _ ->
let a = parallel_create pool
(fun _ _ -> (Random.State.float (Domain.DLS.get k) 100.0) +. 1.0 ) in
Expand Down
2 changes: 1 addition & 1 deletion test/backtrace.ml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ let rec bar i =
[@@inline never]

let main () =
let pool = T.setup_pool ~num_additional_domains:0 () in
let pool = T.setup_pool ~num_domains:0 () in
T.run pool (fun _ ->
let p = T.async pool (fun _ -> bar 42) in
T.await pool p;
Expand Down
2 changes: 1 addition & 1 deletion test/enumerate_par.ml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ let n = try int_of_string Sys.argv.(2) with _ -> 100
module T = Domainslib.Task

let _ =
let p = T.setup_pool ~num_additional_domains:(num_domains - 1) () in
let p = T.setup_pool ~num_domains:(num_domains - 1) () in
T.run p (fun _ ->
T.parallel_for p ~start:0 ~finish:(n-1) ~chunk_size:16 ~body:(fun i ->
print_string @@ Printf.sprintf "[%d] %d\n%!" (Domain.self () :> int) i));
Expand Down
2 changes: 1 addition & 1 deletion test/fib_par.ml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ let rec fib_par pool n =
T.await pool a + T.await pool b

let main =
let pool = T.setup_pool ~num_additional_domains:(num_domains - 1) () in
let pool = T.setup_pool ~num_domains:(num_domains - 1) () in
let res = T.run pool (fun _ -> fib_par pool n) in
T.teardown_pool pool;
Printf.printf "fib(%d) = %d\n" n res
2 changes: 1 addition & 1 deletion test/game_of_life_multicore.ml
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ let rec repeat pool n =
| _-> next pool; repeat pool (n-1)

let ()=
let pool = T.setup_pool ~num_additional_domains:(num_domains - 1) () in
let pool = T.setup_pool ~num_domains:(num_domains - 1) () in
print !rg;
T.run pool (fun _ -> repeat pool n_times);
print !rg;
Expand Down
2 changes: 1 addition & 1 deletion test/off_by_one.ml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ let print_array a =
let r = Array.init 20 (fun i -> i + 1)

let scan_task num_doms =
let pool = Task.setup_pool ~num_additional_domains:num_doms () in
let pool = Task.setup_pool ~num_domains:num_doms () in
let a = Task.run pool (fun () -> Task.parallel_scan pool (+) (Array.make 20 1)) in
Task.teardown_pool pool;
Printf.printf "%i: %s\n%!" num_doms (print_array a);
Expand Down
2 changes: 1 addition & 1 deletion test/prefix_sum.ml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ let gen n = Array.make n 1 (*(fun _ -> Random.int n)*)
let prefix_sum pool = T.parallel_scan pool (+)

let _ =
let pool = T.setup_pool ~num_additional_domains:(num_domains - 1) () in
let pool = T.setup_pool ~num_domains:(num_domains - 1) () in
let arr = gen n in
let t = Unix.gettimeofday() in
ignore (T.run pool (fun _ -> prefix_sum pool arr));
Expand Down
2 changes: 1 addition & 1 deletion test/spectralnorm2_multicore.ml
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ let eval_AtA_times_u pool u v =
eval_A_times_u pool u w; eval_At_times_u pool w v

let () =
let pool = T.setup_pool ~num_additional_domains:(num_domains - 1) () in
let pool = T.setup_pool ~num_domains:(num_domains - 1) () in
let u = Array.make n 1.0 and v = Array.make n 0.0 in
T.run pool (fun _ ->
for _i = 0 to 9 do
Expand Down
6 changes: 3 additions & 3 deletions test/sum_par.ml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ module T = Domainslib.Task

let _ =
(* use parallel_for_reduce *)
let p = T.setup_pool ~num_additional_domains:(num_domains - 1) () in
let p = T.setup_pool ~num_domains:(num_domains - 1) () in
let sum = T.run p (fun _ ->
T.parallel_for_reduce p (+) 0 ~chunk_size:(n/(4*num_domains)) ~start:0
~finish:(n-1) ~body:(fun _i -> 1))
Expand All @@ -16,7 +16,7 @@ let _ =

let _ =
(* explictly use empty pool and default chunk_size *)
let p = T.setup_pool ~num_additional_domains:0 () in
let p = T.setup_pool ~num_domains:0 () in
let sum = Atomic.make 0 in
T.run p (fun _ ->
T.parallel_for p ~start:0 ~finish:(n-1)
Expand All @@ -28,7 +28,7 @@ let _ =

let _ =
(* configured num_domains and default chunk_size *)
let p = T.setup_pool ~num_additional_domains:(num_domains - 1) () in
let p = T.setup_pool ~num_domains:(num_domains - 1) () in
let sum = Atomic.make 0 in
T.run p (fun _ ->
T.parallel_for p ~start:0 ~finish:(n-1)
Expand Down
2 changes: 1 addition & 1 deletion test/summed_area_table.ml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ let calc_table pool mat =
let _ =
let m = Array.make_matrix size size 1 (*Array.init size (fun _ -> Array.init size (fun _ -> Random.int size))*)
in
let pool = T.setup_pool ~num_additional_domains:(num_domains - 1) () in
let pool = T.setup_pool ~num_domains:(num_domains - 1) () in
let _ = T.run pool (fun _ -> calc_table pool m) in

(* for i = 0 to size-1 do
Expand Down
2 changes: 1 addition & 1 deletion test/task_throughput.ml
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ end
let _ =
Printf.printf "n_iterations: %d n_units: %d n_domains: %d\n"
n_iterations n_tasks n_domains;
let pool = T.setup_pool ~num_additional_domains:(n_domains - 1) () in
let pool = T.setup_pool ~num_domains:(n_domains - 1) () in

let hist = TimingHist.make 5 25 in
for _ = 1 to n_iterations do
Expand Down
2 changes: 1 addition & 1 deletion test/test_deadlock.ml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ let rec loop n =
else (Domain.cpu_relax (); loop (n-1))

let () =
let pool = T.setup_pool ~num_additional_domains:2 () in
let pool = T.setup_pool ~num_domains:2 () in
T.run pool (fun _ ->
let a = T.async pool (fun _ ->
Printf.printf "Task A running on domain %d\n%!" (Domain.self () :> int);
Expand Down
6 changes: 3 additions & 3 deletions test/test_task.ml
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@ let prefix_sum pool = fun () ->


let () =
let pool1 = Task.setup_pool ~num_additional_domains:2 ~name:"pool1" () in
let pool2 = Task.setup_pool ~num_additional_domains:2 ~name:"pool2" () in
let pool1 = Task.setup_pool ~num_domains:2 ~name:"pool1" () in
let pool2 = Task.setup_pool ~num_domains:2 ~name:"pool2" () in
Task.run pool1 (fun _ ->
let p1 = Option.get @@ Task.lookup_pool "pool1" in
modify_arr pool1 0 ();
Expand Down Expand Up @@ -68,6 +68,6 @@ let () =
assert (Task.lookup_pool "pool1" = None);

try
let _ = Task.setup_pool ~num_additional_domains:(-1) () in ()
let _ = Task.setup_pool ~num_domains:(-1) () in ()
with Invalid_argument _ -> ();
print_endline "ok"
4 changes: 2 additions & 2 deletions test/test_task_crash.ml
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ let work () =
done
;;
begin
let pool1 = Task.setup_pool ~num_additional_domains:2 () in
let pool2 = Task.setup_pool ~num_additional_domains:1 () in
let pool1 = Task.setup_pool ~num_domains:2 () in
let pool2 = Task.setup_pool ~num_domains:1 () in

let pool1_prom0 = Task.async pool1 work in

Expand Down
2 changes: 1 addition & 1 deletion test/test_task_empty.ml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ open Domainslib

let array_size = 0

let pool = Task.setup_pool ~num_additional_domains:0 ()
let pool = Task.setup_pool ~num_domains:0 ()
let res = Task.run pool (fun () ->
Task.parallel_for_reduce ~chunk_size:0 ~start:0 ~finish:(array_size-1) ~body:(fun _ -> 1) pool (+) 0);;
Task.teardown_pool pool;;
Expand Down

0 comments on commit 94abdda

Please sign in to comment.