Skip to content

Commit

Permalink
use 5ms timeslice for each thread
Browse files Browse the repository at this point in the history
  • Loading branch information
edwintorok committed May 7, 2024
1 parent 77ab1d5 commit f4f4570
Show file tree
Hide file tree
Showing 83 changed files with 363 additions and 115 deletions.
2 changes: 1 addition & 1 deletion doc/content/xapi/storage/sxm.md
Original file line number Diff line number Diff line change
Expand Up @@ -1702,7 +1702,7 @@ let post_detach_hook ~sr ~vdi ~dp =
Opt.iter (fun r ->
let remote_url = Http.Url.of_string r.url in
let module Remote = Client(struct let rpc = rpc ~srcstr:"smapiv2" ~dststr:"dst_smapiv2" remote_url end) in
let t = Thread.create (fun () ->
let t = Timers.Timer.thread_create (fun () ->
debug "Calling receive_finalize";
log_and_ignore_exn
(fun () -> Remote.DATA.MIRROR.receive_finalize ~dbg:"Mirror-cleanup" ~id);
Expand Down
4 changes: 4 additions & 0 deletions dune-project
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,10 @@
(maintainers "Xapi project maintainers")
(homepage "https://xapi-project.github.io/")

(package
(name xapi-timers)
)

(package
(name zstd)
)
Expand Down
2 changes: 1 addition & 1 deletion ocaml/database/db_cache_impl.ml
Original file line number Diff line number Diff line change
Expand Up @@ -376,7 +376,7 @@ let spawn_db_flush_threads () =
(fun dbconn ->
let db_path = dbconn.Parse_db_conf.path in
ignore
(Thread.create
(Timers.Timer.thread_create
(fun () ->
Debug.with_thread_named
("dbflush [" ^ db_path ^ "]")
Expand Down
2 changes: 1 addition & 1 deletion ocaml/database/master_connection.ml
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ let start_master_connection_watchdog () =
| None ->
my_watchdog :=
Some
(Thread.create
(Timers.Timer.thread_create
(fun () ->
while true do
try
Expand Down
2 changes: 1 addition & 1 deletion ocaml/database/redo_log.ml
Original file line number Diff line number Diff line change
Expand Up @@ -582,7 +582,7 @@ let shutdown log =
Unix.kill ipid Sys.sigkill ;
(* Wait for the process to die. This is done in a separate thread in case it does not respond to the signal immediately. *)
ignore
(Thread.create
(Timers.Timer.thread_create
(fun () ->
D.debug "Waiting for I/O process with pid %d to die..." ipid ;
with_lock log.dying_processes_mutex (fun () ->
Expand Down
1 change: 1 addition & 0 deletions ocaml/libs/http-lib/dune
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
sha
stunnel
threads.posix
timers
uuid
xapi-backtrace
xapi-consts.xapi_version
Expand Down
6 changes: 3 additions & 3 deletions ocaml/libs/http-lib/server_io.ml
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ let handler_by_thread tracer (h : handler) (s : Unix.file_descr)
| Error _ ->
None (* too early, don't flood logs *)
in
Thread.create
Timers.Timer.thread_create
(fun () ->
let finally () =
Xapi_stdext_threads.Semaphore.release h.lock 1 ;
Expand Down Expand Up @@ -80,7 +80,7 @@ let establish_server ?(signal_fds = []) forker handler sock =
try ignore (forker tracer handler s caller)
with exc ->
(* NB provided 'forker' is configured to make a background thread then the
only way we can get here is if Thread.create fails.
only way we can get here is if Timers.Timer.thread_create fails.
This means we haven't executed any code which could close the fd therefore
we should do it ourselves. *)
debug "Got exception in server_io.ml: %s" (Printexc.to_string exc) ;
Expand Down Expand Up @@ -117,7 +117,7 @@ let server handler sock =
warn "Attempt to double-shutdown Server_io.server detected; ignoring"
in
let thread =
Thread.create
Timers.Timer.thread_create
(fun () ->
Debug.with_thread_named handler.name
(fun () ->
Expand Down
2 changes: 1 addition & 1 deletion ocaml/libs/http-lib/test_client.ml
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ let per_nsec n f =
let threads n f =
let results = Array.make n 0 in
let body i () = results.(i) <- f () in
let threads = Array.mapi (fun i _ -> Thread.create (body i) ()) results in
let threads = Array.mapi (fun i _ -> Timers.Timer.thread_create (body i) ()) results in
Array.iter Thread.join threads ;
Array.fold_left ( + ) 0 results

Expand Down
10 changes: 10 additions & 0 deletions ocaml/libs/http-lib/timers/dune
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
(library
(name timers)
(public_name xapi-timers)
(libraries threads.posix)
(foreign_stubs
(language c)
(names nice_stubs timer_stubs)
)
(c_library_flags -lrt)
)
4 changes: 4 additions & 0 deletions ocaml/libs/http-lib/timers/nice.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
(** [nice delta] changes the nice value of the current thread on Linux by [delta], and return the new value.
[nice 0] can be used to query the current value without altering it.
*)
external nice: int -> int = "ml_nice"
15 changes: 15 additions & 0 deletions ocaml/libs/http-lib/timers/nice_stubs.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
#include <caml/mlvalues.h>
#include <caml/memory.h>
#include <caml/unixsupport.h>
#include <errno.h>

CAMLprim value ml_nice(value delta)
{
CAMLparam1(delta);
/* see manpage, a successful nice can legitimately return -1. */
errno = 0;
int rc = nice(Int_val(delta));
if (-1 == rc && errno)
uerror("nice", Nothing);
CAMLreturn(Val_int(rc));
}
23 changes: 23 additions & 0 deletions ocaml/libs/http-lib/timers/timer.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
type t

external cpu_timer_create : bool -> t = "ml_cpu_timer_create"

external cpu_timer_destroy : t -> unit = "ml_cpu_timer_destroy"

external cpu_timer_settime : t -> float -> unit = "ml_cpu_timer_settime"

external cpu_timer_gettime : t -> float = "ml_cpu_timer_gettime"

let with_cpu_timer ~is_thread ~interval f =
let t = cpu_timer_create is_thread in
let finally () = cpu_timer_destroy t in
Fun.protect ~finally @@ fun () ->
cpu_timer_settime t interval ;
f t

let default_interval = Atomic.make 0.005

let thread_create f arg =
Thread.create
(with_cpu_timer ~is_thread:true ~interval:(Atomic.get default_interval))
(fun (_ : t) -> f arg)
29 changes: 29 additions & 0 deletions ocaml/libs/http-lib/timers/timer.mli
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
type t

(** [with_cpu_timer ~is_thread ~interval f] creates a POSIX timer with [timer_create],
and sets it to deliver [SIGVTALRM] whenever the process or thread has consumed [interval] seconds of CPU time.
The signal will be redelivered periodically every time the process or thread consumes [interval] more seconds of CPU time.
The timer is disarmed and destroyed when [f] finishes.
This can be used to limit the amount of time a single thread can hold the OCaml master lock.
@param is_thread whether to measure time per thread or per process
@param interval in seconds when to deliver [SIGVTALRM]
@param f the function to call with this timer armed
*)
val with_cpu_timer: is_thread:bool -> interval:float -> (t -> 'a) -> 'a

(** [cpu_timer_settime t interval] changes the [SIGVTALRM] delivery interval to [interval].
A [0.] value disarms the timer.
*)
val cpu_timer_settime: t -> float -> unit

(** [cpu_timer_gettime t] retrieves the current value of the timer [t] in seconds. *)
val cpu_timer_gettime: t -> float

val default_interval: float Atomic.t

val thread_create: ('a -> 'b) -> 'a -> Thread.t
(** [thread_create f arg] is a wrapper for [Thread.create f arg] that sets up an interval timer with {!val:with_cpu_timer},
with a default interval.
*)
134 changes: 134 additions & 0 deletions ocaml/libs/http-lib/timers/timer_stubs.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
#include <bits/time.h>
#define CAML_NAME_SPACE
#include <caml/alloc.h>
#include <caml/custom.h>
#include <caml/memory.h>
#include <caml/mlvalues.h>
#include <caml/threads.h>
#include <caml/unixsupport.h>

#include <errno.h>
#include <signal.h>
#include <string.h>
#include <time.h>

#define timer_t_val(v) (*((timer_t **)Data_custom_val(v)))

static int ml_cpu_timer_free(value timer) {
/* Called from finalizer, must not raise exceptions, and must not use
the CAML* macros
*/
timer_t *timerid_ptr = timer_t_val(timer);
if (timerid_ptr) {
timer_t timerid = *timerid_ptr;
/* prevent double-free */
timer_t_val(timer) = NULL;
caml_stat_free(timerid_ptr);

return timer_delete(timerid);
}
/* timer has already been freed, matches what timer_delete would return on bad
* timer_t */
errno = EINVAL;
return -1;
}

void ml_cpu_timer_finalise(value timer) { ml_cpu_timer_free(timer); }

static struct custom_operations timer_ops = {"timer_t",
ml_cpu_timer_finalise,
custom_compare_default,
custom_hash_default,
custom_serialize_default,
custom_deserialize_default,
custom_compare_ext_default,
custom_fixed_length_default};

CAMLprim value ml_cpu_timer_create(value is_thread) {
CAMLparam1(is_thread);
CAMLlocal1(timer);
clockid_t clock =
Bool_val(is_thread) ? CLOCK_THREAD_CPUTIME_ID : CLOCK_PROCESS_CPUTIME_ID;
struct sigevent sev;
timer_t *timerid;

timer = caml_alloc_custom(&timer_ops, sizeof(timer_t *), 0, 1);
/* initialize, in case the allocation below fails */
timer_t_val(timer) = NULL;

timerid = caml_stat_alloc(sizeof(timer_t));

memset(&sev, 0, sizeof(sev));
sev.sigev_notify = SIGEV_SIGNAL;
sev.sigev_signo = SIGVTALRM; /* same as SIGPREEMPTION in OCaml runtime */
sev.sigev_value.sival_ptr = timerid;

caml_enter_blocking_section();
int rc = timer_create(clock, &sev, timerid);
caml_leave_blocking_section();

if (-1 == rc) {
caml_stat_free(timerid);
uerror("timer_create", Nothing);
}
/* only store value once we know it contains a valid timer */
timer_t_val(timer) = timerid;

CAMLreturn(timer);
}

CAMLprim value ml_cpu_timer_destroy(value timer) {
CAMLparam1(timer);
caml_enter_blocking_section();
int rc = ml_cpu_timer_free(timer);
caml_leave_blocking_section();
if (-1 == rc)
uerror("timer_delete", Nothing);
CAMLreturn(Val_unit);
}

CAMLprim value ml_cpu_timer_settime(value timer, value interval) {
CAMLparam2(timer, interval);
timer_t *timerid_ptr = timer_t_val(timer);

if (!timerid_ptr)
unix_error(EINVAL, "timer_settime", Nothing);

struct itimerspec spec;

double t = Double_val(interval);
spec.it_interval.tv_sec = (time_t)t;
spec.it_interval.tv_nsec = (t - spec.it_interval.tv_sec) * 1e9;
spec.it_value = spec.it_interval;

timer_t timerid = *timerid_ptr;

caml_enter_blocking_section();
int rc = timer_settime(timerid, 0, &spec, NULL);
caml_leave_blocking_section();

if (-1 == rc)
uerror("timer_settime", Nothing);

CAMLreturn(Val_unit);
}

CAMLprim value ml_cpu_timer_gettime(value timer) {
CAMLparam1(timer);
timer_t *timerid_ptr = timer_t_val(timer);

if (!timerid_ptr)
unix_error(EINVAL, "timer_gettime", Nothing);

timer_t timerid = *timerid_ptr;
struct itimerspec spec;

caml_enter_blocking_section();
int rc = timer_gettime(timerid, &spec);
caml_leave_blocking_section();
if (-1 == rc)
uerror("timer_gettime", Nothing);

double t = spec.it_value.tv_nsec * 1e-9 + spec.it_value.tv_sec;
CAMLreturn(caml_copy_double(t));
}
3 changes: 2 additions & 1 deletion ocaml/libs/tracing/dune
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
(library
(name tracing)
(modules tracing)
(libraries re uri xapi-log xapi-stdext-threads)
(libraries re uri xapi-log xapi-stdext-threads xapi-timers)
(public_name xapi-tracing))

(library
Expand All @@ -15,6 +15,7 @@
ptime.clock.os
rpclib.core
rpclib.json
xapi-timers
tracing
uri
xapi-log
Expand Down
2 changes: 1 addition & 1 deletion ocaml/libs/tracing/tracing.ml
Original file line number Diff line number Diff line change
Expand Up @@ -444,7 +444,7 @@ module Spans = struct
span_timeout := timeout ;
span_timeout_thread :=
Some
(Thread.create
(Timers.Timer.thread_create
(fun () ->
while true do
debug "Tracing: Span garbage collector" ;
Expand Down
2 changes: 1 addition & 1 deletion ocaml/libs/tracing/tracing_export.ml
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,7 @@ module Destination = struct

let create_exporter () =
enable_span_garbage_collector () ;
Thread.create
Timers.Timer.thread_create
(fun () ->
let signaled = ref false in
while not !signaled do
Expand Down
2 changes: 1 addition & 1 deletion ocaml/message-switch/core_test/server_unix_main.ml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ let name = ref "server"
let process = function
| "shutdown" ->
let (_ : Thread.t) =
Thread.create (fun () -> Thread.delay 1. ; exit 0) ()
Timers.Timer.thread_create (fun () -> Thread.delay 1. ; exit 0) ()
in
"ok"
| x ->
Expand Down
1 change: 1 addition & 0 deletions ocaml/message-switch/unix/dune
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
message-switch-core
rpclib.core
rpclib.json
xapi-timers
threads.posix
)
(preprocess (pps ppx_deriving_rpc))
Expand Down
4 changes: 2 additions & 2 deletions ocaml/message-switch/unix/protocol_unix.ml
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ let thread_forever f v =
| exception _ ->
Thread.delay 1.0 ; (loop [@tailcall]) ()
in
Thread.create loop ()
Timers.Timer.thread_create loop ()

module IO = struct
let whoami () =
Expand Down Expand Up @@ -502,7 +502,7 @@ module Server = struct
List.iter
(fun (i, m) ->
let (_ : Thread.t) =
Thread.create
Timers.Timer.thread_create
(fun () ->
let response =
try process m.Message.payload
Expand Down
2 changes: 1 addition & 1 deletion ocaml/message-switch/unix/protocol_unix_scheduler.ml
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,6 @@ let start =
fun () ->
match !t with
| None ->
t := Some (Thread.create main_loop ())
t := Some (Timers.Timer.thread_create main_loop ())
| Some _ ->
()
Loading

0 comments on commit f4f4570

Please sign in to comment.