Skip to content

Commit

Permalink
rate limit WiP
Browse files Browse the repository at this point in the history
  • Loading branch information
edwintorok committed Apr 18, 2024
1 parent 7d135a9 commit ee54d17
Show file tree
Hide file tree
Showing 7 changed files with 207 additions and 10 deletions.
4 changes: 4 additions & 0 deletions ocaml/xapi-aux/dune
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
(library
(name xapi_aux)
(foreign_stubs
(language c)
(names rusage_thread_stubs)
)
(libraries
astring
cstruct
Expand Down
1 change: 1 addition & 0 deletions ocaml/xapi-aux/rusage_thread.ml
24 changes: 24 additions & 0 deletions ocaml/xapi-aux/rusage_thread.mli
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
(*
* Copyright (C) 2024 Cloud Software Group
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as published
* by the Free Software Foundation; version 2.1 only. with the special
* exception on linking described in file LICENSE.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Lesser General Public License for more details.
*)

(* similar to [gettimeofday]: doesn't allocate, and can return a float directly without boxing it.
   This avoids introducing extra overhead in a function used to measure overhead.
*)

(** [getrusage_thread_utime ()] returns the [ru_utime] field (user CPU time,
    expressed in seconds) of [struct rusage] for the calling thread.

    This call is Linux specific (it relies on [RUSAGE_THREAD], a Linux
    extension to [getrusage(2)]).

    The external is declared [@@noalloc] with an [@unboxed] float result,
    so the native-code C stub must neither allocate on the OCaml heap nor
    raise an exception. *)
external getrusage_thread_utime : unit -> (float [@unboxed]) =
"caml_getrusage_thread_utime" "caml_getrusage_thread_utime_unboxed" [@@noalloc]
27 changes: 27 additions & 0 deletions ocaml/xapi-aux/rusage_thread_stubs.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/* must be first according to getrusage(2) */
#define _GNU_SOURCE
#include <sys/resource.h>

#include <caml/alloc.h>
#include <caml/mlvalues.h>
#include <caml/unixsupport.h>

/* The OCaml external is declared [@@noalloc] with an [@unboxed] float
   result, so this primitive must not allocate on the OCaml heap, must not
   raise, and must not call CAMLparam/CAMLreturn.

   In particular it must NOT call uerror(): raising allocates the exception
   value and unwinds through the runtime, which is undefined behaviour from
   a noalloc primitive.  Errors are therefore mapped to 0.0.  On Linux,
   getrusage(RUSAGE_THREAD, &valid_address) cannot fail in practice:
   the only documented errors are EINVAL (invalid who) and EFAULT. */
double caml_getrusage_thread_utime_unboxed(value unit)
{
    struct rusage rusage;
    const struct timeval *tv;

    (void) unit; /* always the unit value; unused */
    if (getrusage(RUSAGE_THREAD, &rusage) < 0)
        return 0.0; /* cannot raise from a noalloc primitive, see above */
    tv = &rusage.ru_utime;
    return (double) tv->tv_sec + (double) tv->tv_usec / 1e6;
}

/* Bytecode (and boxed) fallback: same computation, result boxed. */
CAMLprim value caml_getrusage_thread_utime(value unit)
{
    return caml_copy_double(caml_getrusage_thread_utime_unboxed(unit));
}
97 changes: 97 additions & 0 deletions ocaml/xapi-aux/throttle.ml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
* GNU Lesser General Public License for more details.
*)

(* module-local logger named after this module *)
module D = Debug.Make (struct let name = __MODULE__ end)

(** Provides the size used by the {!Make} functor below. *)
module type SIZE = sig
  val n : unit -> int
  (** [n ()] returns the current size. *)
end
Expand All @@ -37,3 +39,98 @@ module Make (Size : SIZE) = struct

let execute f = Semaphore.execute (get_semaphore ()) f
end

module API_cpu_usage = struct
  (* ['a t] is a covariant alias: it lets the interface distinguish
     [global t] from [local t] while both stay unboxed at runtime. *)
  type !+'a t = 'a

  (* Snapshot of the counters at one point in time; used to compute deltas
     between sampling intervals. *)
  type sample = {timestamp: float; sampled_requests: int; sampled_us: int}

  (* Shared, thread-safe statistics for one API call.
     All fields are updated with [Atomic] operations; no locks are held. *)
  type global = {
      name: string  (** API name, used in log messages *)
    ; us: int Atomic.t  (** user time in microseconds *)
    ; requests: int Atomic.t  (** number of completed requests *)
    ; last_sample: sample Atomic.t  (** counters at the previous sample *)
    ; delay: float Atomic.t  (** current per-request delay, in seconds *)
  }

  (* the per-call rate limit handed to the API body: a delay in seconds *)
  type local = float

  (* [make name] builds a fresh statistics record with zeroed counters and
     no delay; safe to store in a global and share between threads. *)
  let make name =
    {
      name
    ; requests= Atomic.make 0
    ; us= Atomic.make 0
    ; last_sample=
        Atomic.make
          {sampled_us= 0; sampled_requests= 0; timestamp= Unix.gettimeofday ()}
    ; delay= Atomic.make 0.
    }

  (* seconds between recomputations of the delay *)
  let sample_interval = 5.

  (* Recompute the per-request delay from the counters accumulated since the
     previous sample, and store it in [global.delay].
     [sampled_requests]/[sampled_us] are the cumulative counter values read by
     the caller; the previous snapshot is swapped out atomically so concurrent
     samplers each see a consistent (old, new) pair. *)
  let sample ~min_batch_delay ~max_cpu_usage global ~timestamp ~sampled_requests
      ~sampled_us =
    let last_sample =
      Atomic.exchange global.last_sample
        {timestamp; sampled_requests; sampled_us}
    in
    let delta_t = timestamp -. last_sample.timestamp
    and delta_utime = float (sampled_us - last_sample.sampled_us) *. 1e-6
    and delta_requests =
      float (sampled_requests - last_sample.sampled_requests)
    in
    (* we could've raced with another update, or clock was adjusted *)
    if delta_t > 0. && delta_utime > 0. && delta_requests > 0. then (
      let avg_utime_per_request = delta_utime /. delta_requests in
      let avg_elapsed_per_request = avg_utime_per_request +. min_batch_delay in
      (* how many requests we could serve in [delta_t] at the observed cost,
         but never fewer than we actually served *)
      let max_requests =
        Float.max delta_requests (delta_t /. avg_elapsed_per_request)
      in
      (* clamp at 0: the formula can go negative when requests are cheap,
         and a negative delay would pollute the stored value and the
         "delay changed" comparison below ([with_limit] only sleeps when
         the delay is strictly positive) *)
      let delay =
        Float.max 0.
          ( (Float.max 0. (1. -. max_cpu_usage) *. delta_t /. max_requests)
          -. avg_elapsed_per_request
          )
      in
      let old_delay = Atomic.exchange global.delay delay in
      D.debug "delay: %g" delay ;
      if abs_float (old_delay -. delay) >= 0.1 then
        D.debug "%s: API call delay changed %.2f -> %.2f" global.name
          old_delay delay
    )

  let with_limit ?(max_cpu_usage = 0.25) ?(min_batch_delay = 0.05) global f =
    (* we assume that each thread is dedicated to one API call, but the thread
       may be reused (e.g. if a thread pool is used to handle API calls), so we
       only measure CPU usage inside this function. *)
    let utime0 = Rusage_thread.getrusage_thread_utime () in
    let delay = Atomic.get global.delay in
    if delay > 0. then
      Thread.delay delay ;
    let finally () =
      let current = Unix.gettimeofday () in
      let last_sample = Atomic.get global.last_sample in
      let delta_us =
        (Rusage_thread.getrusage_thread_utime () -. utime0) *. 1e6
        |> Float.ceil
        |> int_of_float
      in
      (* [fetch_and_add] returns the counter value BEFORE the addition, so
         this request's own cost is excluded from the snapshot passed to
         [sample]; it is accounted for at the next sample instead *)
      let sampled_us = Atomic.fetch_and_add global.us delta_us
      and sampled_requests = Atomic.fetch_and_add global.requests 1 in
      if current -. last_sample.timestamp >= sample_interval then
        sample global ~min_batch_delay ~max_cpu_usage ~timestamp:current
          ~sampled_requests ~sampled_us
    in
    (* [finally] also runs when [f] raises, so failed calls are counted too *)
    Fun.protect ~finally (fun () -> f min_batch_delay)

  (* [with_recursive f] builds a rate-limited self-recursive loop:
     [self] sleeps for [delay] seconds (when positive) before each
     invocation of [f], including the first. *)
  let with_recursive f =
    let rec self delay =
      if delay > 0. then
        Thread.delay delay ;
      (f [@tailcall]) self delay
    in
    self
end
38 changes: 38 additions & 0 deletions ocaml/xapi-aux/throttle.mli
Original file line number Diff line number Diff line change
Expand Up @@ -22,3 +22,41 @@ module Make (_ : SIZE) : sig

val execute : (unit -> 'a) -> 'a
end

module API_cpu_usage : sig
  (** records API CPU-usage statistics; the parameter distinguishes
      {!type:global} and {!type:local} values *)
  type !+'a t

  (** global resource usage statistics, thread safe *)
  type global

  (** local resource limit, passed to the rate-limited function *)
  type local

  val make : string -> global t
  (** [make name] creates a global throttle for CPU usage of API [name].
    It is safe to store this into a global value and share it between threads.
    *)

  val with_limit : ?max_cpu_usage:float -> ?min_batch_delay:float -> global t -> (local t -> 'a) -> 'a
  (** [with_limit ?max_cpu_usage ?min_batch_delay limit f] calls [f], potentially rate limiting it if it uses too much CPU.
    CPU usage is determined using {!val:Rusage_thread.getrusage_thread_utime}, taking only user time into account.
    During (long-running) kernel time the OCaml runtime lock is released and other threads can execute.
    [min_batch_delay] can be used to specify a minimum delay for each API call.
    This is useful for API calls that can batch responses, e.g. [Event.from] and [Event.next], e.g. if multiple fields are updated in a record.
    [max_cpu_usage] is a value in the interval [[0, 1]] that specifies the per-thread CPU usage limit, recomputed over [5s] sampling intervals.
    When usage is exceeded, a delay is inserted before subsequent calls to bring usage back under the limit.
    The [t] argument of [f] can be used for rate limiting internal recursive/retry loops within [f], see {!val:with_recursive}.
    *)

  val with_recursive : ((local t -> 'a) -> local t -> 'a) -> local t -> 'a
  (** [with_recursive f limit] calls [f self limit], where [self] can be used to call [f] recursively.
    A delay (of at least [limit] seconds, when positive) is inserted before each invocation, limiting CPU usage and improving batching.
    *)
end
26 changes: 16 additions & 10 deletions ocaml/xapi/xapi_event.ml
Original file line number Diff line number Diff line change
Expand Up @@ -463,8 +463,12 @@ let unregister ~__context ~classes =
sub.subs <- List.filter (fun x -> not (List.mem x subs)) sub.subs
)

let event_next_limit = Throttle.API_cpu_usage.make "Event.next"
let event_from_limit = Throttle.API_cpu_usage.make "Event.from"

(** Blocking call which returns the next set of events relevant to this session. *)
let rec next ~__context =
Throttle.API_cpu_usage.with_limit ~max_cpu_usage:0.01 event_next_limit @@ fun limit ->
let session = Context.get_session_id __context in
let open Next in
assert_subscribed session ;
Expand All @@ -484,15 +488,16 @@ let rec next ~__context =
)
in
(* Like grab_range () only guarantees to return a non-empty range by blocking if necessary *)
let rec grab_nonempty_range () =
let grab_nonempty_range =
Throttle.API_cpu_usage.with_recursive @@ fun self limit ->
let last_id, end_id = grab_range () in
if last_id = end_id then
let (_ : int64) = wait subscription end_id in
grab_nonempty_range ()
self limit
else
(last_id, end_id)
in
let last_id, end_id = grab_nonempty_range () in
let last_id, end_id = grab_nonempty_range limit in
(* debug "next examining events in range %Ld <= x < %Ld" last_id end_id; *)
(* Are any of the new events interesting? *)
let events = events_read last_id end_id in
Expand All @@ -506,7 +511,7 @@ let rec next ~__context =
else
rpc_of_events relevant

let from_inner __context session subs from from_t deadline =
let from_inner __context session subs from from_t deadline limit =
let open From in
(* The database tables involved in our subscription *)
let tables =
Expand Down Expand Up @@ -601,7 +606,8 @@ let from_inner __context session subs from from_t deadline =
(* Each event.from should have an independent subscription record *)
let msg_gen, messages, tableset, (creates, mods, deletes, last) =
with_call session subs (fun sub ->
let rec grab_nonempty_range () =
let grab_nonempty_range =
Throttle.API_cpu_usage.with_recursive @@ fun self limit ->
let ( (msg_gen, messages, _tableset, (creates, mods, deletes, last))
as result
) =
Expand All @@ -620,12 +626,11 @@ let from_inner __context session subs from from_t deadline =
(* last id the client got is equivalent to the current one *)
last_msg_gen := msg_gen ;
wait2 sub last deadline ;
Thread.delay 0.05 ;
grab_nonempty_range ()
self limit
) else
result
in
grab_nonempty_range ()
grab_nonempty_range limit
)
in
last_generation := last ;
Expand Down Expand Up @@ -700,6 +705,8 @@ let from_inner __context session subs from from_t deadline =
{events; valid_ref_counts; token= Token.to_string (last, msg_gen)}

let from ~__context ~classes ~token ~timeout =
let deadline = Unix.gettimeofday () +. timeout in
Throttle.API_cpu_usage.with_limit event_from_limit @@ fun limit ->
let session = Context.get_session_id __context in
let from, from_t =
try Token.of_string token
Expand All @@ -712,13 +719,12 @@ let from ~__context ~classes ~token ~timeout =
)
in
let subs = List.map Subscription.of_string classes in
let deadline = Unix.gettimeofday () +. timeout in
(* We need to iterate because it's possible for an empty event set
to be generated if we peek in-between a Modify and a Delete; we'll
miss the Delete event and fail to generate the Modify because the
snapshot can't be taken. *)
let rec loop () =
let event_from = from_inner __context session subs from from_t deadline in
let event_from = from_inner __context session subs from from_t deadline limit in
if event_from.events = [] && Unix.gettimeofday () < deadline then (
debug "suppressing empty event.from" ;
loop ()
Expand Down

0 comments on commit ee54d17

Please sign in to comment.