From 76b477e43c2b76ff826c59bd63a6596767f20109 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Edwin=20T=C3=B6r=C3=B6k?= Date: Fri, 10 May 2024 16:50:37 +0100 Subject: [PATCH] CP-47536: use Unixext.time_limited_read/write in Jsonrpcclient MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit TODO: double check that the semantics is the same, especially wrt to handling of Unix exceptions, and the kind of exceptions that are raised. Signed-off-by: Edwin Török --- ocaml/networkd/bin/networkd.ml | 15 +++- ocaml/networkd/lib/dune | 4 +- ocaml/networkd/lib/jsonrpc_client.ml | 98 ++-------------------- ocaml/networkd/lib/jsonrpc_client.mli | 8 +- ocaml/networkd/test/test_jsonrpc_client.ml | 2 +- 5 files changed, 27 insertions(+), 100 deletions(-) diff --git a/ocaml/networkd/bin/networkd.ml b/ocaml/networkd/bin/networkd.ml index e36113580db..8d03a8bfeb6 100644 --- a/ocaml/networkd/bin/networkd.ml +++ b/ocaml/networkd/bin/networkd.ml @@ -122,10 +122,14 @@ let options = ; ( "json-rpc-read-timeout" , Arg.Int (fun x -> - Jsonrpc_client.json_rpc_read_timeout := Int64.(mul 1000000L (of_int x)) + Jsonrpc_client.json_rpc_read_timeout := + Mtime.Span.(x * ms) |> Xapi_stdext_unix.Unixext.Timeout.of_span ) , (fun () -> - Int64.(to_string (div !Jsonrpc_client.json_rpc_read_timeout 1000000L)) + Mtime.Span.to_float_ns + (!Jsonrpc_client.json_rpc_read_timeout :> Mtime.Span.t) + *. 1e-6 + |> Float.to_string ) , "JSON RPC response read timeout value in ms" ) @@ -133,10 +137,13 @@ let options = , Arg.Int (fun x -> Jsonrpc_client.json_rpc_write_timeout := - Int64.(mul 1000000L (of_int x)) + Mtime.Span.(x * ms) |> Xapi_stdext_unix.Unixext.Timeout.of_span ) , (fun () -> - Int64.(to_string (div !Jsonrpc_client.json_rpc_write_timeout 1000000L)) + Mtime.Span.to_float_ns + (!Jsonrpc_client.json_rpc_read_timeout :> Mtime.Span.t) + *. 1e-6 + |> Float.to_string ) , "JSON RPC write timeout value in ms" ) diff --git a/ocaml/networkd/lib/dune b/ocaml/networkd/lib/dune index 6ab41259703..41aee6b98d6 100644 --- a/ocaml/networkd/lib/dune +++ b/ocaml/networkd/lib/dune @@ -3,7 +3,7 @@ (libraries astring forkexec - mtime + (re_export mtime) mtime.clock.os re re.perl @@ -17,7 +17,7 @@ xapi-stdext-pervasives xapi-stdext-std xapi-stdext-threads - xapi-stdext-unix + (re_export xapi-stdext-unix) xapi-inventory xapi-idl.network xapi-log diff --git a/ocaml/networkd/lib/jsonrpc_client.ml b/ocaml/networkd/lib/jsonrpc_client.ml index d43e8774547..5530f4473ef 100644 --- a/ocaml/networkd/lib/jsonrpc_client.ml +++ b/ocaml/networkd/lib/jsonrpc_client.ml @@ -17,6 +17,7 @@ module D = Debug.Make (struct let name = "jsonrpc_client" end) open D +open Xapi_stdext_unix exception Timeout @@ -24,65 +25,18 @@ exception Read_error let json_rpc_max_len = ref 65536 (* Arbitrary maximum length of RPC response *) -let json_rpc_read_timeout = ref 60000000000L +let json_rpc_read_timeout = ref (Mtime.Span.(60 * s) |> Unixext.Timeout.of_span) (* timeout value in ns when reading RPC response *) -let json_rpc_write_timeout = ref 60000000000L +let json_rpc_write_timeout = ref (Mtime.Span.(60 * s) |> Unixext.Timeout.of_span) (* timeout value in ns when writing RPC request *) -let to_s s = Int64.to_float s *. 1e-9 - (* Read the entire contents of the fd, of unknown length *) -let timeout_read fd timeout = - let buf = Buffer.create !json_rpc_max_len in - let read_start = Mtime_clock.counter () in - let get_total_used_time () = - Mtime.Span.to_uint64_ns (Mtime_clock.count read_start) - in - let rec inner max_time max_bytes = - let ready_to_read, _, _ = - try Unix.select [fd] [] [] (to_s max_time) - with - (* in case the unix.select call fails in situation like interrupt *) - | Unix.Unix_error (Unix.EINTR, _, _) -> - ([], [], []) - in - (* This is not accurate the calculate time just for the select part. - However, we think the read time will be minor comparing to the scale of - tens of seconds. the current style will be much concise in code. *) - let remain_time = - let used_time = get_total_used_time () in - Int64.sub timeout used_time - in - if remain_time < 0L then ( - debug "Timeout after read %d" (Buffer.length buf) ; - raise Timeout - ) ; - if List.mem fd ready_to_read then - let bytes = Bytes.make 4096 '\000' in - match Unix.read fd bytes 0 4096 with - | 0 -> - Buffer.contents buf (* EOF *) - | n -> - if n > max_bytes then ( - debug "exceeding maximum read limit %d, clear buffer" - !json_rpc_max_len ; - Buffer.clear buf ; - raise Read_error - ) else ( - Buffer.add_subbytes buf bytes 0 n ; - inner remain_time (max_bytes - n) - ) - | exception - Unix.Unix_error ((Unix.EAGAIN | Unix.EWOULDBLOCK | Unix.EINTR), _, _) - -> - inner remain_time max_bytes - else - inner remain_time max_bytes - in - inner timeout !json_rpc_max_len +let timeout_read fd max_wait = + Unixext.time_limited_single_read fd !json_rpc_max_len + (Unixext.Timer.start ~timeout:max_wait) (* Write as many bytes to a file descriptor as possible from data before a given clock time. *) @@ -90,44 +44,8 @@ let timeout_read fd timeout = specified length. *) (* Writes into the file descriptor at the current cursor position. *) let timeout_write filedesc total_length data response_time = - let write_start = Mtime_clock.counter () in - let get_total_used_time () = - Mtime.Span.to_uint64_ns (Mtime_clock.count write_start) - in - let rec inner_write offset max_time = - let _, ready_to_write, _ = - try Unix.select [] [filedesc] [] (to_s max_time) - with - (* in case the unix.select call fails in situation like interrupt *) - | Unix.Unix_error (Unix.EINTR, _, _) -> - ([], [], []) - in - let remain_time = - let used_time = get_total_used_time () in - Int64.sub response_time used_time - in - if remain_time < 0L then ( - debug "Timeout to write %d at offset %d" total_length offset ; - raise Timeout - ) ; - if List.mem filedesc ready_to_write then - let length = total_length - offset in - let bytes_written = - try Unix.single_write filedesc data offset length - with - | Unix.Unix_error ((Unix.EAGAIN | Unix.EWOULDBLOCK | Unix.EINTR), _, _) - -> - 0 - in - let new_offset = offset + bytes_written in - if length = bytes_written then - () - else - inner_write new_offset remain_time - else - inner_write offset remain_time - in - inner_write 0 response_time + Xapi_stdext_unix.Unixext.Timer.start ~timeout:response_time + |> Xapi_stdext_unix.Unixext.time_limited_write filedesc total_length data let with_rpc ?(version = Jsonrpc.V2) ~path ~call () = let uri = Uri.of_string (Printf.sprintf "file://%s" path) in diff --git a/ocaml/networkd/lib/jsonrpc_client.mli b/ocaml/networkd/lib/jsonrpc_client.mli index 81f1e38669f..027e665fbe8 100644 --- a/ocaml/networkd/lib/jsonrpc_client.mli +++ b/ocaml/networkd/lib/jsonrpc_client.mli @@ -16,13 +16,15 @@ exception Timeout exception Read_error +open Xapi_stdext_unix + val json_rpc_max_len : int ref -val json_rpc_read_timeout : int64 ref +val json_rpc_read_timeout : Unixext.Timeout.t ref -val json_rpc_write_timeout : int64 ref +val json_rpc_write_timeout : Unixext.Timeout.t ref -val timeout_read : Unix.file_descr -> int64 -> string +val timeout_read : Unix.file_descr -> Unixext.Timeout.t -> string val with_rpc : ?version:Jsonrpc.version diff --git a/ocaml/networkd/test/test_jsonrpc_client.ml b/ocaml/networkd/test/test_jsonrpc_client.ml index 0ef3acc2c46..2e9f717d7fa 100644 --- a/ocaml/networkd/test/test_jsonrpc_client.ml +++ b/ocaml/networkd/test/test_jsonrpc_client.ml @@ -44,7 +44,7 @@ module Input_json_object = Generic.MakeStateless (struct let json = Jsonrpc_client.timeout_read (Unix.descr_of_in_channel fin) - 5_000_000_000L + (Mtime.Span.(5 * s) |> Xapi_stdext_unix.Unixext.Timeout.of_span) in let rpc = Jsonrpc.of_string ~strict:false json in Ok rpc