Skip to content

Commit

Permalink
fix: new lwt server bugs
Browse files Browse the repository at this point in the history
* cannot call direct access recursively
* we need to catch conn_reset

Signed-off-by: Rudi Grinberg <[email protected]>

<!-- ps-id: c1719f95-0851-4245-8948-61a975d97f1d -->
  • Loading branch information
rgrinberg committed Jun 26, 2024
1 parent 7ecb79a commit d6a5d61
Showing 1 changed file with 36 additions and 24 deletions.
60 changes: 36 additions & 24 deletions cohttp-server-lwt-unix/src/cohttp_server_lwt_unix.ml
Original file line number Diff line number Diff line change
Expand Up @@ -120,30 +120,44 @@ module Input_channel = struct
let ( >>| ) = ( >|= )
end)
(struct
type src = Lwt_io.direct_access

let rec refill (da : Lwt_io.direct_access) buf ~pos ~len =
Lwt.catch
(fun () ->
let available = da.da_max - da.da_ptr in
if available = 0 then
let* read = da.da_perform () in
if read = 0 then Lwt.return `Eof else refill da buf ~pos ~len
else
let read_len = min available len in
Lwt_bytes.blit_to_bytes da.da_buffer da.da_ptr buf pos read_len;
da.da_ptr <- da.da_ptr + read_len;
Lwt.return (`Ok read_len))
(function
| Lwt_io.Channel_closed _ -> Lwt.return `Eof | exn -> raise exn)
type src = Lwt_io.input_channel

let rec refill ic buf ~pos ~len =
let open Lwt.Infix in
if Lwt_io.is_closed ic then Lwt.return `Eof
else
Lwt.catch
(fun () ->
Lwt_io.direct_access ic (fun da ->
let available = da.da_max - da.da_ptr in
if available = 0 then
let+ read = da.da_perform () in
if read = 0 then `Eof else `Refill
else
let read_len = min available len in
Lwt_bytes.blit_to_bytes da.da_buffer da.da_ptr buf pos
read_len;
da.da_ptr <- da.da_ptr + read_len;
Lwt.return (`Ok read_len)))
(function
| Unix.Unix_error (ECONNRESET, _, _) | Lwt_io.Channel_closed _
->
Lwt.return `Eof
| exn -> raise exn)
>>= function
| `Eof ->
let* () = Lwt_io.close ic in
Lwt.return `Eof
| `Ok n -> Lwt.return (`Ok n)
| `Refill -> refill ic buf ~pos ~len
end)

type t = { buf : Bytebuffer.t; da : Lwt_io.direct_access }
type t = { buf : Bytebuffer.t; ic : Lwt_io.input_channel }

let create ?(buf_len = 0x4000) da = { buf = Bytebuffer.create buf_len; da }
let read_line_opt t = Refill.read_line t.buf t.da
let read t count = Refill.read t.buf t.da count
let refill t = Refill.refill t.buf t.da
let create ?(buf_len = 0x4000) ic = { buf = Bytebuffer.create buf_len; ic }
let read_line_opt t = Refill.read_line t.buf t.ic
let read t count = Refill.read t.buf t.ic count
let refill t = Refill.refill t.buf t.ic
let remaining t = Bytebuffer.length t.buf

let with_input_buffer (t : t) ~f =
Expand Down Expand Up @@ -365,6 +379,4 @@ let handle_connection { callback; on_exn } (ic, oc) =
in
if keep_alive then loop callback ic oc else Lwt.return_unit
in
Lwt_io.direct_access ic (fun da ->
let ic = Input_channel.create da in
loop callback ic oc)
loop callback (Input_channel.create ic) oc

0 comments on commit d6a5d61

Please sign in to comment.