diff --git a/doc/content/xapi/storage/sxm.md b/doc/content/xapi/storage/sxm.md index 8429f87321c..dd81ed9060c 100644 --- a/doc/content/xapi/storage/sxm.md +++ b/doc/content/xapi/storage/sxm.md @@ -1702,7 +1702,7 @@ let post_detach_hook ~sr ~vdi ~dp = Opt.iter (fun r -> let remote_url = Http.Url.of_string r.url in let module Remote = Client(struct let rpc = rpc ~srcstr:"smapiv2" ~dststr:"dst_smapiv2" remote_url end) in - let t = Thread.create (fun () -> + let t = Timers.Timer.thread_create (fun () -> debug "Calling receive_finalize"; log_and_ignore_exn (fun () -> Remote.DATA.MIRROR.receive_finalize ~dbg:"Mirror-cleanup" ~id); diff --git a/dune-project b/dune-project index 92d84381b7c..04b46a0ac62 100644 --- a/dune-project +++ b/dune-project @@ -10,6 +10,10 @@ (maintainers "Xapi project maintainers") (homepage "https://xapi-project.github.io/") +(package + (name xapi-timers) +) + (package (name zstd) ) diff --git a/ocaml/database/db_cache_impl.ml b/ocaml/database/db_cache_impl.ml index 7fd12b75547..911526bb79d 100644 --- a/ocaml/database/db_cache_impl.ml +++ b/ocaml/database/db_cache_impl.ml @@ -376,7 +376,7 @@ let spawn_db_flush_threads () = (fun dbconn -> let db_path = dbconn.Parse_db_conf.path in ignore - (Thread.create + (Timers.Timer.thread_create (fun () -> Debug.with_thread_named ("dbflush [" ^ db_path ^ "]") diff --git a/ocaml/database/master_connection.ml b/ocaml/database/master_connection.ml index 01a413a512d..f456e17cbc2 100644 --- a/ocaml/database/master_connection.ml +++ b/ocaml/database/master_connection.ml @@ -125,7 +125,7 @@ let start_master_connection_watchdog () = | None -> my_watchdog := Some - (Thread.create + (Timers.Timer.thread_create (fun () -> while true do try diff --git a/ocaml/database/redo_log.ml b/ocaml/database/redo_log.ml index 429646dcce7..45ad1639f23 100644 --- a/ocaml/database/redo_log.ml +++ b/ocaml/database/redo_log.ml @@ -582,7 +582,7 @@ let shutdown log = Unix.kill ipid Sys.sigkill ; (* Wait for the process to die. This is done in a separate thread in case it does not respond to the signal immediately. *) ignore - (Thread.create + (Timers.Timer.thread_create (fun () -> D.debug "Waiting for I/O process with pid %d to die..." ipid ; with_lock log.dying_processes_mutex (fun () -> diff --git a/ocaml/libs/http-lib/dune b/ocaml/libs/http-lib/dune index 329575297fc..4c5befb4530 100644 --- a/ocaml/libs/http-lib/dune +++ b/ocaml/libs/http-lib/dune @@ -18,6 +18,7 @@ sha stunnel threads.posix + timers uuid xapi-backtrace xapi-consts.xapi_version diff --git a/ocaml/libs/http-lib/server_io.ml b/ocaml/libs/http-lib/server_io.ml index 4a2e51e30e1..68327189706 100644 --- a/ocaml/libs/http-lib/server_io.ml +++ b/ocaml/libs/http-lib/server_io.ml @@ -45,7 +45,7 @@ let handler_by_thread tracer (h : handler) (s : Unix.file_descr) | Error _ -> None (* too early, don't flood logs *) in - Thread.create + Timers.Timer.thread_create (fun () -> let finally () = Xapi_stdext_threads.Semaphore.release h.lock 1 ; @@ -80,7 +80,7 @@ let establish_server ?(signal_fds = []) forker handler sock = try ignore (forker tracer handler s caller) with exc -> (* NB provided 'forker' is configured to make a background thread then the - only way we can get here is if Thread.create fails. + only way we can get here is if Timers.Timer.thread_create fails. This means we haven't executed any code which could close the fd therefore we should do it ourselves. *) debug "Got exception in server_io.ml: %s" (Printexc.to_string exc) ; @@ -117,7 +117,7 @@ let server handler sock = warn "Attempt to double-shutdown Server_io.server detected; ignoring" in let thread = - Thread.create + Timers.Timer.thread_create (fun () -> Debug.with_thread_named handler.name (fun () -> diff --git a/ocaml/libs/http-lib/test_client.ml b/ocaml/libs/http-lib/test_client.ml index 16d9d79cc09..f64e2bc24c5 100644 --- a/ocaml/libs/http-lib/test_client.ml +++ b/ocaml/libs/http-lib/test_client.ml @@ -77,7 +77,7 @@ let per_nsec n f = let threads n f = let results = Array.make n 0 in let body i () = results.(i) <- f () in - let threads = Array.mapi (fun i _ -> Thread.create (body i) ()) results in + let threads = Array.mapi (fun i _ -> Timers.Timer.thread_create (body i) ()) results in Array.iter Thread.join threads ; Array.fold_left ( + ) 0 results diff --git a/ocaml/libs/http-lib/timers/dune b/ocaml/libs/http-lib/timers/dune new file mode 100644 index 00000000000..d835509756a --- /dev/null +++ b/ocaml/libs/http-lib/timers/dune @@ -0,0 +1,9 @@ +(library + (name timers) + (public_name xapi-timers) + (libraries threads.posix) + (foreign_stubs + (language c) + (names nice_stubs timer_stubs) + ) +) diff --git a/ocaml/libs/http-lib/timers/nice.ml b/ocaml/libs/http-lib/timers/nice.ml new file mode 100644 index 00000000000..bba315fac54 --- /dev/null +++ b/ocaml/libs/http-lib/timers/nice.ml @@ -0,0 +1,4 @@ +(** [nice delta] changes the nice value of the current thread on Linux by [delta], and return the new value. + [nice 0] can be used to query the current value without altering it. + *) +external nice: int -> int = "ml_nice" diff --git a/ocaml/libs/http-lib/timers/nice_stubs.c b/ocaml/libs/http-lib/timers/nice_stubs.c new file mode 100644 index 00000000000..0b3fa645fca --- /dev/null +++ b/ocaml/libs/http-lib/timers/nice_stubs.c @@ -0,0 +1,15 @@ +#include +#include +#include +#include + +CAMLprim value ml_nice(value delta) +{ + CAMLparam1(delta); + /* see manpage, a successful nice can legitimately return -1. */ + errno = 0; + int rc = nice(Int_val(delta)); + if (-1 == rc && errno) + uerror("nice", Nothing); + CAMLreturn(Val_int(rc)); +} diff --git a/ocaml/libs/http-lib/timers/timer.ml b/ocaml/libs/http-lib/timers/timer.ml new file mode 100644 index 00000000000..be00933234b --- /dev/null +++ b/ocaml/libs/http-lib/timers/timer.ml @@ -0,0 +1,23 @@ +type t + +external cpu_timer_create : bool -> t = "ml_cpu_timer_create" + +external cpu_timer_destroy : t -> unit = "ml_cpu_timer_destroy" + +external cpu_timer_settime : t -> float -> unit = "ml_cpu_timer_settime" + +external cpu_timer_gettime : t -> float = "ml_cpu_timer_gettime" + +let with_cpu_timer ~is_thread ~interval f = + let t = cpu_timer_create is_thread in + let finally () = cpu_timer_destroy t in + Fun.protect ~finally @@ fun () -> + cpu_timer_settime t interval ; + f t + +let default_interval = Atomic.make 0.005 + +let thread_create f arg = + Thread.create + (with_cpu_timer ~is_thread:true ~interval:(Atomic.get default_interval)) + (fun (_ : t) -> f arg) diff --git a/ocaml/libs/http-lib/timers/timer.mli b/ocaml/libs/http-lib/timers/timer.mli new file mode 100644 index 00000000000..01a9005932d --- /dev/null +++ b/ocaml/libs/http-lib/timers/timer.mli @@ -0,0 +1,29 @@ +type t + +(** [with_cpu_timer ~is_thread ~interval f] creates a POSIX timer with [timer_create], + and sets it to deliver [SIGVTALRM] whenever the process or thread has consumed [interval] seconds of CPU time. + The signal will be redelivered periodically every time the process or thread consumes [interval] more seconds of CPU time. + The timer is disarmed and destroyed when [f] finishes. + + This can be used to limit the amount of time a single thread can hold the OCaml master lock. + + @param is_thread whether to measure time per thread or per process + @param interval in seconds when to deliver [SIGVTALRM] + @param f the function to call with this timer armed + *) +val with_cpu_timer: is_thread:bool -> interval:float -> (t -> 'a) -> 'a + +(** [cpu_timer_settime t interval] changes the [SIGVTALRM] delivery interval to [interval]. + A [0.] value disarms the timer. + *) +val cpu_timer_settime: t -> float -> unit + +(** [cpu_timer_gettime t] retrieves the current value of the timer [t] in seconds. *) +val cpu_timer_gettime: t -> float + +val default_interval: float Atomic.t + +val thread_create: ('a -> 'b) -> 'a -> Thread.t +(** [thread_create f arg] is a wrapper for [Thread.create f arg] that sets up an interval timer with {!val:with_cpu_timer}, + with a default interval. + *) diff --git a/ocaml/libs/http-lib/timers/timer_stubs.c b/ocaml/libs/http-lib/timers/timer_stubs.c new file mode 100644 index 00000000000..4ba65935a85 --- /dev/null +++ b/ocaml/libs/http-lib/timers/timer_stubs.c @@ -0,0 +1,134 @@ +#include +#define CAML_NAME_SPACE +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include + +#define timer_t_val(v) (*((timer_t **)Data_custom_val(v))) + +static int ml_cpu_timer_free(value timer) { + /* Called from finalizer, must not raise exceptions, and must not use + the CAML* macros + */ + timer_t *timerid_ptr = timer_t_val(timer); + if (timerid_ptr) { + timer_t timerid = *timerid_ptr; + /* prevent double-free */ + timer_t_val(timer) = NULL; + caml_stat_free(timerid_ptr); + + return timer_delete(timerid); + } + /* timer has already been freed, matches what timer_delete would return on bad + * timer_t */ + errno = EINVAL; + return -1; +} + +void ml_cpu_timer_finalise(value timer) { ml_cpu_timer_free(timer); } + +static struct custom_operations timer_ops = {"timer_t", + ml_cpu_timer_finalise, + custom_compare_default, + custom_hash_default, + custom_serialize_default, + custom_deserialize_default, + custom_compare_ext_default, + custom_fixed_length_default}; + +CAMLprim value ml_cpu_timer_create(value is_thread) { + CAMLparam1(is_thread); + CAMLlocal1(timer); + clockid_t clock = + Bool_val(is_thread) ? CLOCK_THREAD_CPUTIME_ID : CLOCK_PROCESS_CPUTIME_ID; + struct sigevent sev; + timer_t *timerid; + + timer = caml_alloc_custom(&timer_ops, sizeof(timer_t *), 0, 1); + /* initialize, in case the allocation below fails */ + timer_t_val(timer) = NULL; + + timerid = caml_stat_alloc(sizeof(timer_t)); + + memset(&sev, 0, sizeof(sev)); + sev.sigev_notify = SIGEV_SIGNAL; + sev.sigev_signo = SIGVTALRM; /* same as SIGPREEMPTION in OCaml runtime */ + sev.sigev_value.sival_ptr = timerid; + + caml_enter_blocking_section(); + int rc = timer_create(clock, &sev, timerid); + caml_leave_blocking_section(); + + if (-1 == rc) { + caml_stat_free(timerid); + uerror("timer_create", Nothing); + } + /* only store value once we know it contains a valid timer */ + timer_t_val(timer) = timerid; + + CAMLreturn(timer); +} + +CAMLprim value ml_cpu_timer_destroy(value timer) { + CAMLparam1(timer); + caml_enter_blocking_section(); + int rc = ml_cpu_timer_free(timer); + caml_leave_blocking_section(); + if (-1 == rc) + uerror("timer_delete", Nothing); + CAMLreturn(Val_unit); +} + +CAMLprim value ml_cpu_timer_settime(value timer, value interval) { + CAMLparam2(timer, interval); + timer_t *timerid_ptr = timer_t_val(timer); + + if (!timerid_ptr) + unix_error(EINVAL, "timer_settime", Nothing); + + struct itimerspec spec; + + double t = Double_val(interval); + spec.it_interval.tv_sec = (time_t)t; + spec.it_interval.tv_nsec = (t - spec.it_interval.tv_sec) * 1e9; + spec.it_value = spec.it_interval; + + timer_t timerid = *timerid_ptr; + + caml_enter_blocking_section(); + int rc = timer_settime(timerid, 0, &spec, NULL); + caml_leave_blocking_section(); + + if (-1 == rc) + uerror("timer_settime", Nothing); + + CAMLreturn(Val_unit); +} + +CAMLprim value ml_cpu_timer_gettime(value timer) { + CAMLparam1(timer); + timer_t *timerid_ptr = timer_t_val(timer); + + if (!timerid_ptr) + unix_error(EINVAL, "timer_gettime", Nothing); + + timer_t timerid = *timerid_ptr; + struct itimerspec spec; + + caml_enter_blocking_section(); + int rc = timer_gettime(timerid, &spec); + caml_leave_blocking_section(); + if (-1 == rc) + uerror("timer_gettime", Nothing); + + double t = spec.it_value.tv_nsec * 1e-9 + spec.it_value.tv_sec; + CAMLreturn(caml_copy_double(t)); +} diff --git a/ocaml/libs/tracing/dune b/ocaml/libs/tracing/dune index 0e1160818c2..7e7c001cb18 100644 --- a/ocaml/libs/tracing/dune +++ b/ocaml/libs/tracing/dune @@ -1,7 +1,7 @@ (library (name tracing) (modules tracing) - (libraries re uri xapi-log xapi-stdext-threads) + (libraries re uri xapi-log xapi-stdext-threads xapi-timers) (public_name xapi-tracing)) (library @@ -15,6 +15,7 @@ ptime.clock.os rpclib.core rpclib.json + xapi-timers tracing uri xapi-log diff --git a/ocaml/libs/tracing/tracing.ml b/ocaml/libs/tracing/tracing.ml index bbdd1cf3f4d..733762dcd97 100644 --- a/ocaml/libs/tracing/tracing.ml +++ b/ocaml/libs/tracing/tracing.ml @@ -444,7 +444,7 @@ module Spans = struct span_timeout := timeout ; span_timeout_thread := Some - (Thread.create + (Timers.Timer.thread_create (fun () -> while true do debug "Tracing: Span garbage collector" ; diff --git a/ocaml/libs/tracing/tracing_export.ml b/ocaml/libs/tracing/tracing_export.ml index a769b2403bc..1bb4bd2012e 100644 --- a/ocaml/libs/tracing/tracing_export.ml +++ b/ocaml/libs/tracing/tracing_export.ml @@ -292,7 +292,7 @@ module Destination = struct let create_exporter () = enable_span_garbage_collector () ; - Thread.create + Timers.Timer.thread_create (fun () -> let signaled = ref false in while not !signaled do diff --git a/ocaml/message-switch/core_test/server_unix_main.ml b/ocaml/message-switch/core_test/server_unix_main.ml index 5cddb7874b7..4b022b9bc62 100644 --- a/ocaml/message-switch/core_test/server_unix_main.ml +++ b/ocaml/message-switch/core_test/server_unix_main.ml @@ -21,7 +21,7 @@ let name = ref "server" let process = function | "shutdown" -> let (_ : Thread.t) = - Thread.create (fun () -> Thread.delay 1. ; exit 0) () + Timers.Timer.thread_create (fun () -> Thread.delay 1. ; exit 0) () in "ok" | x -> diff --git a/ocaml/message-switch/unix/dune b/ocaml/message-switch/unix/dune index 54b6c0e77bf..14986ebb2a1 100644 --- a/ocaml/message-switch/unix/dune +++ b/ocaml/message-switch/unix/dune @@ -10,6 +10,7 @@ message-switch-core rpclib.core rpclib.json + xapi-timers threads.posix ) (preprocess (pps ppx_deriving_rpc)) diff --git a/ocaml/message-switch/unix/protocol_unix.ml b/ocaml/message-switch/unix/protocol_unix.ml index 485964a40ec..13e855fcc22 100644 --- a/ocaml/message-switch/unix/protocol_unix.ml +++ b/ocaml/message-switch/unix/protocol_unix.ml @@ -31,7 +31,7 @@ let thread_forever f v = | exception _ -> Thread.delay 1.0 ; (loop [@tailcall]) () in - Thread.create loop () + Timers.Timer.thread_create loop () module IO = struct let whoami () = @@ -502,7 +502,7 @@ module Server = struct List.iter (fun (i, m) -> let (_ : Thread.t) = - Thread.create + Timers.Timer.thread_create (fun () -> let response = try process m.Message.payload diff --git a/ocaml/message-switch/unix/protocol_unix_scheduler.ml b/ocaml/message-switch/unix/protocol_unix_scheduler.ml index 92e6cdd3b1b..22194308b0f 100644 --- a/ocaml/message-switch/unix/protocol_unix_scheduler.ml +++ b/ocaml/message-switch/unix/protocol_unix_scheduler.ml @@ -207,6 +207,6 @@ let start = fun () -> match !t with | None -> - t := Some (Thread.create main_loop ()) + t := Some (Timers.Timer.thread_create main_loop ()) | Some _ -> () diff --git a/ocaml/mpathalert/mpathalert.ml b/ocaml/mpathalert/mpathalert.ml index 1ad0daedcec..327e8776342 100644 --- a/ocaml/mpathalert/mpathalert.ml +++ b/ocaml/mpathalert/mpathalert.ml @@ -391,8 +391,8 @@ let _ = in let queue = Queue.create () in let msg = Buffer.create 1024 in - let (t1 : Thread.t) = Thread.create (retry_with_session listener rpc) queue in + let (t1 : Thread.t) = Timers.Timer.thread_create (retry_with_session listener rpc) queue in let (t2 : Thread.t) = - Thread.create (retry_with_session sender rpc) (!delay, msg, queue) + Timers.Timer.thread_create (retry_with_session sender rpc) (!delay, msg, queue) in Thread.join t1 ; Thread.join t2 diff --git a/ocaml/networkd/bin/network_monitor_thread.ml b/ocaml/networkd/bin/network_monitor_thread.ml index 7ec920f329c..b50066a5475 100644 --- a/ocaml/networkd/bin/network_monitor_thread.ml +++ b/ocaml/networkd/bin/network_monitor_thread.ml @@ -436,8 +436,8 @@ let start () = Debug.with_thread_associated dbg (fun () -> debug "Starting network monitor" ; - let (_ : Thread.t) = Thread.create (monitor dbg) () in - let (_ : Thread.t) = Thread.create ip_watcher () in + let (_ : Thread.t) = Timers.Timer.thread_create (monitor dbg) () in + let (_ : Thread.t) = Timers.Timer.thread_create ip_watcher () in () ) () diff --git a/ocaml/networkd/bin/networkd.ml b/ocaml/networkd/bin/networkd.ml index e36113580db..9ac75620ec3 100644 --- a/ocaml/networkd/bin/networkd.ml +++ b/ocaml/networkd/bin/networkd.ml @@ -146,7 +146,7 @@ let start server = Network_monitor_thread.start () ; Network_server.on_startup () ; let (_ : Thread.t) = - Thread.create (fun () -> Xcp_service.serve_forever server) () + Timers.Timer.thread_create (fun () -> Xcp_service.serve_forever server) () in () diff --git a/ocaml/perftest/tests.ml b/ocaml/perftest/tests.ml index 5262d4be0ec..0ca016c9f3c 100644 --- a/ocaml/perftest/tests.ml +++ b/ocaml/perftest/tests.ml @@ -204,7 +204,7 @@ let parallel_with_vms async_op opname n vms rpc session_id test subtest_name = ~description:"" in active_tasks := [control_task] ; - let thread = Thread.create check_active_tasks () in + let thread = Timers.Timer.thread_create check_active_tasks () in while !vms_to_start <> [] do let start_one () = let vm, _, uuid = List.hd !vms_to_start in @@ -337,7 +337,7 @@ let clone num_clones rpc session_id test = (fun (vm, vmr) -> let res : float list ref = ref [] in let clones : API.ref_VM list ref = ref [] in - let t = Thread.create body (vm, vmr, res, clones) in + let t = Timers.Timer.thread_create body (vm, vmr, res, clones) in (t, (res, clones)) ) vms @@ -382,7 +382,7 @@ let clone num_clones rpc session_id test = let threads = List.mapi (fun i clones -> - Thread.create + Timers.Timer.thread_create (fun clones -> List.iteri (fun j clone -> diff --git a/ocaml/quicktest/quicktest_event.ml b/ocaml/quicktest/quicktest_event.ml index f844db3e72c..c48af5849b9 100644 --- a/ocaml/quicktest/quicktest_event.ml +++ b/ocaml/quicktest/quicktest_event.ml @@ -11,7 +11,7 @@ let event_next_unblocking_test rpc _ () = let m = Mutex.create () in let unblocked = ref false in let (_ : Thread.t) = - Thread.create + Timers.Timer.thread_create (fun () -> ( try ignore (Client.Client.Event.next ~rpc ~session_id) with e -> @@ -47,7 +47,7 @@ let event_next_test rpc session_id () = with _ -> () ) ; let (_ : Thread.t) = - Thread.create + Timers.Timer.thread_create (fun () -> while not (with_lock m (fun () -> !finished)) do ignore (Client.Client.Event.next ~rpc ~session_id) ; @@ -97,7 +97,7 @@ let event_from_test rpc session_id () = with _ -> () ) ; let (_ : Thread.t) = - Thread.create + Timers.Timer.thread_create (fun () -> wait_for_pool_key rpc session_id key ; with_lock m (fun () -> finished := true) @@ -122,7 +122,7 @@ let event_from_parallel_test rpc session_id () = ) ; let ok = ref true in let (i_should_succeed : Thread.t) = - Thread.create + Timers.Timer.thread_create (fun () -> try let _ = @@ -138,7 +138,7 @@ let event_from_parallel_test rpc session_id () = () in let (interfering_thread : Thread.t) = - Thread.create (fun () -> wait_for_pool_key rpc session_id key) () + Timers.Timer.thread_create (fun () -> wait_for_pool_key rpc session_id key) () in Thread.delay 1. ; (* wait for both threads to block in Event.from *) @@ -169,7 +169,7 @@ let object_level_event_test rpc session_id () = with _ -> () ) ; let (_ : Thread.t) = - Thread.create + Timers.Timer.thread_create (fun () -> let token = ref "" in while not (with_lock m (fun () -> !finished)) do @@ -398,7 +398,7 @@ let event_inject_test rpc session_id () = let pool = List.hd (Client.Client.Pool.get_all ~rpc ~session_id) in let starttime = Unix.gettimeofday () in let (x : Thread.t) = - Thread.create + Timers.Timer.thread_create (fun () -> let _ = Client.Client.Event.from ~rpc ~session_id ~classes:["pool"] ~token diff --git a/ocaml/squeezed/src/memory_server.ml b/ocaml/squeezed/src/memory_server.ml index a667199c74f..59c09c524a4 100644 --- a/ocaml/squeezed/src/memory_server.ml +++ b/ocaml/squeezed/src/memory_server.ml @@ -78,7 +78,7 @@ let start_balance_thread balance_check_interval = done ) in - let (_ : Thread.t) = Thread.create body () in + let (_ : Thread.t) = Timers.Timer.thread_create body () in () let get_diagnostics _dbg = "diagnostics not yet available" diff --git a/ocaml/squeezed/src/squeeze_xen.ml b/ocaml/squeezed/src/squeeze_xen.ml index f4ba7e5accd..ab4bef764c7 100644 --- a/ocaml/squeezed/src/squeeze_xen.ml +++ b/ocaml/squeezed/src/squeeze_xen.ml @@ -335,7 +335,7 @@ module Domain = struct let start_watch_xenstore_thread () = let (_ : Thread.t) = - Thread.create + Timers.Timer.thread_create (fun () -> while true do try watch_xenstore () diff --git a/ocaml/squeezed/src/squeezed.ml b/ocaml/squeezed/src/squeezed.ml index d224a8eb8e0..0e4c4df13c9 100644 --- a/ocaml/squeezed/src/squeezed.ml +++ b/ocaml/squeezed/src/squeezed.ml @@ -115,8 +115,8 @@ let _ = maybe_daemonize () ; (* NB Initialise the xenstore connection after daemonising, otherwise we lose our connection *) - let _ = Thread.create Memory_server.record_boot_time_host_free_memory () in - let rpc_server = Thread.create Xcp_service.serve_forever server in + let _ = Timers.Timer.thread_create Memory_server.record_boot_time_host_free_memory () in + let rpc_server = Timers.Timer.thread_create Xcp_service.serve_forever server in Memory_server.start_balance_thread balance_check_interval ; Squeeze_xen.Domain.start_watch_xenstore_thread () ; if !Squeeze.manage_domain_zero then Squeeze_xen.configure_domain_zero () ; diff --git a/ocaml/tests/common/test_event_common.ml b/ocaml/tests/common/test_event_common.ml index 149a27d5ea8..57a72d75512 100644 --- a/ocaml/tests/common/test_event_common.ml +++ b/ocaml/tests/common/test_event_common.ml @@ -11,7 +11,7 @@ let start_periodic_scheduler () = (Xapi_periodic_scheduler.Periodic 60.0) 0.0 (fun () -> () ) ; Xapi_event.register_hooks () ; - ignore (Thread.create Xapi_periodic_scheduler.loop ()) ; + ignore (Timers.Timer.thread_create Xapi_periodic_scheduler.loop ()) ; ps_start := true ) ; Mutex.unlock scheduler_mutex diff --git a/ocaml/tests/test_clustering.ml b/ocaml/tests/test_clustering.ml index 05980045a11..7d90868783b 100644 --- a/ocaml/tests/test_clustering.ml +++ b/ocaml/tests/test_clustering.ml @@ -382,7 +382,7 @@ let nest_with_clustering_lock_if_needed ~__context ~timeout ~type1 ~type2 ~on_deadlock ~on_no_deadlock = let success = ref false in let _ = - Thread.create + Timers.Timer.thread_create (fun () -> Xapi_clustering.with_clustering_lock_if_needed ~__context ~sr_sm_type:type1 __LOC__ (fun () -> diff --git a/ocaml/tests/test_daemon_manager.ml b/ocaml/tests/test_daemon_manager.ml index 14b3cd0d9eb..fdbaa8b616f 100644 --- a/ocaml/tests/test_daemon_manager.ml +++ b/ocaml/tests/test_daemon_manager.ml @@ -47,7 +47,7 @@ module Mock_daemon = struct (* Raise the exception after spawning a thread which will set running to false after a specified time. *) let (_ : Thread.t) = - Thread.create + Timers.Timer.thread_create (fun () -> Thread.delay time_until_stopped ; running := false @@ -103,7 +103,7 @@ let test_exception () = let spawn_threads_and_wait task count = let rec spawn_threads task count acc = if count > 0 then - let thread = Thread.create task () in + let thread = Timers.Timer.thread_create task () in spawn_threads task (count - 1) (thread :: acc) else acc diff --git a/ocaml/tests/test_db_lowlevel.ml b/ocaml/tests/test_db_lowlevel.ml index 7745f8e7cdc..926c4cb2ebd 100644 --- a/ocaml/tests/test_db_lowlevel.ml +++ b/ocaml/tests/test_db_lowlevel.ml @@ -24,7 +24,7 @@ let test_db_get_all_records_race () = Db_cache_impl.fist_delay_read_records_where := true ; (* Kick off the thread which will destroy a VM. *) let destroyer_thread = - Thread.create (fun self -> Db.VM.destroy ~__context ~self) vm_ref + Timers.Timer.thread_create (fun self -> Db.VM.destroy ~__context ~self) vm_ref in (* Call get_all_records *) let _ = diff --git a/ocaml/tests/test_event.ml b/ocaml/tests/test_event.ml index 9078244462b..2ee6ffc7dc8 100644 --- a/ocaml/tests/test_event.ml +++ b/ocaml/tests/test_event.ml @@ -91,7 +91,7 @@ let event_next_unblock () = (* no events *) let wait_hdl = Delay.make () in let (_ : Thread.t) = - Thread.create + Timers.Timer.thread_create (fun () -> ( try ignore (Xapi_event.next ~__context) with e -> @@ -123,7 +123,7 @@ let event_next_test () = with _ -> () ) ; let (_ : Thread.t) = - Thread.create + Timers.Timer.thread_create (fun () -> let finished = ref false in while not !finished do @@ -166,7 +166,7 @@ let event_from_test () = with _ -> () ) ; let (_ : Thread.t) = - Thread.create + Timers.Timer.thread_create (fun () -> wait_for_pool_key __context key ; Delay.signal wait_hdl @@ -187,7 +187,7 @@ let event_from_parallel_test () = ) ; let ok = ref true in let (i_should_succeed : Thread.t) = - Thread.create + Timers.Timer.thread_create (fun () -> try let _ = @@ -203,7 +203,7 @@ let event_from_parallel_test () = () in let (interfering_thread : Thread.t) = - Thread.create (fun () -> wait_for_pool_key __context key) () + Timers.Timer.thread_create (fun () -> wait_for_pool_key __context key) () in Thread.delay 0.5 ; (* wait for both threads to block in Event.from *) @@ -226,7 +226,7 @@ let object_level_event_test _session_id = (try Db.VM.remove_from_other_config ~__context ~self:vm_a ~key with _ -> ()) ; (try Db.VM.remove_from_other_config ~__context ~self:vm_b ~key with _ -> ()) ; let (_ : Thread.t) = - Thread.create + Timers.Timer.thread_create (fun () -> let token = ref "" in while not !finished do diff --git a/ocaml/tests/test_network_event_loop.ml b/ocaml/tests/test_network_event_loop.ml index c14a7ea83af..a36a3c3edb6 100644 --- a/ocaml/tests/test_network_event_loop.ml +++ b/ocaml/tests/test_network_event_loop.ml @@ -31,7 +31,7 @@ let test_network_event_loop ~no_nbd_networks_at_start () = (* We simulate failure of the firewall update script this way *) let fail_firewall_update = ref false in let start_event_loop = - Thread.create (fun () -> + Timers.Timer.thread_create (fun () -> Network_event_loop._watch_networks_for_nbd_changes __context ~update_firewall:(fun pifs -> if !fail_firewall_update then ( diff --git a/ocaml/tests/test_vdi_cbt.ml b/ocaml/tests/test_vdi_cbt.ml index 566fa18fbf5..553613ad1ca 100644 --- a/ocaml/tests/test_vdi_cbt.ml +++ b/ocaml/tests/test_vdi_cbt.ml @@ -547,7 +547,7 @@ let test_data_destroy = (Db.VDI.get_type ~__context ~self:vdi) in let test_data_destroy_timing = - let bg f = Thread.create f () in + let bg f = Timers.Timer.thread_create f () in (* Creates a VBD that is currently_attached to our VDI *) let setup_test () = let __context, _, vDI = setup_test_for_data_destroy () in diff --git a/ocaml/tests/test_xapi_xenops.ml b/ocaml/tests/test_xapi_xenops.ml index 85d8cd26794..c7f63b96fae 100644 --- a/ocaml/tests/test_xapi_xenops.ml +++ b/ocaml/tests/test_xapi_xenops.ml @@ -37,7 +37,7 @@ let test_xapi_restart_inner () = let cancel, th = let cancel = ref false in let th = - Thread.create + Timers.Timer.thread_create (fun () -> try Xapi_xenops.events_watch ~__context cancel "simulator" None with | Api_errors.Server_error (x, []) when x = Api_errors.task_cancelled @@ -151,7 +151,7 @@ let test_xapi_restart_inner () = let cancel, th = let cancel = ref false in let th = - Thread.create + Timers.Timer.thread_create (fun () -> try Xapi_xenops.events_watch ~__context cancel "simulator" None with | Api_errors.Server_error (x, []) when x = Api_errors.task_cancelled diff --git a/ocaml/xapi-cli-server/cli_util.ml b/ocaml/xapi-cli-server/cli_util.ml index 86e3401b57a..7eb91d14574 100644 --- a/ocaml/xapi-cli-server/cli_util.ml +++ b/ocaml/xapi-cli-server/cli_util.ml @@ -143,7 +143,7 @@ let track_http_operation ?use_existing_task ?(progress_bar = false) fd rpc marshal fd (Command (make_command task_id)) ; let response = ref (Response Wait) in let receive_heartbeats = - Thread.create + Timers.Timer.thread_create (fun () -> while !response = Response Wait do response := unmarshal fd diff --git a/ocaml/xapi-idl/lib/coverage/enabled.ml b/ocaml/xapi-idl/lib/coverage/enabled.ml index ac128055d75..db8709eab5a 100644 --- a/ocaml/xapi-idl/lib/coverage/enabled.ml +++ b/ocaml/xapi-idl/lib/coverage/enabled.ml @@ -56,7 +56,7 @@ module Bisect = struct init_env name ; let queue_name = prefix ^ "." ^ name in let (_ : Thread.t) = - Thread.create + Timers.Timer.thread_create (Message_switch_unix.Protocol_unix.Server.listen ~process ~switch:!Xcp_client.switch_path ~queue:queue_name ) @@ -116,7 +116,7 @@ module Dispatcher = struct let init () = (* receives command and dispatches to all other coverage message queues *) let (_ : Thread.t) = - Thread.create + Timers.Timer.thread_create (Message_switch_unix.Protocol_unix.Server.listen ~process ~switch:!Xcp_client.switch_path ~queue:self ) diff --git a/ocaml/xapi-idl/lib/posix_channel.ml b/ocaml/xapi-idl/lib/posix_channel.ml index 06708561011..63560a8047d 100644 --- a/ocaml/xapi-idl/lib/posix_channel.ml +++ b/ocaml/xapi-idl/lib/posix_channel.ml @@ -140,7 +140,7 @@ let send proxy_socket = let proxy_socket = Unix.dup proxy_socket in to_close := proxy_socket :: !to_close ; let (_ : Thread.t) = - Thread.create + Timers.Timer.thread_create (fun (fds, paths) -> (* The thread takes over management of the listening sockets *) let to_close = ref fds in diff --git a/ocaml/xapi-idl/lib/scheduler.ml b/ocaml/xapi-idl/lib/scheduler.ml index 407120c9fc6..6c21be59f26 100644 --- a/ocaml/xapi-idl/lib/scheduler.ml +++ b/ocaml/xapi-idl/lib/scheduler.ml @@ -179,7 +179,7 @@ let make_scheduler () = ; m= Mutex.create () } in - let (_ : Thread.t) = Thread.create main_loop s in + let (_ : Thread.t) = Timers.Timer.thread_create main_loop s in s let make = make_scheduler diff --git a/ocaml/xapi-idl/lib/xcp_service.ml b/ocaml/xapi-idl/lib/xcp_service.ml index 123acd4a249..b831dac360e 100644 --- a/ocaml/xapi-idl/lib/xcp_service.ml +++ b/ocaml/xapi-idl/lib/xcp_service.ml @@ -613,7 +613,7 @@ let serve_forever = function while true do let this_connection, _ = Unix.accept listening_sock in let (_ : Thread.t) = - Thread.create + Timers.Timer.thread_create (fun () -> finally (fun () -> fn this_connection) diff --git a/ocaml/xapi-idl/lib_test/scheduler_test.ml b/ocaml/xapi-idl/lib_test/scheduler_test.ml index 640ae938862..8135ae0d37c 100644 --- a/ocaml/xapi-idl/lib_test/scheduler_test.ml +++ b/ocaml/xapi-idl/lib_test/scheduler_test.ml @@ -18,7 +18,7 @@ let test_delay_cancel () = let open Scheduler.Delay in let x = make () in let before = Unix.gettimeofday () in - let th = Thread.create (fun () -> wait x 0.5) () in + let th = Timers.Timer.thread_create (fun () -> wait x 0.5) () in signal x ; Thread.join th ; let after = Unix.gettimeofday () in diff --git a/ocaml/xapi-idl/lib_test/task_server_test.ml b/ocaml/xapi-idl/lib_test/task_server_test.ml index a278784979d..9ee0b188edd 100644 --- a/ocaml/xapi-idl/lib_test/task_server_test.ml +++ b/ocaml/xapi-idl/lib_test/task_server_test.ml @@ -218,7 +218,7 @@ let test_with_cancel2 () = ) in let id = T.id_of_handle task in - let th = Thread.create (fun () -> T.run task) () in + let th = Timers.Timer.thread_create (fun () -> T.run task) () in Thread.delay 0.01 ; T.cancel task ; Scheduler.Delay.signal delay ; @@ -253,7 +253,7 @@ let test_with_cancel_failure2 () = ) in let id = T.id_of_handle task in - let th = Thread.create (fun () -> T.run task) () in + let th = Timers.Timer.thread_create (fun () -> T.run task) () in Thread.delay 0.01 ; T.cancel task ; Scheduler.Delay.signal delay ; diff --git a/ocaml/xapi-idl/lib_test/updates_test.ml b/ocaml/xapi-idl/lib_test/updates_test.ml index c9604b35b52..3666d688dc3 100644 --- a/ocaml/xapi-idl/lib_test/updates_test.ml +++ b/ocaml/xapi-idl/lib_test/updates_test.ml @@ -63,7 +63,7 @@ let test_add_after_get () = let ok = ref false in let before = Unix.gettimeofday () in let th = - Thread.create + Timers.Timer.thread_create (fun () -> let _, updates, _ = M.get "dbg" None (Some 0) u in ok := diff --git a/ocaml/xapi/at_least_once_more.ml b/ocaml/xapi/at_least_once_more.ml index 084cd4c602c..0c17062ee7e 100644 --- a/ocaml/xapi/at_least_once_more.ml +++ b/ocaml/xapi/at_least_once_more.ml @@ -47,7 +47,7 @@ let again (x : manager) = x.in_progress <- true ; x.needs_doing_again <- false ; let (_ : Thread.t) = - Thread.create + Timers.Timer.thread_create (fun () -> (* Always do the operation immediately: thread is only created when work needs doing *) x.f () ; diff --git a/ocaml/xapi/db_gc.ml b/ocaml/xapi/db_gc.ml index e0d79a5bc8e..e3fd498baa6 100644 --- a/ocaml/xapi/db_gc.ml +++ b/ocaml/xapi/db_gc.ml @@ -288,7 +288,7 @@ let single_pass () = ) let start_db_gc_thread () = - Thread.create + Timers.Timer.thread_create (fun () -> Debug.with_thread_named "db_gc" (fun () -> diff --git a/ocaml/xapi/dune b/ocaml/xapi/dune index fe161e0dd5f..ab81465e6d1 100644 --- a/ocaml/xapi/dune +++ b/ocaml/xapi/dune @@ -59,6 +59,7 @@ (wrapped false) (modules (:standard \ xapi_main)) (libraries + timers angstrom astring cstruct diff --git a/ocaml/xapi/export.ml b/ocaml/xapi/export.ml index c549fb74295..f4128846b21 100644 --- a/ocaml/xapi/export.ml +++ b/ocaml/xapi/export.ml @@ -787,7 +787,7 @@ let metadata_handler (req : Request.t) s _ = let read_fd, write_fd = Unix.pipe () in let export_error = ref None in let writer_thread = - Thread.create + Timers.Timer.thread_create (Debug.with_thread_named "metadata export writer thread" (fun () -> try (* lock all the VMs before exporting their metadata *) diff --git a/ocaml/xapi/helpers.ml b/ocaml/xapi/helpers.ml index f2c68debf6b..687cc89bdce 100644 --- a/ocaml/xapi/helpers.ml +++ b/ocaml/xapi/helpers.ml @@ -1466,7 +1466,7 @@ let run_in_parallel ~funs ~capacity = let run f = let result = ref `Not_started in let wrapper r = try r := `Succ (f ()) with e -> r := `Fail e in - let th = Thread.create wrapper result in + let th = Timers.Timer.thread_create wrapper result in (th, result) in let get_result (th, result) = diff --git a/ocaml/xapi/import.ml b/ocaml/xapi/import.ml index 01e5ca25640..9f0f5ad5db2 100644 --- a/ocaml/xapi/import.ml +++ b/ocaml/xapi/import.ml @@ -2141,7 +2141,7 @@ let with_open_archive fd ?length f = ) in let pipe_out, pipe_in = Unix.pipe () in - let feeder_t = Thread.create feeder pipe_in in + let feeder_t = Timers.Timer.thread_create feeder pipe_in in consumer pipe_out feeder_t (** Remove "import" from the current operations of all created VMs, complete the diff --git a/ocaml/xapi/redo_log_alert.ml b/ocaml/xapi/redo_log_alert.ml index 02ae81f62ef..a346fe4c371 100644 --- a/ocaml/xapi/redo_log_alert.ml +++ b/ocaml/xapi/redo_log_alert.ml @@ -19,7 +19,7 @@ open R let raise_system_alert (name, priority) body = (* This code may block indefinitely while attempting to look up the pool UUID and send the alert, so do it in a separate thread *) ignore - (Thread.create + (Timers.Timer.thread_create (fun () -> debug "Processing redo log event: %s" name ; let __context = Context.make "context" in diff --git a/ocaml/xapi/remote_requests.ml b/ocaml/xapi/remote_requests.ml index 87988f2f3a9..5e1fc656ee9 100644 --- a/ocaml/xapi/remote_requests.ml +++ b/ocaml/xapi/remote_requests.ml @@ -150,7 +150,7 @@ let handle_requests () = done let start_watcher __context timeout delay req = - ignore (Thread.create watcher_thread (__context, timeout, delay, req)) + ignore (Timers.Timer.thread_create watcher_thread (__context, timeout, delay, req)) let queue_request req = with_lock request_mutex (fun () -> diff --git a/ocaml/xapi/server_helpers.ml b/ocaml/xapi/server_helpers.ml index ad76e38e531..5cb09ec5787 100644 --- a/ocaml/xapi/server_helpers.ml +++ b/ocaml/xapi/server_helpers.ml @@ -140,7 +140,7 @@ let do_dispatch ?session_id ?forward_op ?self:_ supports_async called_fn_name let async ~need_complete = (* Fork thread in which to execute async call *) ignore - (Thread.create + (Timers.Timer.thread_create (fun () -> exec_with_context ~__context ~need_complete ~called_async ?f_forward:forward_op ~marshaller op_fn diff --git a/ocaml/xapi/sparse_dd_wrapper.ml b/ocaml/xapi/sparse_dd_wrapper.ml index c2a0f2112a7..fc5d4fc7bd7 100644 --- a/ocaml/xapi/sparse_dd_wrapper.ml +++ b/ocaml/xapi/sparse_dd_wrapper.ml @@ -204,7 +204,7 @@ let start ?(progress_cb = fun _ -> ()) ?base ~verify_cert prezeroed infile with_lock m (fun () -> Condition.broadcast c) in let _ = - Thread.create + Timers.Timer.thread_create (fun () -> dd_internal thread_progress_cb base prezeroed verify_cert infile outfile size diff --git a/ocaml/xapi/startup.ml b/ocaml/xapi/startup.ml index a230fc680db..95d0aa87fb0 100644 --- a/ocaml/xapi/startup.ml +++ b/ocaml/xapi/startup.ml @@ -82,7 +82,7 @@ let run ~__context tasks = if onthread then ( debug "task [starting thread %s]" tsk_name ; ignore - (Thread.create + (Timers.Timer.thread_create (fun tsk_fct -> Server_helpers.exec_with_subtask ~__context tsk_name (fun ~__context -> thread_exn_wrapper tsk_name tsk_fct diff --git a/ocaml/xapi/storage_access.ml b/ocaml/xapi/storage_access.ml index 292c96b4f52..e1db60a4dc0 100644 --- a/ocaml/xapi/storage_access.ml +++ b/ocaml/xapi/storage_access.ml @@ -66,7 +66,7 @@ let start_smapiv1_servers () = let module S = Storage_smapiv1_wrapper.Server in let s = Xcp_service.make ~path ~queue_name ~rpc_fn:S.process () in let (_ : Thread.t) = - Thread.create (fun () -> Xcp_service.serve_forever s) () + Timers.Timer.thread_create (fun () -> Xcp_service.serve_forever s) () in () ) @@ -505,7 +505,7 @@ let rec events_watch ~__context from = let events_from_sm () = ignore - (Thread.create + (Timers.Timer.thread_create (fun () -> Server_helpers.exec_with_new_task "sm_events" (fun __context -> while true do @@ -526,7 +526,7 @@ let start () = in info "Started service on org.xen.xapi.storage" ; let (_ : Thread.t) = - Thread.create (fun () -> Xcp_service.serve_forever s) () + Timers.Timer.thread_create (fun () -> Xcp_service.serve_forever s) () in () diff --git a/ocaml/xapi/storage_migrate.ml b/ocaml/xapi/storage_migrate.ml index fa306e42b61..e157ddb41f2 100644 --- a/ocaml/xapi/storage_migrate.ml +++ b/ocaml/xapi/storage_migrate.ml @@ -1185,7 +1185,7 @@ let post_detach_hook ~sr ~vdi ~dp:_ = remote_url end)) in let t = - Thread.create + Timers.Timer.thread_create (fun () -> debug "Calling receive_finalize" ; log_and_ignore_exn (fun () -> @@ -1349,7 +1349,7 @@ let with_task_and_thread ~dbg f = ) in let _ = - Thread.create + Timers.Timer.thread_create (fun () -> Storage_task.run task ; signal (Storage_task.id_of_handle task) diff --git a/ocaml/xapi/thread_queue.ml b/ocaml/xapi/thread_queue.ml index 227b93edf98..04ef92ade8c 100644 --- a/ocaml/xapi/thread_queue.ml +++ b/ocaml/xapi/thread_queue.ml @@ -78,7 +78,7 @@ let make ?max_q_length ?(name = "unknown") (process_fn : 'a process_fn) : 'a t = | Some _ -> () | None -> - t := Some (Thread.create thread_body ()) + t := Some (Timers.Timer.thread_create thread_body ()) in let push description x = with_lock m (fun () -> diff --git a/ocaml/xapi/xapi.ml b/ocaml/xapi/xapi.ml index ac65142e0c6..f3418c82544 100644 --- a/ocaml/xapi/xapi.ml +++ b/ocaml/xapi/xapi.ml @@ -347,7 +347,7 @@ let server_run_in_emergency_mode () = info "Will restart management software in %.1f seconds" emergency_reboot_delay ; (* in emergency mode we reboot to try reconnecting every "emergency_reboot_timer" period *) let (* reboot_thread *) _ = - Thread.create + Timers.Timer.thread_create (fun () -> Thread.delay emergency_reboot_delay ; exit Xapi_globs.restart_return_code @@ -378,7 +378,7 @@ let bring_up_management_if ~__context () = debug "Management IP address is: %s" ip ; (* Make sure everyone is up to speed *) ignore - (Thread.create + (Timers.Timer.thread_create (fun () -> Server_helpers.exec_with_new_task "dom0 networking update" ~subtask_of:(Context.get_task_id __context) (fun __context -> @@ -991,7 +991,7 @@ let server_init () = let last_error = ref None in (* watchdog to indicate that on_xapi_initialize wasn't successful after 2 min initializing *) let (_ : Thread.t) = - Thread.create + Timers.Timer.thread_create (fun () -> Thread.delay (2.0 *. 60.0) ; (* wait 2min before testing for success *) @@ -1326,7 +1326,7 @@ let server_init () = , [] , fun () -> if not !noevents then - ignore (Thread.create Xapi_xenops.events_from_xapi ()) + ignore (Timers.Timer.thread_create Xapi_xenops.events_from_xapi ()) ) ; ( "watching networks for NBD-related changes" , [Startup.OnThread] diff --git a/ocaml/xapi/xapi_clustering.ml b/ocaml/xapi/xapi_clustering.ml index f9a78fef05f..a47791cc1c4 100644 --- a/ocaml/xapi/xapi_clustering.ml +++ b/ocaml/xapi/xapi_clustering.ml @@ -527,7 +527,7 @@ let create_cluster_watcher_on_master ~__context ~host = in if Xapi_cluster_helpers.cluster_health_enabled ~__context then ( debug "%s: create watcher for corosync-notifyd on master" __FUNCTION__ ; - ignore @@ Thread.create watch () + ignore @@ Timers.Timer.thread_create watch () ) else debug "%s: not creating watcher for corosync-notifyd: feature cluster_health \ diff --git a/ocaml/xapi/xapi_fuse.ml b/ocaml/xapi/xapi_fuse.ml index bb318848fbd..e77b1bc9ed0 100644 --- a/ocaml/xapi/xapi_fuse.ml +++ b/ocaml/xapi/xapi_fuse.ml @@ -53,7 +53,7 @@ let light_fuse_and_run ?(fuse_length = !Constants.fuse_time) () = let new_fuse_length = max 5. (fuse_length -. delay_so_far) in debug "light_fuse_and_run: current RRDs have been saved" ; ignore - (Thread.create + (Timers.Timer.thread_create (fun () -> Thread.delay new_fuse_length ; debug "light_fuse_and_run: calling flush and exit" ; @@ -83,7 +83,7 @@ let light_fuse_and_run ?(fuse_length = !Constants.fuse_time) () = let light_fuse_and_reboot_after_eject () = once `Reboot @@ fun () -> ignore - (Thread.create + (Timers.Timer.thread_create (fun () -> Thread.delay !Constants.fuse_time ; (* this activates firstboot script and reboots the host *) @@ -99,7 +99,7 @@ let light_fuse_and_reboot_after_eject () = let light_fuse_and_reboot ?(fuse_length = !Constants.fuse_time) () = once `Reboot @@ fun () -> ignore - (Thread.create + (Timers.Timer.thread_create (fun () -> Thread.delay fuse_length ; ignore (Sys.command "shutdown -r now") @@ -110,7 +110,7 @@ let light_fuse_and_reboot ?(fuse_length = !Constants.fuse_time) () = let light_fuse_and_dont_restart ?(fuse_length = !Constants.fuse_time) () = once `Exit @@ fun () -> ignore - (Thread.create + (Timers.Timer.thread_create (fun () -> debug "light_fuse_and_dont_restart: calling Rrdd.backup_rrds to save \ diff --git a/ocaml/xapi/xapi_ha.ml b/ocaml/xapi/xapi_ha.ml index ee8253e17b6..9dcf795a781 100644 --- a/ocaml/xapi/xapi_ha.ml +++ b/ocaml/xapi/xapi_ha.ml @@ -876,7 +876,7 @@ module Monitor = struct | None -> (* This will cause the started thread to block until signal_database_state_valid is called *) request_shutdown := false ; - thread := Some (Thread.create ha_monitor ()) + thread := Some (Timers.Timer.thread_create ha_monitor ()) ) let signal_database_state_valid () = diff --git a/ocaml/xapi/xapi_host.ml b/ocaml/xapi/xapi_host.ml index 74590bc4fe6..fd86d78a08a 100644 --- a/ocaml/xapi/xapi_host.ml +++ b/ocaml/xapi/xapi_host.ml @@ -772,7 +772,7 @@ let restart_agent ~__context ~host:_ = * successfully before its stunnel connection being terminated by the restarting. *) ignore - (Thread.create + (Timers.Timer.thread_create (fun () -> Thread.delay 1. ; let syslog_stdout = Forkhelpers.Syslog_WithKey "Host.restart_agent" in @@ -882,7 +882,7 @@ let shutdown_and_reboot_common ~__context ~host label description operation cmd (* Do the shutdown in a background thread with a delay to give this API call a reasonable chance of succeeding. *) ignore - (Thread.create + (Timers.Timer.thread_create (fun () -> Thread.delay 10. ; ignore (Sys.command cmd) diff --git a/ocaml/xapi/xapi_host_helpers.ml b/ocaml/xapi/xapi_host_helpers.ml index 00f01d83ed2..6e1a76a6a69 100644 --- a/ocaml/xapi/xapi_host_helpers.ml +++ b/ocaml/xapi/xapi_host_helpers.ml @@ -605,7 +605,7 @@ module Configuration = struct loop let start_watcher_thread ~__context = - Thread.create + Timers.Timer.thread_create (fun () -> let loop = watch_other_configs ~__context 30.0 in while true do diff --git a/ocaml/xapi/xapi_pool_transition.ml b/ocaml/xapi/xapi_pool_transition.ml index a8f00deaa26..c36b760c874 100644 --- a/ocaml/xapi/xapi_pool_transition.ml +++ b/ocaml/xapi/xapi_pool_transition.ml @@ -189,7 +189,7 @@ let attempt_two_phase_commit_of_new_master ~__context (manual : bool) (* If manual, periodicly access to the database to check whether the old master has restarted. *) if manual then let (_ : Thread.t) = - Thread.create + Timers.Timer.thread_create (fun () -> try while true do diff --git a/ocaml/xapi/xapi_pusb.ml b/ocaml/xapi/xapi_pusb.ml index da34329cc4f..aaa7026c5d0 100644 --- a/ocaml/xapi/xapi_pusb.ml +++ b/ocaml/xapi/xapi_pusb.ml @@ -99,7 +99,7 @@ let scan_required : bool ref = ref false let start_thread f = ignore - (Thread.create + (Timers.Timer.thread_create (fun () -> while true do with_lock mutex (fun () -> diff --git a/ocaml/xapi/xapi_sr.ml b/ocaml/xapi/xapi_sr.ml index 0508f5384c5..10199182127 100644 --- a/ocaml/xapi/xapi_sr.ml +++ b/ocaml/xapi/xapi_sr.ml @@ -72,7 +72,7 @@ let scan_one ~__context ?callback sr = let sr_uuid = Db.SR.get_uuid ~__context ~self:sr in if i_should_scan_sr sr then ignore - (Thread.create + (Timers.Timer.thread_create (fun () -> Server_helpers.exec_with_subtask ~__context "scan one" (fun ~__context -> @@ -114,7 +114,7 @@ let scan_one ~__context ?callback sr = Option.iter (fun f -> ignore - (Thread.create + (Timers.Timer.thread_create (fun () -> debug "Tried to scan SR %s but scan already in progress - waiting \ diff --git a/ocaml/xapi/xapi_sr_operations.ml b/ocaml/xapi/xapi_sr_operations.ml index 8f7a7d8012a..9cb688a0a66 100644 --- a/ocaml/xapi/xapi_sr_operations.ml +++ b/ocaml/xapi/xapi_sr_operations.ml @@ -280,7 +280,7 @@ let sr_health_check ~__context ~self = in Xapi_host_helpers.update_allowed_operations_all_hosts ~__context ; let (_ : Thread.t) = - Thread.create + Timers.Timer.thread_create (fun () -> let rec loop () = Thread.delay 30. ; diff --git a/ocaml/xapi/xapi_stats.ml b/ocaml/xapi/xapi_stats.ml index 2c94ca64974..859d8efae7a 100644 --- a/ocaml/xapi/xapi_stats.ml +++ b/ocaml/xapi/xapi_stats.ml @@ -157,7 +157,7 @@ let start () = let reporter = let reporter = Reporter.make () in let (_ : Thread.t) = - Thread.create + Timers.Timer.thread_create (fun () -> Reporter_local.start_local (module D : Debug.DEBUG) diff --git a/ocaml/xapi/xapi_vdi_helpers.ml b/ocaml/xapi/xapi_vdi_helpers.ml index 6b4366a80ce..527b2de4441 100644 --- a/ocaml/xapi/xapi_vdi_helpers.ml +++ b/ocaml/xapi/xapi_vdi_helpers.ml @@ -110,7 +110,7 @@ let enable_database_replication ~__context ~get_vdi_callback = Some (fun new_state -> ignore - (Thread.create + (Timers.Timer.thread_create (fun () -> Db.VDI.set_metadata_latest ~__context ~self:vdi ~value:new_state diff --git a/ocaml/xapi/xapi_vm.ml b/ocaml/xapi/xapi_vm.ml index e0de06045e1..edcd00adbf8 100644 --- a/ocaml/xapi/xapi_vm.ml +++ b/ocaml/xapi/xapi_vm.ml @@ -1384,9 +1384,9 @@ let set_suspend_VDI ~__context ~self ~value = with e -> result := `Fail e in let src_result = ref `Pending in - let src_thread = Thread.create (do_checksum src_vdi) src_result in + let src_thread = Timers.Timer.thread_create (do_checksum src_vdi) src_result in let dst_result = ref `Pending in - let dst_thread = Thread.create (do_checksum dst_vdi) dst_result in + let dst_thread = Timers.Timer.thread_create (do_checksum dst_vdi) dst_result in let get_result t r = Thread.join t ; match !r with diff --git a/ocaml/xapi/xapi_xenops.ml b/ocaml/xapi/xapi_xenops.ml index ac92853d104..4ddebd741e2 100644 --- a/ocaml/xapi/xapi_xenops.ml +++ b/ocaml/xapi/xapi_xenops.ml @@ -2096,7 +2096,7 @@ let update_vm ~__context id = Option.iter (fun pbd -> let (_ : Thread.t) = - Thread.create + Timers.Timer.thread_create (fun () -> (* Don't block the database update thread *) Xapi_pbd.plug ~__context ~self:pbd @@ -3163,7 +3163,7 @@ let on_xapi_restart ~__context = events on everything xenopsd knows about, hence a refresh of all VMs. *) List.iter (fun queue_name -> - let (_ : Thread.t) = Thread.create events_from_xenopsd queue_name in + let (_ : Thread.t) = Timers.Timer.thread_create events_from_xenopsd queue_name in () ) (all_known_xenopsds ()) ; diff --git a/ocaml/xcp-rrdd/bin/rrdd/xcp_rrdd.ml b/ocaml/xcp-rrdd/bin/rrdd/xcp_rrdd.ml index 3eb708d7a5c..6ccc4d08b5b 100644 --- a/ocaml/xcp-rrdd/bin/rrdd/xcp_rrdd.ml +++ b/ocaml/xcp-rrdd/bin/rrdd/xcp_rrdd.ml @@ -53,12 +53,12 @@ let xmlrpc_handler process req bio context = (* A helper function for processing HTTP requests on a socket. *) let accept_forever sock f = ignore - (Thread.create + (Timers.Timer.thread_create (fun _ -> while true do let this_connection, _ = Unix.accept sock in ignore - (Thread.create + (Timers.Timer.thread_create (fun _ -> finally (fun _ -> f this_connection) @@ -907,7 +907,7 @@ end module GCLog : GCLOG = struct let start () = - Thread.create + Timers.Timer.thread_create (fun () -> debug "RRD - starting GC Logging thread" ; while true do @@ -1023,7 +1023,7 @@ module Discover : DISCOVER = struct |> List.iter register let start ignored_files = - Thread.create + Timers.Timer.thread_create (fun dir -> debug "RRD plugin - starting discovery thread" ; while true do @@ -1107,7 +1107,7 @@ let _ = We must avoid creating the Unix domain socket twice, so we only call Xcp_service.serve_forever if we are actually using the message-switch. *) let (_ : Thread.t) = - Thread.create + Timers.Timer.thread_create (fun () -> if !Xcp_client.use_switch then let server = diff --git a/ocaml/xcp-rrdd/bin/rrdp-iostat/rrdp_iostat.ml b/ocaml/xcp-rrdd/bin/rrdp-iostat/rrdp_iostat.ml index 844ad7f8a17..c4240bd8e78 100644 --- a/ocaml/xcp-rrdd/bin/rrdp-iostat/rrdp_iostat.ml +++ b/ocaml/xcp-rrdd/bin/rrdp-iostat/rrdp_iostat.ml @@ -1171,7 +1171,7 @@ let gen_metrics () = let _ = initialise () ; - let _ = Thread.create Blktap3_stats_wrapper.inotify_thread () in + let _ = Timers.Timer.thread_create Blktap3_stats_wrapper.inotify_thread () in (* Approx. one page per VBD, up to the limit. *) let shared_page_count = 2048 in diff --git a/ocaml/xcp-rrdd/lib/plugin/rrdd_plugin.ml b/ocaml/xcp-rrdd/lib/plugin/rrdd_plugin.ml index 6c2c11192fb..cea9ccc120d 100644 --- a/ocaml/xcp-rrdd/lib/plugin/rrdd_plugin.ml +++ b/ocaml/xcp-rrdd/lib/plugin/rrdd_plugin.ml @@ -72,7 +72,7 @@ module Reporter = struct ~dss_f = let reporter = make () in let (_ : Thread.t) = - Thread.create + Timers.Timer.thread_create (fun () -> match target with | Local page_count -> diff --git a/ocaml/xenopsd/lib/suspend_image.ml b/ocaml/xenopsd/lib/suspend_image.ml index 029224e7b06..b09f3b8653c 100644 --- a/ocaml/xenopsd/lib/suspend_image.ml +++ b/ocaml/xenopsd/lib/suspend_image.ml @@ -234,7 +234,7 @@ let with_conversion_script task name hvm fd f = let spawn_thread_and_close_fd _name fd' f = let status = ref Running in let thread = - Thread.create + Timers.Timer.thread_create (fun () -> try let result = finally (fun () -> f ()) (fun () -> Unix.close fd') in diff --git a/ocaml/xenopsd/lib/xenops_server.ml b/ocaml/xenopsd/lib/xenops_server.ml index 0533ce24b54..23c0939ea74 100644 --- a/ocaml/xenopsd/lib/xenops_server.ml +++ b/ocaml/xenopsd/lib/xenops_server.ml @@ -1084,7 +1084,7 @@ module Worker = struct } in let thread = - Thread.create + Timers.Timer.thread_create (fun () -> while not @@ -3936,7 +3936,7 @@ let internal_event_thread_body = let set_backend m = backend := m ; (* start the internal event thread *) - internal_event_thread := Some (Thread.create internal_event_thread_body ()) ; + internal_event_thread := Some (Timers.Timer.thread_create internal_event_thread_body ()) ; let module B = (val get_backend () : S) in B.init () diff --git a/ocaml/xenopsd/lib/xenopsd.ml b/ocaml/xenopsd/lib/xenopsd.ml index 98c88561572..ed6843c7cc0 100644 --- a/ocaml/xenopsd/lib/xenopsd.ml +++ b/ocaml/xenopsd/lib/xenopsd.ml @@ -462,10 +462,10 @@ let main backend = Debug.with_thread_associated "main" (fun () -> let (_ : Thread.t) = - Thread.create (fun () -> Xcp_service.serve_forever forwarded_server) () + Timers.Timer.thread_create (fun () -> Xcp_service.serve_forever forwarded_server) () in let (_ : Thread.t) = - Thread.create (fun () -> Xcp_service.serve_forever xml_server) () + Timers.Timer.thread_create (fun () -> Xcp_service.serve_forever xml_server) () in () ) diff --git a/ocaml/xenopsd/xc/cancel_utils_test.ml b/ocaml/xenopsd/xc/cancel_utils_test.ml index 459c0caa085..6f9ec7db3bb 100644 --- a/ocaml/xenopsd/xc/cancel_utils_test.ml +++ b/ocaml/xenopsd/xc/cancel_utils_test.ml @@ -26,7 +26,7 @@ let tasks = Xenops_task.empty () let xenstore_test xs = let task = Xenops_task.add tasks "test" (fun _ -> None) in let (_ : Thread.t) = - Thread.create + Timers.Timer.thread_create (fun () -> Thread.delay 1. ; Xenops_task.with_cancel task (fun () -> ()) (fun () -> ()) diff --git a/ocaml/xenopsd/xc/device.ml b/ocaml/xenopsd/xc/device.ml index d86caae2d81..2a1b3751e4e 100644 --- a/ocaml/xenopsd/xc/device.ml +++ b/ocaml/xenopsd/xc/device.ml @@ -2967,7 +2967,7 @@ module Backend = struct (* Qemu_upstream_compat.Dm.QMP_Event *) module Event = struct - let init () = ignore (Thread.create QMP_Event.qmp_event_thread ()) + let init () = ignore (Timers.Timer.thread_create QMP_Event.qmp_event_thread ()) end module Make_qemu_upstream (DefaultConfig : Qemu_upstream_config) : Intf = @@ -3775,7 +3775,7 @@ module Dm = struct fails/crashes or is killed *) let waitpid_async x ~callback = ignore - (Thread.create + (Timers.Timer.thread_create (fun x -> callback ( try diff --git a/ocaml/xenopsd/xc/domain.ml b/ocaml/xenopsd/xc/domain.ml index 935027aabea..f875edb3b23 100644 --- a/ocaml/xenopsd/xc/domain.ml +++ b/ocaml/xenopsd/xc/domain.ml @@ -1510,7 +1510,7 @@ let restore_common (task : Xenops_task.task_handle) ~xc ~xs debug "Starting reader thread (fd=%d)" (Obj.magic fd) ; let ch = Event.new_channel () in let th = - Thread.create + Timers.Timer.thread_create (fun () -> let dbg = (Xenops_task.to_interface_task task) diff --git a/xapi-timers.opam b/xapi-timers.opam new file mode 100644 index 00000000000..66d36da2a9c --- /dev/null +++ b/xapi-timers.opam @@ -0,0 +1,25 @@ +# This file is generated by dune, edit dune-project instead +opam-version: "2.0" +maintainer: ["Xapi project maintainers"] +authors: ["xen-api@lists.xen.org"] +license: "LGPL-2.1-only WITH OCaml-LGPL-linking-exception" +homepage: "https://xapi-project.github.io/" +bug-reports: "https://github.com/xapi-project/xen-api/issues" +depends: [ + "dune" {>= "2.0"} +] +build: [ + ["dune" "subst"] {pinned} + [ + "dune" + "build" + "-p" + name + "-j" + jobs + "@install" + "@runtest" {with-test} + "@doc" {with-doc} + ] +] +dev-repo: "git+https://github.com/xapi-project/xen-api.git"