diff --git a/lib/task.ml b/lib/task.ml index 4653624..b2a49f3 100644 --- a/lib/task.ml +++ b/lib/task.ml @@ -106,13 +106,13 @@ let run (type a) pool (f : unit -> a) : a = let named_pools = Hashtbl.create 8 let named_pools_mutex = Mutex.create () -let setup_pool ?name ~num_additional_domains () = - if num_additional_domains < 0 then +let setup_pool ?name ~num_domains () = + if num_domains < 0 then raise (Invalid_argument - "Task.setup_pool: num_additional_domains must be at least 0") + "Task.setup_pool: num_domains must be at least 0") else - let task_chan = Multi_channel.make (num_additional_domains+1) in - let domains = Array.init num_additional_domains (fun _ -> + let task_chan = Multi_channel.make (num_domains+1) in + let domains = Array.init num_domains (fun _ -> Domain.spawn (fun _ -> worker task_chan)) in let p = Atomic.make (Some {domains; task_chan; name}) in diff --git a/lib/task.mli b/lib/task.mli index 614d1a9..fc7c0fa 100644 --- a/lib/task.mli +++ b/lib/task.mli @@ -7,12 +7,12 @@ type !'a promise type pool (** Type of task pool *) -val setup_pool : ?name:string -> num_additional_domains:int -> unit -> pool -(** Sets up a task execution pool with [num_additional_domains + 1] domains - including the current domain. If [name] is provided, the pool is mapped to - [name] which can be looked up later with [lookup_pool name]. +val setup_pool : ?name:string -> num_domains:int -> unit -> pool +(** Sets up a task execution pool with [num_domains] new domains. If [name] is + provided, the pool is mapped to [name] which can be looked up later with + [lookup_pool name]. - Raises {!Invalid_argument} when [num_additional_domains] is less than 0. *) + Raises {!Invalid_argument} when [num_domains] is less than 0. *) val teardown_pool : pool -> unit (** Tears down the task execution pool. *) @@ -26,9 +26,9 @@ val get_num_domains : pool -> int including the parent domain. *) val run : pool -> 'a task -> 'a -(** [run p t] runs the task [t] synchronously in the pool [p]. 
If the task [t] - blocks on a promise, then tasks from the pool [p] are executed until the - promise blocking [t] is resolved. +(** [run p t] runs the task [t] synchronously with the calling domain and the + domains in the pool [p]. If the task [t] blocks on a promise, then tasks + from the pool [p] are executed until the promise blocking [t] is resolved. This function should be used at the top level to enclose the calls to other functions that may await on promises. This includes {!await}, @@ -41,23 +41,26 @@ val async : pool -> 'a task -> 'a promise *) val await : pool -> 'a promise -> 'a -(** [await p r] waits for the promise to be resolved. If the task associated - with the promise had completed sucessfully, then the result of the task - will be returned. If the task had raised an exception, then [await] raises - the same exception. +(** [await p r] waits for the promise [r] to be resolved. During the resolution, + other tasks in the pool [p] might be run using the calling domain and/or the + domains in the pool [p]. If the task associated with the promise has + completed successfully, then the result of the task will be returned. If the + task has raised an exception, then [await] raises the same exception. - Must be called with a call to {!run} in the dynamic scope. *) + Must be called with a call to {!run} in the dynamic scope to handle the + internal algebraic effects for task synchronization. *) val parallel_for : ?chunk_size:int -> start:int -> finish:int -> body:(int -> unit) -> pool -> unit (** [parallel_for c s f b p] behaves similar to [for i=s to f do b i done], but - runs the for loop in parallel. The chunk size [c] determines the number of - body applications done in one task; this will default to [max(1, - (finish-start + 1) / (8 * num_domains))]. Individual iterations may be run in - any order. Tasks are distributed to workers using a divide-and-conquer - scheme. 
+ runs the for loop in parallel with the calling domain and/or the domains in + the pool [p]. The chunk size [c] determines the number of body applications + done in one task; this will default to [max(1, (finish-start + 1) / (8 * + num_domains))]. Individual iterations may be run in any order. Tasks are + distributed to the participating domains using a divide-and-conquer scheme. - Must be called with a call to {!run} in the dynamic scope. *) + Must be called with a call to {!run} in the dynamic scope to handle the + internal algebraic effects for task synchronization. *) val parallel_for_reduce : ?chunk_size:int -> start:int -> finish:int -> body:(int -> 'a) -> pool -> ('a -> 'a -> 'a) -> 'a -> 'a @@ -67,14 +70,16 @@ val parallel_for_reduce : ?chunk_size:int -> start:int -> finish:int -> the reduce function needs to be commutative and associative in order to obtain a deterministic result. - Must be called with a call to {!run} in the dynamic scope. *) + Must be called with a call to {!run} in the dynamic scope to handle the + internal algebraic effects for task synchronization. *) val parallel_scan : pool -> ('a -> 'a -> 'a) -> 'a array -> 'a array (** [parallel_scan p op a] computes the scan of the array [a] in parallel with - binary operator [op] and returns the result array. Scan is similar to - [Array.fold_left] but returns an array of reduced intermediate values. The - reduce operations are performed in an arbitrary order and the reduce - function needs to be commutative and associative in order to obtain a - deterministic result. - - Must be called with a call to {!run} in the dynamic scope. *) + binary operator [op] and returns the result array, using the calling domain + and/or the domains in the pool [p]. Scan is similar to [Array.fold_left] + but returns an array of reduced intermediate values. 
The reduce operations + are performed in an arbitrary order and the reduce function needs to be + commutative and associative in order to obtain a deterministic result. + + Must be called with a call to {!run} in the dynamic scope to handle the + internal algebraic effects for task synchronization. *) diff --git a/test/LU_decomposition_multicore.ml b/test/LU_decomposition_multicore.ml index aa207f1..ab123ee 100644 --- a/test/LU_decomposition_multicore.ml +++ b/test/LU_decomposition_multicore.ml @@ -54,7 +54,7 @@ let lup pool (a0 : float array) = a let () = - let pool = T.setup_pool ~num_additional_domains:(num_domains - 1) () in + let pool = T.setup_pool ~num_domains:(num_domains - 1) () in T.run pool (fun _ -> let a = parallel_create pool (fun _ _ -> (Random.State.float (Domain.DLS.get k) 100.0) +. 1.0 ) in diff --git a/test/backtrace.ml b/test/backtrace.ml index f106e7f..7fb7e56 100644 --- a/test/backtrace.ml +++ b/test/backtrace.ml @@ -17,7 +17,7 @@ let rec bar i = [@@inline never] let main () = - let pool = T.setup_pool ~num_additional_domains:0 () in + let pool = T.setup_pool ~num_domains:0 () in T.run pool (fun _ -> let p = T.async pool (fun _ -> bar 42) in T.await pool p; diff --git a/test/enumerate_par.ml b/test/enumerate_par.ml index ea40092..da8e65f 100644 --- a/test/enumerate_par.ml +++ b/test/enumerate_par.ml @@ -4,7 +4,7 @@ let n = try int_of_string Sys.argv.(2) with _ -> 100 module T = Domainslib.Task let _ = - let p = T.setup_pool ~num_additional_domains:(num_domains - 1) () in + let p = T.setup_pool ~num_domains:(num_domains - 1) () in T.run p (fun _ -> T.parallel_for p ~start:0 ~finish:(n-1) ~chunk_size:16 ~body:(fun i -> print_string @@ Printf.sprintf "[%d] %d\n%!" 
(Domain.self () :> int) i)); diff --git a/test/fib_par.ml b/test/fib_par.ml index eb65df1..77e944b 100644 --- a/test/fib_par.ml +++ b/test/fib_par.ml @@ -15,7 +15,7 @@ let rec fib_par pool n = T.await pool a + T.await pool b let main = - let pool = T.setup_pool ~num_additional_domains:(num_domains - 1) () in + let pool = T.setup_pool ~num_domains:(num_domains - 1) () in let res = T.run pool (fun _ -> fib_par pool n) in T.teardown_pool pool; Printf.printf "fib(%d) = %d\n" n res diff --git a/test/game_of_life_multicore.ml b/test/game_of_life_multicore.ml index f26e422..6606ab7 100644 --- a/test/game_of_life_multicore.ml +++ b/test/game_of_life_multicore.ml @@ -62,7 +62,7 @@ let rec repeat pool n = | _-> next pool; repeat pool (n-1) let ()= - let pool = T.setup_pool ~num_additional_domains:(num_domains - 1) () in + let pool = T.setup_pool ~num_domains:(num_domains - 1) () in print !rg; T.run pool (fun _ -> repeat pool n_times); print !rg; diff --git a/test/off_by_one.ml b/test/off_by_one.ml index e513985..4ca5c1f 100644 --- a/test/off_by_one.ml +++ b/test/off_by_one.ml @@ -10,7 +10,7 @@ let print_array a = let r = Array.init 20 (fun i -> i + 1) let scan_task num_doms = - let pool = Task.setup_pool ~num_additional_domains:num_doms () in + let pool = Task.setup_pool ~num_domains:num_doms () in let a = Task.run pool (fun () -> Task.parallel_scan pool (+) (Array.make 20 1)) in Task.teardown_pool pool; Printf.printf "%i: %s\n%!" 
num_doms (print_array a); diff --git a/test/prefix_sum.ml b/test/prefix_sum.ml index 8926395..0f57cb7 100644 --- a/test/prefix_sum.ml +++ b/test/prefix_sum.ml @@ -7,7 +7,7 @@ let gen n = Array.make n 1 (*(fun _ -> Random.int n)*) let prefix_sum pool = T.parallel_scan pool (+) let _ = - let pool = T.setup_pool ~num_additional_domains:(num_domains - 1) () in + let pool = T.setup_pool ~num_domains:(num_domains - 1) () in let arr = gen n in let t = Unix.gettimeofday() in ignore (T.run pool (fun _ -> prefix_sum pool arr)); diff --git a/test/spectralnorm2_multicore.ml b/test/spectralnorm2_multicore.ml index c2a3b5f..f8c5830 100644 --- a/test/spectralnorm2_multicore.ml +++ b/test/spectralnorm2_multicore.ml @@ -34,7 +34,7 @@ let eval_AtA_times_u pool u v = eval_A_times_u pool u w; eval_At_times_u pool w v let () = - let pool = T.setup_pool ~num_additional_domains:(num_domains - 1) () in + let pool = T.setup_pool ~num_domains:(num_domains - 1) () in let u = Array.make n 1.0 and v = Array.make n 0.0 in T.run pool (fun _ -> for _i = 0 to 9 do diff --git a/test/sum_par.ml b/test/sum_par.ml index 0a99a67..3e0083a 100644 --- a/test/sum_par.ml +++ b/test/sum_par.ml @@ -5,7 +5,7 @@ module T = Domainslib.Task let _ = (* use parallel_for_reduce *) - let p = T.setup_pool ~num_additional_domains:(num_domains - 1) () in + let p = T.setup_pool ~num_domains:(num_domains - 1) () in let sum = T.run p (fun _ -> T.parallel_for_reduce p (+) 0 ~chunk_size:(n/(4*num_domains)) ~start:0 ~finish:(n-1) ~body:(fun _i -> 1)) @@ -16,7 +16,7 @@ let _ = let _ = (* explictly use empty pool and default chunk_size *) - let p = T.setup_pool ~num_additional_domains:0 () in + let p = T.setup_pool ~num_domains:0 () in let sum = Atomic.make 0 in T.run p (fun _ -> T.parallel_for p ~start:0 ~finish:(n-1) @@ -28,7 +28,7 @@ let _ = let _ = (* configured num_domains and default chunk_size *) - let p = T.setup_pool ~num_additional_domains:(num_domains - 1) () in + let p = T.setup_pool ~num_domains:(num_domains - 1) 
() in let sum = Atomic.make 0 in T.run p (fun _ -> T.parallel_for p ~start:0 ~finish:(n-1) diff --git a/test/summed_area_table.ml b/test/summed_area_table.ml index b48f076..b860995 100644 --- a/test/summed_area_table.ml +++ b/test/summed_area_table.ml @@ -29,7 +29,7 @@ let calc_table pool mat = let _ = let m = Array.make_matrix size size 1 (*Array.init size (fun _ -> Array.init size (fun _ -> Random.int size))*) in - let pool = T.setup_pool ~num_additional_domains:(num_domains - 1) () in + let pool = T.setup_pool ~num_domains:(num_domains - 1) () in let _ = T.run pool (fun _ -> calc_table pool m) in (* for i = 0 to size-1 do diff --git a/test/task_throughput.ml b/test/task_throughput.ml index 059dec9..c8bce4c 100644 --- a/test/task_throughput.ml +++ b/test/task_throughput.ml @@ -50,7 +50,7 @@ end let _ = Printf.printf "n_iterations: %d n_units: %d n_domains: %d\n" n_iterations n_tasks n_domains; - let pool = T.setup_pool ~num_additional_domains:(n_domains - 1) () in + let pool = T.setup_pool ~num_domains:(n_domains - 1) () in let hist = TimingHist.make 5 25 in for _ = 1 to n_iterations do diff --git a/test/test_deadlock.ml b/test/test_deadlock.ml index 42c41fc..3e78cee 100644 --- a/test/test_deadlock.ml +++ b/test/test_deadlock.ml @@ -12,7 +12,7 @@ let rec loop n = else (Domain.cpu_relax (); loop (n-1)) let () = - let pool = T.setup_pool ~num_additional_domains:2 () in + let pool = T.setup_pool ~num_domains:2 () in T.run pool (fun _ -> let a = T.async pool (fun _ -> Printf.printf "Task A running on domain %d\n%!" 
(Domain.self () :> int); diff --git a/test/test_task.ml b/test/test_task.ml index 951c253..8e30b3d 100644 --- a/test/test_task.ml +++ b/test/test_task.ml @@ -37,8 +37,8 @@ let prefix_sum pool = fun () -> let () = - let pool1 = Task.setup_pool ~num_additional_domains:2 ~name:"pool1" () in - let pool2 = Task.setup_pool ~num_additional_domains:2 ~name:"pool2" () in + let pool1 = Task.setup_pool ~num_domains:2 ~name:"pool1" () in + let pool2 = Task.setup_pool ~num_domains:2 ~name:"pool2" () in Task.run pool1 (fun _ -> let p1 = Option.get @@ Task.lookup_pool "pool1" in modify_arr pool1 0 (); @@ -68,6 +68,6 @@ let () = assert (Task.lookup_pool "pool1" = None); try - let _ = Task.setup_pool ~num_additional_domains:(-1) () in () + let _ = Task.setup_pool ~num_domains:(-1) () in () with Invalid_argument _ -> (); print_endline "ok" diff --git a/test/test_task_crash.ml b/test/test_task_crash.ml index 5327349..ce0a8af 100644 --- a/test/test_task_crash.ml +++ b/test/test_task_crash.ml @@ -11,8 +11,8 @@ let work () = done ;; begin - let pool1 = Task.setup_pool ~num_additional_domains:2 () in - let pool2 = Task.setup_pool ~num_additional_domains:1 () in + let pool1 = Task.setup_pool ~num_domains:2 () in + let pool2 = Task.setup_pool ~num_domains:1 () in let pool1_prom0 = Task.async pool1 work in diff --git a/test/test_task_empty.ml b/test/test_task_empty.ml index b840329..ab1dc32 100644 --- a/test/test_task_empty.ml +++ b/test/test_task_empty.ml @@ -2,7 +2,7 @@ open Domainslib let array_size = 0 -let pool = Task.setup_pool ~num_additional_domains:0 () +let pool = Task.setup_pool ~num_domains:0 () let res = Task.run pool (fun () -> Task.parallel_for_reduce ~chunk_size:0 ~start:0 ~finish:(array_size-1) ~body:(fun _ -> 1) pool (+) 0);; Task.teardown_pool pool;;