Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

conduit-lwt-unix: se accept_n on the server #387

Closed
wants to merge 11 commits into from
49 changes: 28 additions & 21 deletions src/conduit-lwt-unix/conduit_lwt_server.ml
Original file line number Diff line number Diff line change
Expand Up @@ -71,34 +71,41 @@ let run_handler handler v =
f "Uncaught exception in handler: %s" (Printexc.to_string ex)));
Lwt.return_unit))

let init ?(stop = fst (Lwt.wait ())) handler fd =
let init ?(nconn = 10_000) ?(stop = fst (Lwt.wait ())) handler fd =
let stop = Lwt.map (fun () -> `Stop) stop in
let log_exn = function
| Some ex ->
Log.warn (fun f ->
f "Uncaught exception accepting connection: %s"
(Printexc.to_string ex))
| None -> ()
in
let rec loop () =
Lwt.try_bind
let accepted =
Lwt_unix.accept_n fd nconn >>= fun (connections, maybe_error) ->
log_exn maybe_error;
Lwt.return (`Accepted connections)
in
Lwt.catch
(fun () ->
connected ();
throttle () >>= fun () ->
let accept = Lwt.map (fun v -> `Accept v) (Lwt_unix.accept fd) in
Lwt.choose [ accept; stop ] >|= function
| `Stop ->
Lwt.cancel accept;
`Stop
| `Accept _ as x -> x)
(function
Lwt.choose [ accepted; stop ] >>= function
| `Stop ->
disconnected ();
Lwt.cancel accepted;
Lwt.return_unit
| `Accept v ->
run_handler handler v;
loop ())
(fun exn ->
disconnected ();
match exn with
| `Accepted connections ->
Lwt_list.iter_p
(fun v ->
connected ();
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There may also be a race to update connected here since I am calling iter_p, right?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't quite see the race you're referring to here -- where is the iter_p?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If I am not mistaken iter_p runs the iteration in parallel, and connected () increments the active connection count without any protection (let connected () = incr active). My fear was that multiple connections executed in parallel could try to increment the variable simultaneously (and I don't want to put a mutex there).

throttle () >>= fun () ->
run_handler handler v;
Lwt.return_unit)
connections
>>= Lwt_unix.yield
>>= loop)
(function
| Lwt.Canceled -> Lwt.return_unit
| ex ->
Log.warn (fun f ->
f "Uncaught exception accepting connection: %s"
(Printexc.to_string ex));
log_exn (Some ex);
Lwt_unix.yield () >>= loop)
in
Lwt.finalize loop (fun () -> Lwt_unix.close fd)
1 change: 1 addition & 0 deletions src/conduit-lwt-unix/conduit_lwt_server.mli
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ val process_accept :
unit Lwt.t

val init :
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I should probably document this, I was first waiting to see if you are interested at all and what the review says about the implementation

?nconn:int ->
?stop:unit Lwt.t ->
(Lwt_unix.file_descr * Lwt_unix.sockaddr -> unit Lwt.t) ->
Lwt_unix.file_descr ->
Expand Down