Skip to content

Commit

Permalink
scale-tests
Browse files Browse the repository at this point in the history
TODO: measure threadpool overhead too and see where we stop scaling linearly with threads,
because PAM alone is a lot faster than what we get from the threadpool or the threaded test.
  • Loading branch information
edwintorok committed Oct 3, 2023
1 parent e714a19 commit 050adfa
Show file tree
Hide file tree
Showing 5 changed files with 125 additions and 4 deletions.
2 changes: 1 addition & 1 deletion ocaml/auth/pam.ml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ external authorize : pam_handle -> string -> string -> unit = "stub_XA_mh_author

(* TODO: make this configurable in Xapi_globs *)
(* because this is initialized on startup this is not settable from a config file yet! *)
let auth_workers = Threadpool.create ~name:"PAM auth" authenticate_start authenticate_stop 8
let auth_workers : (pam_handle, unit) Threadpool.t = Threadpool.create ~name:"PAM auth" authenticate_start authenticate_stop 8
(*
let () = at_exit (fun () -> Threadpool.shutdown auth_workers)
*)
Expand Down
52 changes: 50 additions & 2 deletions ocaml/auth/threadpool.ml
Original file line number Diff line number Diff line change
@@ -1,9 +1,56 @@
type 'a wrapped = ('a, Rresult.R.exn_trap) result
type (-'a, 'b) task = { f: 'a -> 'b; result: 'b wrapped Event.channel }


type ('a, 'b) t =
{ stop_all: unit -> unit
; tasks: ('a, 'b) task option Event.channel
}

let create ~name:_ acquire release workers =
let tasks = Event.new_channel () in

let rec worker_loop resource () =
match Event.sync (Event.receive tasks) with
| None -> () (* stop *)
| Some task ->
let result = Rresult.R.trap_exn task.f resource in
Event.sync (Event.send task.result result);
worker_loop resource ()
in

let worker _ =
let resource = acquire () in
let finally () = release resource in
Fun.protect ~finally @@ worker_loop resource
in
let threads = Array.init workers @@ fun idx -> Thread.create worker idx in

let stop_all () =
(* queue up an abort job for each worker for a graceful shutdown *)
let stop_all = List.init workers (fun _ -> Event.send tasks None) in
Event.select stop_all;
Array.iter Thread.join threads
in
{ stop_all; tasks}

let run_in_pool' pool f =
let result = Event.new_channel () in
Event.sync (Event.send pool.tasks (Some {f; result }));
let wait () =
match Event.sync(Event.receive result) with
| Ok r -> r
| Error (`Exn_trap (e, bt)) ->
Printexc.raise_with_backtrace e bt
in wait

let run_in_pool pool f =
run_in_pool' pool f ()

let shutdown t = t.stop_all ()

let mutex_execute = Xapi_stdext_threads.Threadext.Mutex.execute
(*let mutex_execute = Xapi_stdext_threads.Threadext.Mutex.execute
module D = Debug.Make (struct let name = "threadpool" end)
type 'a queue = {
tasks: ('a -> unit) Queue.t
Expand Down Expand Up @@ -99,3 +146,4 @@ let shutdown (pool, workers) =
D.debug "Waiting for %s worker pool to exit" pool.name ;
Array.iter Thread.join workers ;
D.debug "Worker pool %s has exited" pool.name
*)
18 changes: 17 additions & 1 deletion ocaml/tests/bench/bench_auth.ml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
open Bechamel

let api_pool = Threadpool.create ~name:"api" ignore ignore 16
(*let api_pool = Threadpool.create ~name:"api" ignore ignore 16*)

let concurrently n m f =
let sem = Semaphore.Counting.make m in
Expand Down Expand Up @@ -34,12 +34,28 @@ let test name execute =

let auth_pam pam = Pam.authorize pam "pamtest-edvint" "pamtest-edvint"


let test2 name execute =
Test.make_indexed_with_resource ~name ~args:[1;8]
~allocate:(fun n -> Array.init n @@ fun _ -> Pam.authenticate_start ())
~free:(fun a -> Array.iter Pam.authenticate_stop a)
Bechamel.Test.uniq
@@ fun n ->
Staged.stage @@ fun handles ->
let threads = Array.init n @@ Thread.create (fun i ->
for _ = 1 to 10 do
execute handles.(i)
done
) in
Array.iter Thread.join threads

let auth_user pool _ =
Threadpool.run_in_pool pool auth_pam

let benchmarks =
Test.make_grouped ~name:"Auth"
([test "auth" auth_user
;test2 "auth2" auth_pam
]
)

Expand Down
8 changes: 8 additions & 0 deletions ocaml/tests/bench/dune
Original file line number Diff line number Diff line change
Expand Up @@ -19,3 +19,11 @@
(optional)
(libraries bechamel_simple_cli pam tests_common)
)

(executable
(name scale_auth)
(modules scale_auth)
(modes exe)
(optional)
(libraries pam mtime mtime.clock.os)
)
49 changes: 49 additions & 0 deletions ocaml/tests/bench/scale_auth.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
let () =
let nthreads = ref 8 in
let user = ref "" in
let password = ref "" in
let perthread = ref 4 in
(* TODO: use bechamel to measure speed! *)
Arg.parse [
"--threads", Arg.Set_int nthreads, "Number of threads to use"
; "--username", Arg.Set_string user, "Username for test"
; "--password", Arg.Set_string password, "Password for test"
; "--perthread", Arg.Set_int perthread, "Authentication attempts per thread"
] (fun _ -> raise (Arg.Bad "Unexpected argument")) "Usage";

let start_semaphore = Semaphore.Counting.make !nthreads in
let finish_semaphore = Semaphore.Counting.make 0 in

let threads = Array.init !nthreads @@ Thread.create @@ fun _ ->
let handle = Pam.authenticate_start () in
let finally () = Pam.authenticate_stop handle in
Fun.protect ~finally @@ fun () ->

Semaphore.Counting.acquire start_semaphore;

for _ = 1 to !perthread do
Pam.authorize handle !user !password;
done;

Semaphore.Counting.release finish_semaphore
in

let start () =
for _ = 1 to !nthreads do
Semaphore.Counting.release start_semaphore
done
in

let wait () =
for _ = 1 to !nthreads do
Semaphore.Counting.acquire finish_semaphore
done
in

let dt = Mtime_clock.counter () in
start ();
wait ();
let speed = float (!nthreads * !perthread) /. (Mtime_clock.count dt |> Mtime.Span.to_s) in
Printf.printf "Auth/s: %.3f\n" speed;

Array.iter Thread.join threads

0 comments on commit 050adfa

Please sign in to comment.