Skip to content

Commit

Permalink
CP-47536: use Unixext.time_limited_read/write in Jsonrpcclient
Browse files Browse the repository at this point in the history
TODO: double check that the semantics is the same, especially wrt to handling of Unix exceptions,
and the kind of exceptions that are raised.

Signed-off-by: Edwin Török <[email protected]>
  • Loading branch information
edwintorok committed May 13, 2024
1 parent b6b707c commit 76b477e
Show file tree
Hide file tree
Showing 5 changed files with 27 additions and 100 deletions.
15 changes: 11 additions & 4 deletions ocaml/networkd/bin/networkd.ml
Original file line number Diff line number Diff line change
Expand Up @@ -122,21 +122,28 @@ let options =
; ( "json-rpc-read-timeout"
, Arg.Int
(fun x ->
Jsonrpc_client.json_rpc_read_timeout := Int64.(mul 1000000L (of_int x))
Jsonrpc_client.json_rpc_read_timeout :=
Mtime.Span.(x * ms) |> Xapi_stdext_unix.Unixext.Timeout.of_span
)
, (fun () ->
Int64.(to_string (div !Jsonrpc_client.json_rpc_read_timeout 1000000L))
Mtime.Span.to_float_ns
(!Jsonrpc_client.json_rpc_read_timeout :> Mtime.Span.t)
*. 1e-6
|> Float.to_string
)
, "JSON RPC response read timeout value in ms"
)
; ( "json-rpc-write-timeout"
, Arg.Int
(fun x ->
Jsonrpc_client.json_rpc_write_timeout :=
Int64.(mul 1000000L (of_int x))
Mtime.Span.(x * ms) |> Xapi_stdext_unix.Unixext.Timeout.of_span
)
, (fun () ->
Int64.(to_string (div !Jsonrpc_client.json_rpc_write_timeout 1000000L))
Mtime.Span.to_float_ns
(!Jsonrpc_client.json_rpc_read_timeout :> Mtime.Span.t)
*. 1e-6
|> Float.to_string
)
, "JSON RPC write timeout value in ms"
)
Expand Down
4 changes: 2 additions & 2 deletions ocaml/networkd/lib/dune
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
(libraries
astring
forkexec
mtime
(re_export mtime)
mtime.clock.os
re
re.perl
Expand All @@ -17,7 +17,7 @@
xapi-stdext-pervasives
xapi-stdext-std
xapi-stdext-threads
xapi-stdext-unix
(re_export xapi-stdext-unix)
xapi-inventory
xapi-idl.network
xapi-log
Expand Down
98 changes: 8 additions & 90 deletions ocaml/networkd/lib/jsonrpc_client.ml
Original file line number Diff line number Diff line change
Expand Up @@ -17,117 +17,35 @@
module D = Debug.Make (struct let name = "jsonrpc_client" end)

open D
open Xapi_stdext_unix

exception Timeout

exception Read_error

let json_rpc_max_len = ref 65536 (* Arbitrary maximum length of RPC response *)

let json_rpc_read_timeout = ref 60000000000L
let json_rpc_read_timeout = ref (Mtime.Span.(60 * s) |> Unixext.Timeout.of_span)

(* timeout value in ns when reading RPC response *)

let json_rpc_write_timeout = ref 60000000000L
let json_rpc_write_timeout = ref (Mtime.Span.(60 * s) |> Unixext.Timeout.of_span)

(* timeout value in ns when writing RPC request *)

let to_s s = Int64.to_float s *. 1e-9

(* Read the entire contents of the fd, of unknown length *)
let timeout_read fd timeout =
let buf = Buffer.create !json_rpc_max_len in
let read_start = Mtime_clock.counter () in
let get_total_used_time () =
Mtime.Span.to_uint64_ns (Mtime_clock.count read_start)
in
let rec inner max_time max_bytes =
let ready_to_read, _, _ =
try Unix.select [fd] [] [] (to_s max_time)
with
(* in case the unix.select call fails in situation like interrupt *)
| Unix.Unix_error (Unix.EINTR, _, _) ->
([], [], [])
in
(* This is not accurate the calculate time just for the select part.
However, we think the read time will be minor comparing to the scale of
tens of seconds. the current style will be much concise in code. *)
let remain_time =
let used_time = get_total_used_time () in
Int64.sub timeout used_time
in
if remain_time < 0L then (
debug "Timeout after read %d" (Buffer.length buf) ;
raise Timeout
) ;
if List.mem fd ready_to_read then
let bytes = Bytes.make 4096 '\000' in
match Unix.read fd bytes 0 4096 with
| 0 ->
Buffer.contents buf (* EOF *)
| n ->
if n > max_bytes then (
debug "exceeding maximum read limit %d, clear buffer"
!json_rpc_max_len ;
Buffer.clear buf ;
raise Read_error
) else (
Buffer.add_subbytes buf bytes 0 n ;
inner remain_time (max_bytes - n)
)
| exception
Unix.Unix_error ((Unix.EAGAIN | Unix.EWOULDBLOCK | Unix.EINTR), _, _)
->
inner remain_time max_bytes
else
inner remain_time max_bytes
in
inner timeout !json_rpc_max_len
let timeout_read fd max_wait =
Unixext.time_limited_single_read fd !json_rpc_max_len
(Unixext.Timer.start ~timeout:max_wait)

(* Write as many bytes to a file descriptor as possible from data before a given
clock time. *)
(* Raises Timeout exception if the number of bytes written is less than the
specified length. *)
(* Writes into the file descriptor at the current cursor position. *)
let timeout_write filedesc total_length data response_time =
let write_start = Mtime_clock.counter () in
let get_total_used_time () =
Mtime.Span.to_uint64_ns (Mtime_clock.count write_start)
in
let rec inner_write offset max_time =
let _, ready_to_write, _ =
try Unix.select [] [filedesc] [] (to_s max_time)
with
(* in case the unix.select call fails in situation like interrupt *)
| Unix.Unix_error (Unix.EINTR, _, _) ->
([], [], [])
in
let remain_time =
let used_time = get_total_used_time () in
Int64.sub response_time used_time
in
if remain_time < 0L then (
debug "Timeout to write %d at offset %d" total_length offset ;
raise Timeout
) ;
if List.mem filedesc ready_to_write then
let length = total_length - offset in
let bytes_written =
try Unix.single_write filedesc data offset length
with
| Unix.Unix_error ((Unix.EAGAIN | Unix.EWOULDBLOCK | Unix.EINTR), _, _)
->
0
in
let new_offset = offset + bytes_written in
if length = bytes_written then
()
else
inner_write new_offset remain_time
else
inner_write offset remain_time
in
inner_write 0 response_time
Xapi_stdext_unix.Unixext.Timer.start ~timeout:response_time
|> Xapi_stdext_unix.Unixext.time_limited_write filedesc total_length data

let with_rpc ?(version = Jsonrpc.V2) ~path ~call () =
let uri = Uri.of_string (Printf.sprintf "file://%s" path) in
Expand Down
8 changes: 5 additions & 3 deletions ocaml/networkd/lib/jsonrpc_client.mli
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,15 @@ exception Timeout

exception Read_error

open Xapi_stdext_unix

val json_rpc_max_len : int ref

val json_rpc_read_timeout : int64 ref
val json_rpc_read_timeout : Unixext.Timeout.t ref

val json_rpc_write_timeout : int64 ref
val json_rpc_write_timeout : Unixext.Timeout.t ref

val timeout_read : Unix.file_descr -> int64 -> string
val timeout_read : Unix.file_descr -> Unixext.Timeout.t -> string

val with_rpc :
?version:Jsonrpc.version
Expand Down
2 changes: 1 addition & 1 deletion ocaml/networkd/test/test_jsonrpc_client.ml
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ module Input_json_object = Generic.MakeStateless (struct
let json =
Jsonrpc_client.timeout_read
(Unix.descr_of_in_channel fin)
5_000_000_000L
(Mtime.Span.(5 * s) |> Xapi_stdext_unix.Unixext.Timeout.of_span)
in
let rpc = Jsonrpc.of_string ~strict:false json in
Ok rpc
Expand Down

0 comments on commit 76b477e

Please sign in to comment.