Skip to content

Commit

Permalink
timer
Browse files Browse the repository at this point in the history
  • Loading branch information
edwintorok committed May 13, 2024
1 parent 8d1d412 commit 27eb71b
Show file tree
Hide file tree
Showing 4 changed files with 110 additions and 75 deletions.
1 change: 1 addition & 0 deletions ocaml/libs/xapi-stdext/lib/xapi-stdext-unix/dune
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
(public_name xapi-stdext-unix)
(libraries
fd-send-recv
fmt
polly
unix
mtime
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,8 @@ let test_time_limited_write =
let fd = Xapi_fdcaps.Operations.For_test.unsafe_fd_exn wrapped_fd in
Unix.set_nonblock fd ;
let dt = Mtime_clock.counter () in
let deadline = Unixext.mtime_deadline_of_delta ~seconds:timeout
let timeout = Unixext.Timer.timeout_of_seconds ~seconds:timeout in
let deadline = Unixext.Timer.start ~timeout
in
let finally () = test_elapsed := Mtime_clock.count dt in
Fun.protect ~finally (fun () ->
Expand Down Expand Up @@ -114,7 +115,8 @@ let test_time_limited_read =
let fd = Xapi_fdcaps.Operations.For_test.unsafe_fd_exn wrapped_fd in
Unix.set_nonblock fd ;
let dt = Mtime_clock.counter () in
let deadline = Unixext.mtime_deadline_of_delta ~seconds:timeout in
let timeout = Unixext.Timer.timeout_of_seconds ~seconds:timeout in
let deadline = Unixext.Timer.start ~timeout in
let finally () = test_elapsed := Mtime_clock.count dt in
Fun.protect ~finally (fun () ->
Unixext.time_limited_read fd behaviour.size deadline
Expand Down
143 changes: 85 additions & 58 deletions ocaml/libs/xapi-stdext/lib/xapi-stdext-unix/unixext.ml
Original file line number Diff line number Diff line change
Expand Up @@ -590,20 +590,40 @@ let with_polly_wait kind fd f =
in
f wait fd

let mtime_span_of ~seconds =
match Mtime.Span.of_float_ns (seconds *. 1e9) with
| Some span -> span
| None ->
(* e.g. negative, or >=2^53. *)
invalid_arg (Printf.sprintf "Span out of range: %gs" seconds)

let mtime_deadline_of_delta ~seconds =
let now = Mtime_clock.now () in
let span = mtime_span_of ~seconds in
match Mtime.add_span now span with
| Some deadline -> deadline
| None ->
invalid_arg (Format.asprintf "Mtime overflow in: %a + %a" Mtime.pp now Mtime.Span.pp span)
module Timer = struct
let timeout_of_seconds ~seconds =
match Mtime.Span.of_float_ns (seconds *. 1e9) with
| Some span ->
span
| None ->
(* e.g. negative, or >=2^53. *)
invalid_arg (Printf.sprintf "Span out of range: %gs" seconds)

type t = {elapsed: Mtime_clock.counter; timeout: Mtime.Span.t}

type remaining = Spare of Mtime.Span.t | Excess of Mtime.Span.t

let start ~timeout = {elapsed= Mtime_clock.counter (); timeout}

let elapsed {elapsed; _} = elapsed

let timeout {timeout; _} = timeout

let remaining t =
let elapsed = Mtime_clock.count t.elapsed in
let difference = Mtime.Span.abs_diff t.timeout elapsed in
if Mtime.Span.compare t.timeout elapsed > 0 then
Spare difference
else
Excess difference

let pp =
Fmt.record
[
Fmt.field "elapsed" elapsed (Fmt.using Mtime_clock.count Mtime.Span.pp)
; Fmt.field "timeout" timeout Mtime.Span.pp
]
end

(* Write as many bytes to a file descriptor as possible from data before a given clock time. *)
(* Raises Timeout exception if the number of bytes written is less than the specified length. *)
Expand All @@ -615,21 +635,22 @@ let time_limited_write_internal
let total_bytes_to_write = length in
let bytes_written = ref 0 in
while !bytes_written < total_bytes_to_write do
let now = Mtime_clock.now () in
if Mtime.is_later now ~than:target_response_time then raise Timeout ;
let remaining_time = Mtime.span now target_response_time in
wait remaining_time ;
let bytes_to_write = total_bytes_to_write - !bytes_written in
let bytes =
try write filedesc data !bytes_written bytes_to_write
with
| Unix.Unix_error (Unix.EAGAIN, _, _)
| Unix.Unix_error (Unix.EWOULDBLOCK, _, _)
->
0
in
(* write from buffer=data from offset=bytes_written, length=bytes_to_write *)
bytes_written := bytes + !bytes_written
match Timer.remaining target_response_time with
| Excess _ ->
raise Timeout
| Spare remaining_time ->
wait remaining_time ;
let bytes_to_write = total_bytes_to_write - !bytes_written in
let bytes =
try write filedesc data !bytes_written bytes_to_write
with
| Unix.Unix_error (Unix.EAGAIN, _, _)
| Unix.Unix_error (Unix.EWOULDBLOCK, _, _)
->
0
in
(* write from buffer=data from offset=bytes_written, length=bytes_to_write *)
bytes_written := bytes + !bytes_written
done ;
if !bytes_written = total_bytes_to_write then
()
Expand All @@ -653,43 +674,49 @@ let time_limited_read filedesc length target_response_time =
let bytes_read = ref 0 in
let buf = Bytes.make total_bytes_to_read '\000' in
while !bytes_read < total_bytes_to_read do
let now = Mtime_clock.now () in
if Mtime.is_later now ~than:target_response_time then raise Timeout ;
let remaining_time = Mtime.span now target_response_time in
wait remaining_time ;
let bytes_to_read = total_bytes_to_read - !bytes_read in
let bytes =
try Unix.read filedesc buf !bytes_read bytes_to_read
with
| Unix.Unix_error (Unix.EAGAIN, _, _)
| Unix.Unix_error (Unix.EWOULDBLOCK, _, _)
->
0
in
(* read into buffer=buf from offset=bytes_read, length=bytes_to_read *)
if bytes = 0 then
raise End_of_file (* End of file has been reached *)
else
bytes_read := bytes + !bytes_read
match Timer.remaining target_response_time with
| Excess _ ->
raise Timeout
| Spare remaining_time ->
wait remaining_time ;
wait remaining_time ;
let bytes_to_read = total_bytes_to_read - !bytes_read in
let bytes =
try Unix.read filedesc buf !bytes_read bytes_to_read
with
| Unix.Unix_error (Unix.EAGAIN, _, _)
| Unix.Unix_error (Unix.EWOULDBLOCK, _, _)
->
0
in
(* read into buffer=buf from offset=bytes_read, length=bytes_to_read *)
if bytes = 0 then
raise End_of_file (* End of file has been reached *)
else
bytes_read := bytes + !bytes_read
done ;
if !bytes_read = total_bytes_to_read then
Bytes.unsafe_to_string buf
else (* we ran out of time *)
raise Timeout

let time_limited_single_read filedesc length ~max_wait =
let time_limited_single_read filedesc length target_response_time =
let buf = Bytes.make length '\000' in
with_polly_wait Polly.Events.inp filedesc @@ fun wait filedesc ->
wait max_wait ;
let bytes =
try Unix.read filedesc buf 0 length
with
| Unix.Unix_error (Unix.EAGAIN, _, _)
| Unix.Unix_error (Unix.EWOULDBLOCK, _, _)
->
0
in
Bytes.sub_string buf 0 bytes
match Timer.remaining target_response_time with
| Excess _ ->
raise Timeout
| Spare remaining_time ->
wait remaining_time ;
let bytes =
try Unix.read filedesc buf 0 length
with
| Unix.Unix_error (Unix.EAGAIN, _, _)
| Unix.Unix_error (Unix.EWOULDBLOCK, _, _)
->
0
in
Bytes.sub_string buf 0 bytes

(* --------------------------------------------------------------------------------------- *)

Expand Down
35 changes: 20 additions & 15 deletions ocaml/libs/xapi-stdext/lib/xapi-stdext-unix/unixext.mli
Original file line number Diff line number Diff line change
Expand Up @@ -154,31 +154,36 @@ val try_read_string : ?limit:int -> Unix.file_descr -> string

exception Timeout

(** [mtime_span_of ~seconds] constructs an {!type:Mtime.Span.t} from [s], which is expressed in seconds.
module Timer: sig
val timeout_of_seconds: seconds:float -> Mtime.Span.t
(** [timeout_of_seconds ~seconds] converts [s] to a Span.
@raises Invalid_argument if [s] is out of range.
*)

The resulting span can be used for functions like {!val:time_limited_single_read}.
type t

@raises Invalid_argument if [s] is out of range. See {!val:Mtime.Span.of_float_ns}
*)
val mtime_span_of: seconds:float -> Mtime.Span.t
type remaining = Spare of Mtime.Span.t | Excess of Mtime.Span.t

(** [mtime_deadline_of_delta ~seconds] calculates the timestamp that is [seconds] later than now.
val start: timeout:Mtime.Span.t -> t
(** [start ~timeout] *)

The resulting value can be used for functions like {!val:time_limited_write}, {!val:time_limited_write_substring}.
@raises Invalid_argument if [s] is out of range. See {!val:Mtime.Span.of_float_ns}
*)
val mtime_deadline_of_delta : seconds:float -> Mtime.t
val remaining: t -> remaining
(** [remaining deadline] calculates the time remaining until the given [deadline]. *)

val pp: Format.formatter -> t -> unit
(** [pp formatter t] pretty prints [t] on [formatter]. *)
end

val time_limited_write : Unix.file_descr -> int -> bytes -> Mtime.t -> unit
val time_limited_write : Unix.file_descr -> int -> bytes -> Timer.t -> unit

val time_limited_write_substring :
Unix.file_descr -> int -> string -> Mtime.t -> unit
Unix.file_descr -> int -> string -> Timer.t -> unit

val time_limited_read : Unix.file_descr -> int -> Mtime.t -> string
val time_limited_read : Unix.file_descr -> int -> Timer.t -> string

val time_limited_single_read :
Unix.file_descr -> int -> max_wait:Mtime.span -> string
Unix.file_descr -> int -> Timer.t -> string

val read_data_in_string_chunks :
(string -> int -> unit)
Expand Down

0 comments on commit 27eb71b

Please sign in to comment.