Skip to content

Commit

Permalink
CP-47536: drop duplicate Posix_channel.proxy
Browse files Browse the repository at this point in the history
It was nearly identical to Unixext.proxy, except it kept the file descriptors open.
Introduce a Unixext.proxy_noclose and use it instead of Posix_channel.proxy.

One less function to convert to epoll.

Signed-off-by: Edwin Török <[email protected]>
  • Loading branch information
edwintorok committed May 10, 2024
1 parent ff32fd9 commit c6702c5
Show file tree
Hide file tree
Showing 4 changed files with 16 additions and 100 deletions.
8 changes: 6 additions & 2 deletions ocaml/libs/xapi-stdext/lib/xapi-stdext-unix/unixext.ml
Original file line number Diff line number Diff line change
Expand Up @@ -427,7 +427,7 @@ let with_polly f =
let finally () = Polly.close polly in
Xapi_stdext_pervasives.Pervasiveext.finally (fun () -> f polly) finally

let proxy (a : Unix.file_descr) (b : Unix.file_descr) =
let proxy_noclose (a : Unix.file_descr) (b : Unix.file_descr) =
let size = 64 * 1024 in
(* [a'] is read from [a] and will be written to [b] *)
(* [b'] is read from [b] and will be written to [a] *)
Expand Down Expand Up @@ -477,9 +477,13 @@ let proxy (a : Unix.file_descr) (b : Unix.file_descr) =
with _ -> (
(try Unix.clear_nonblock a with _ -> ()) ;
(try Unix.clear_nonblock b with _ -> ()) ;
)

let proxy a b =
try proxy_noclose a b
with _ ->
(try Unix.close a with _ -> ()) ;
try Unix.close b with _ -> ()
)

let try_read_string ?limit fd =
let buf = Buffer.create 0 in
Expand Down
8 changes: 8 additions & 0 deletions ocaml/libs/xapi-stdext/lib/xapi-stdext-unix/unixext.mli
Original file line number Diff line number Diff line change
Expand Up @@ -128,8 +128,16 @@ val string_of_signal : int -> string
(** [string_of_signal x] translates an ocaml signal number into
* a string suitable for logging. *)

(** [proxy a b] copies everything received on either [a] or [b] to [b] and [a] respectively.
Closes both sockets when finished.
Exceptions raised by either side of the connection breaks out of the proxy loop,
but exceptions aren't propagated to the caller.
*)
val proxy : Unix.file_descr -> Unix.file_descr -> unit

(** [proxy_noclose a b] is like {!val:proxy}, but keeps both file descriptors open. *)
val proxy_noclose : Unix.file_descr -> Unix.file_descr -> unit

val really_read : Unix.file_descr -> bytes -> int -> int -> unit

val really_read_string : Unix.file_descr -> int -> string
Expand Down
1 change: 1 addition & 0 deletions ocaml/xapi-idl/lib/dune
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
xapi-open-uri
xapi-stdext-pervasives
xapi-stdext-threads
xapi-stdext-unix
xapi-inventory
xmlm
)
Expand Down
99 changes: 1 addition & 98 deletions ocaml/xapi-idl/lib/posix_channel.ml
Original file line number Diff line number Diff line change
@@ -1,104 +1,7 @@
let my_domid = 0 (* TODO: figure this out *)

exception End_of_file

exception Channel_setup_failed

module CBuf = struct
(** A circular buffer constructed from a string *)
type t = {
mutable buffer: bytes
; mutable len: int (** bytes of valid data in [buffer] *)
; mutable start: int (** index of first valid byte in [buffer] *)
; mutable r_closed: bool (** true if no more data can be read due to EOF *)
; mutable w_closed: bool
(** true if no more data can be written due to EOF *)
}

let empty length =
{
buffer= Bytes.create length
; len= 0
; start= 0
; r_closed= false
; w_closed= false
}

let drop (x : t) n =
if n > x.len then failwith (Printf.sprintf "drop %d > %d" n x.len) ;
x.start <- (x.start + n) mod Bytes.length x.buffer ;
x.len <- x.len - n

let should_read (x : t) =
(not x.r_closed) && x.len < Bytes.length x.buffer - 1

let should_write (x : t) = (not x.w_closed) && x.len > 0

let end_of_reads (x : t) = x.r_closed && x.len = 0

let end_of_writes (x : t) = x.w_closed

let write (x : t) fd =
(* Offset of the character after the substring *)
let next = min (Bytes.length x.buffer) (x.start + x.len) in
let len = next - x.start in
let written =
try Unix.single_write fd x.buffer x.start len
with _e ->
x.w_closed <- true ;
len
in
drop x written

let read (x : t) fd =
(* Offset of the next empty character *)
let next = (x.start + x.len) mod Bytes.length x.buffer in
let len =
min (Bytes.length x.buffer - next) (Bytes.length x.buffer - x.len)
in
let read = Unix.read fd x.buffer next len in
if read = 0 then x.r_closed <- true ;
x.len <- x.len + read
end

let proxy (a : Unix.file_descr) (b : Unix.file_descr) =
let size = 64 * 1024 in
(* [a'] is read from [a] and will be written to [b] *)
(* [b'] is read from [b] and will be written to [a] *)
let a' = CBuf.empty size and b' = CBuf.empty size in
Unix.set_nonblock a ;
Unix.set_nonblock b ;
try
while true do
let r =
(if CBuf.should_read a' then [a] else [])
@ if CBuf.should_read b' then [b] else []
in
let w =
(if CBuf.should_write a' then [b] else [])
@ if CBuf.should_write b' then [a] else []
in
(* If we can't make any progress (because fds have been closed), then stop *)
if r = [] && w = [] then raise End_of_file ;
let r, w, _ = Unix.select r w [] (-1.0) in
(* Do the writing before the reading *)
List.iter
(fun fd -> if a = fd then CBuf.write b' a else CBuf.write a' b)
w ;
List.iter (fun fd -> if a = fd then CBuf.read a' a else CBuf.read b' b) r ;
(* If there's nothing else to read or write then signal the other end *)
List.iter
(fun (buf, fd) ->
if CBuf.end_of_reads buf then Unix.shutdown fd Unix.SHUTDOWN_SEND ;
if CBuf.end_of_writes buf then Unix.shutdown fd Unix.SHUTDOWN_RECEIVE
)
[(a', b); (b', a)]
done
with _ -> (
(try Unix.clear_nonblock a with _ -> ()) ;
try Unix.clear_nonblock b with _ -> ()
)

let finally f g =
try
let result = f () in
Expand Down Expand Up @@ -170,7 +73,7 @@ let send proxy_socket =
let fd, _peer = Unix.accept s_ip in
List.iter close !to_close ;
to_close := fd :: !to_close ;
proxy fd proxy_socket
Xapi_stdext_unix.Unixext.proxy_noclose fd proxy_socket
) else
assert false
(* can never happen *)
Expand Down

0 comments on commit c6702c5

Please sign in to comment.