From 050adfaee2af95c51b041cf164c2f94f53e376a6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Edwin=20T=C3=B6r=C3=B6k?= Date: Tue, 3 Oct 2023 17:53:55 +0100 Subject: [PATCH] scale-tests TODO: measure threadpool overhead too and see where we stop scaling linearly with threads, because PAM alone is a lot faster than what we get from the threadpool or the threaded test. --- ocaml/auth/pam.ml | 2 +- ocaml/auth/threadpool.ml | 52 +++++++++++++++++++++++++++++++-- ocaml/tests/bench/bench_auth.ml | 18 +++++++++++- ocaml/tests/bench/dune | 8 +++++ ocaml/tests/bench/scale_auth.ml | 49 +++++++++++++++++++++++++++++++ 5 files changed, 125 insertions(+), 4 deletions(-) create mode 100644 ocaml/tests/bench/scale_auth.ml diff --git a/ocaml/auth/pam.ml b/ocaml/auth/pam.ml index c246c875217..a27cdd996ea 100644 --- a/ocaml/auth/pam.ml +++ b/ocaml/auth/pam.ml @@ -21,7 +21,7 @@ external authorize : pam_handle -> string -> string -> unit = "stub_XA_mh_author (* TODO: make this configurable in Xapi_globs *) (* because this is initialized on startup this is not settable from a config file yet! *) -let auth_workers = Threadpool.create ~name:"PAM auth" authenticate_start authenticate_stop 8 +let auth_workers : (pam_handle, unit) Threadpool.t = Threadpool.create ~name:"PAM auth" authenticate_start authenticate_stop 8 (* let () = at_exit (fun () -> Threadpool.shutdown auth_workers) *) diff --git a/ocaml/auth/threadpool.ml b/ocaml/auth/threadpool.ml index 427fbc98af9..16964800b7f 100644 --- a/ocaml/auth/threadpool.ml +++ b/ocaml/auth/threadpool.ml @@ -1,9 +1,56 @@ +type 'a wrapped = ('a, Rresult.R.exn_trap) result +type (-'a, 'b) task = { f: 'a -> 'b; result: 'b wrapped Event.channel } +type ('a, 'b) t = +{ stop_all: unit -> unit +; tasks: ('a, 'b) task option Event.channel +} + +let create ~name:_ acquire release workers = + let tasks = Event.new_channel () in + + let rec worker_loop resource () = + match Event.sync (Event.receive tasks) with + | None -> () (* stop *) + | Some task -> + let result = Rresult.R.trap_exn task.f resource in + Event.sync (Event.send task.result result); + worker_loop resource () + in + + let worker _ = + let resource = acquire () in + let finally () = release resource in + Fun.protect ~finally @@ worker_loop resource + in + let threads = Array.init workers @@ fun idx -> Thread.create worker idx in + + let stop_all () = + (* queue up an abort job for each worker for a graceful shutdown *) + let stop_all = List.init workers (fun _ -> Event.send tasks None) in + Event.select stop_all; + Array.iter Thread.join threads + in + { stop_all; tasks} + +let run_in_pool' pool f = + let result = Event.new_channel () in + Event.sync (Event.send pool.tasks (Some {f; result })); + let wait () = + match Event.sync(Event.receive result) with + | Ok r -> r + | Error (`Exn_trap (e, bt)) -> + Printexc.raise_with_backtrace e bt + in wait + +let run_in_pool pool f = + run_in_pool' pool f () + +let shutdown t = t.stop_all () -let mutex_execute = Xapi_stdext_threads.Threadext.Mutex.execute +(*let mutex_execute = Xapi_stdext_threads.Threadext.Mutex.execute -module D = Debug.Make (struct let name = "threadpool" end) type 'a queue = { tasks: ('a -> unit) Queue.t @@ -99,3 +146,4 @@ let shutdown (pool, workers) = D.debug "Waiting for %s worker pool to exit" pool.name ; Array.iter Thread.join workers ; D.debug "Worker pool %s has exited" pool.name +*) \ No newline at end of file diff --git a/ocaml/tests/bench/bench_auth.ml b/ocaml/tests/bench/bench_auth.ml index 6fab61ee8fe..8d49a64b6a5 100644 --- a/ocaml/tests/bench/bench_auth.ml +++ b/ocaml/tests/bench/bench_auth.ml @@ -1,6 +1,6 @@ open Bechamel -let api_pool = Threadpool.create ~name:"api" ignore ignore 16 +(*let api_pool = Threadpool.create ~name:"api" ignore ignore 16*) let concurrently n m f = let sem = Semaphore.Counting.make m in @@ -34,12 +34,28 @@ let test name execute = let auth_pam pam = Pam.authorize pam "pamtest-edvint" "pamtest-edvint" + +let test2 name execute = + Test.make_indexed_with_resource ~name ~args:[1;8] + ~allocate:(fun n -> Array.init n @@ fun _ -> Pam.authenticate_start ()) + ~free:(fun a -> Array.iter Pam.authenticate_stop a) + Bechamel.Test.uniq + @@ fun n -> + Staged.stage @@ fun handles -> + let threads = Array.init n @@ Thread.create (fun i -> + for _ = 1 to 10 do + execute handles.(i) + done + ) in + Array.iter Thread.join threads + let auth_user pool _ = Threadpool.run_in_pool pool auth_pam let benchmarks = Test.make_grouped ~name:"Auth" ([test "auth" auth_user + ;test2 "auth2" auth_pam ] ) diff --git a/ocaml/tests/bench/dune b/ocaml/tests/bench/dune index 678141eb547..4068edee85c 100644 --- a/ocaml/tests/bench/dune +++ b/ocaml/tests/bench/dune @@ -19,3 +19,11 @@ (optional) (libraries bechamel_simple_cli pam tests_common) ) + +(executable + (name scale_auth) + (modules scale_auth) + (modes exe) + (optional) + (libraries pam mtime mtime.clock.os) +) diff --git a/ocaml/tests/bench/scale_auth.ml b/ocaml/tests/bench/scale_auth.ml new file mode 100644 index 00000000000..5640a62bb3d --- /dev/null +++ b/ocaml/tests/bench/scale_auth.ml @@ -0,0 +1,49 @@ +let () = + let nthreads = ref 8 in + let user = ref "" in + let password = ref "" in + let perthread = ref 4 in + (* TODO: use bechamel to measure speed! *) + Arg.parse [ + "--threads", Arg.Set_int nthreads, "Number of threads to use" + ; "--username", Arg.Set_string user, "Username for test" + ; "--password", Arg.Set_string password, "Password for test" + ; "--perthread", Arg.Set_int perthread, "Authentication attempts per thread" + ] (fun _ -> raise (Arg.Bad "Unexpected argument")) "Usage"; + + let start_semaphore = Semaphore.Counting.make !nthreads in + let finish_semaphore = Semaphore.Counting.make 0 in + + let threads = Array.init !nthreads @@ Thread.create @@ fun _ -> + let handle = Pam.authenticate_start () in + let finally () = Pam.authenticate_stop handle in + Fun.protect ~finally @@ fun () -> + + Semaphore.Counting.acquire start_semaphore; + + for _ = 1 to !perthread do + Pam.authorize handle !user !password; + done; + + Semaphore.Counting.release finish_semaphore + in + + let start () = + for _ = 1 to !nthreads do + Semaphore.Counting.release start_semaphore + done + in + + let wait () = + for _ = 1 to !nthreads do + Semaphore.Counting.acquire finish_semaphore + done + in + + let dt = Mtime_clock.counter () in + start (); + wait (); + let speed = float (!nthreads * !perthread) /. (Mtime_clock.count dt |> Mtime.Span.to_s) in + Printf.printf "Auth/s: %.3f\n" speed; + + Array.iter Thread.join threads \ No newline at end of file