Skip to content

Commit

Permalink
Merge pull request #1024 from talex5/abstract-response
Browse files Browse the repository at this point in the history
Make server response type abstract and allow streaming in cohttp-eio
  • Loading branch information
mseri committed May 30, 2024
2 parents db6cae6 + ba3250b commit 1d9e14b
Show file tree
Hide file tree
Showing 10 changed files with 115 additions and 64 deletions.
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
## Unreleased

- cohttp-eio: Make server response type abstract and allow streaming in cohttp-eio (talex5 #1024)
- cohttp-{lwt,eio}: server: add connection header to response if not present (ushitora-anqou #1025)
- cohttp-curl: Curl no longer prepends the first HTTP request header to the output. (jonahbeckford #1030, #987)
- cohttp-eio: client: use permissive argument type for make_generic
Expand Down
2 changes: 1 addition & 1 deletion cohttp-eio/examples/dune
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
(executables
(names server1 client1 docker_client client_timeout client_tls)
(names server1 server2 client1 docker_client client_timeout client_tls)
(libraries
cohttp-eio
eio_main
Expand Down
14 changes: 7 additions & 7 deletions cohttp-eio/examples/server1.ml
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,14 @@ and () = Logs.Src.set_level Cohttp_eio.src (Some Debug)

let handler _socket request _body =
match Http.Request.resource request with
| "/" -> (Http.Response.make (), Cohttp_eio.Body.of_string text)
| "/" -> Cohttp_eio.Server.respond_string ~status:`OK ~body:text ()
| "/html" ->
( Http.Response.make
~headers:(Http.Header.of_list [ ("content-type", "text/html") ])
(),
(* Use a plain flow to test chunked encoding *)
Eio.Flow.string_source text )
| _ -> (Http.Response.make ~status:`Not_found (), Cohttp_eio.Body.of_string "")
(* Use a plain flow to test chunked encoding *)
let body = Eio.Flow.string_source text in
Cohttp_eio.Server.respond () ~status:`OK
~headers:(Http.Header.of_list [ ("content-type", "text/html") ])
~body
| _ -> Cohttp_eio.Server.respond_string ~status:`Not_found ~body:"" ()

let log_warning ex = Logs.warn (fun f -> f "%a" Eio.Exn.pp ex)

Expand Down
37 changes: 37 additions & 0 deletions cohttp-eio/examples/server2.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
let () = Logs.set_reporter (Logs_fmt.reporter ())
and () = Logs.Src.set_level Cohttp_eio.src (Some Debug)

let ( / ) = Eio.Path.( / )

(* To stream a file, we take the extra [writer] argument explicitly.
This means that we stream the response while the function is still
running and the file is still open. *)
let handler dir _socket request _body writer =
let path =
Http.Request.resource request
|> String.split_on_char '/'
|> List.filter (( <> ) "")
|> String.concat "/"
in
let path = if path = "" then "index.html" else path in
Eio.Path.with_open_in (dir / path) @@ fun flow ->
Cohttp_eio.Server.respond () ~status:`OK
~headers:(Http.Header.of_list [ ("content-type", "text/html") ])
~body:flow writer

let log_warning ex = Logs.warn (fun f -> f "%a" Eio.Exn.pp ex)

let () =
let port = ref 8080 in
Arg.parse
[ ("-p", Arg.Set_int port, " Listening port number(8080 by default)") ]
ignore "An HTTP/1.1 server";
Eio_main.run @@ fun env ->
Eio.Switch.run @@ fun sw ->
(* Restrict to current directory: *)
let htdocs = Eio.Stdenv.cwd env in
let socket =
Eio.Net.listen env#net ~sw ~backlog:128 ~reuse_addr:true
(`Tcp (Eio.Net.Ipaddr.V4.loopback, !port))
and server = Cohttp_eio.Server.make ~callback:(handler htdocs) () in
Cohttp_eio.Server.run socket server ~on_error:log_warning
82 changes: 38 additions & 44 deletions cohttp-eio/src/server.ml
Original file line number Diff line number Diff line change
Expand Up @@ -3,44 +3,42 @@ module IO = Io.IO

type body = Body.t
type conn = IO.conn * Cohttp.Connection.t [@@warning "-3"]
type writer = Http.Request.t * IO.oc
type response = writer -> unit

type response_action =
[ `Expert of Http.Response.t * (IO.ic -> IO.oc -> unit IO.t)
| `Response of Http.Response.t * body ]

(* type handler =
* sw:Eio.Switch.t ->
* Eio.Net.Sockaddr.stream ->
* Http.Request.t ->
* Eio.Flow.source ->
* Http.Response.t * Eio.Flow.source *)
[ `Expert of Http.Response.t * (IO.ic -> IO.oc -> unit)
| `Response of response ]

type t = {
conn_closed : conn -> unit;
handler : conn -> Http.Request.t -> body -> response_action IO.t;
handler : conn -> Http.Request.t -> body -> IO.ic -> IO.oc -> unit;
}

let make_response_action ?(conn_closed = fun _ -> ()) ~callback () =
{ conn_closed; handler = callback }
{
conn_closed;
handler =
(fun conn request body ic oc ->
match callback conn request body with
| `Expert (response, handler) ->
Io.Response.write_header response oc;
handler ic oc
| `Response fn -> fn (request, oc));
}
let make_expert ?conn_closed ~callback () =
make_response_action ?conn_closed
~callback:(fun conn request body ->
IO.(callback conn request body >>= fun expert -> `Expert expert))
let expert = callback conn request body in
`Expert expert)
()
let make ?conn_closed ~callback () =
make_response_action ?conn_closed
~callback:(fun conn request body ->
IO.(callback conn request body >>= fun response -> `Response response))
()
let respond ?headers ?flush ~status ~body () =
let response = Cohttp.Response.make ?headers ?flush ~status () in
(response, body)
let respond_string ?headers ?flush ~status ~body () =
respond ?headers ?flush ~status ~body:(Body.of_string body) ()
let make ?(conn_closed = fun _ -> ()) ~callback () =
{
conn_closed;
handler = (fun conn request body _ic oc -> callback conn request body (request, oc));
}
let read input =
match Io.Request.read input with
Expand Down Expand Up @@ -92,6 +90,21 @@ let write output (response : Cohttp.Response.t) body =
in
Eio.Buf_write.flush output
let respond ?(headers = Cohttp.Header.init ()) ?flush ~status ~body () (request, oc) =
let keep_alive = Http.Request.is_keep_alive request in
let headers =
match Cohttp.Header.connection headers with
| None ->
Http.Header.add headers "connection"
(if keep_alive then "keep-alive" else "close")
| Some _ -> headers
in
let response = Cohttp.Response.make ~headers ?flush ~status () in
write oc response body
let respond_string ?headers ?flush ~status ~body () =
respond ?headers ?flush ~status ~body:(Body.of_string body) ()
let callback { conn_closed; handler } ((_, peer_address) as conn) input output =
let id = (Cohttp.Connection.create () [@ocaml.warning "-3"]) in
let rec handle () =
Expand All @@ -114,26 +127,7 @@ let callback { conn_closed; handler } ((_, peer_address) as conn) input output =
(Body.of_string e)
| `Ok (request, body) ->
let () =
try
match handler (conn, id) request body with
| `Response (response, body) ->
let keep_alive =
Http.Request.is_keep_alive request
&& Http.Response.is_keep_alive response
in
let response =
let headers =
Http.Header.add_unless_exists
(Http.Response.headers response)
"connection"
(if keep_alive then "keep-alive" else "close")
in
{ response with Http.Response.headers }
in
write output response body
| `Expert (response, handler) ->
let () = Io.Response.write_header response output in
handler input output
try handler (conn, id) request body input output
with Eio.Io (Eio.Net.E (Connection_reset _), _) ->
Logs.info (fun m ->
m "%a: connection reset" Eio.Net.Sockaddr.pp peer_address)
Expand Down
16 changes: 15 additions & 1 deletion cohttp-eio/src/server.mli
Original file line number Diff line number Diff line change
@@ -1,4 +1,18 @@
include Cohttp.Generic.Server.S with module IO = Io.IO and type body = Body.t
type writer

include
Cohttp.Generic.Server.S
with module IO = Io.IO
and type body = Body.t
and type response = writer -> unit

val respond :
?headers:Http.Header.t ->
?flush:bool ->
status:Http.Status.t ->
body:_ Eio.Flow.source ->
unit ->
response IO.t

val run :
?max_connections:int ->
Expand Down
10 changes: 4 additions & 6 deletions cohttp-eio/tests/test.ml
Original file line number Diff line number Diff line change
@@ -1,21 +1,19 @@
open Eio.Std

let () =
Logs.set_level ~all:true @@ Some Logs.Debug;
Logs.set_reporter (Logs_fmt.reporter ())

let handler _conn request body =
match Http.Request.resource request with
| "/" -> (Http.Response.make (), Cohttp_eio.Body.of_string "root")
| "/" -> Cohttp_eio.Server.respond_string ~status:`OK ~body:"root" ()
| "/stream" ->
let body = Eio_mock.Flow.make "streaming body" in
let () =
Eio_mock.Flow.on_read body
[ `Return "Hello"; `Yield_then (`Return "World") ]
in
(Http.Response.make (), (body :> Eio.Flow.source_ty r))
| "/post" -> (Http.Response.make (), body)
| _ -> (Http.Response.make ~status:`Not_found (), Cohttp_eio.Body.of_string "")
Cohttp_eio.Server.respond ~status:`OK ~body ()
| "/post" -> Cohttp_eio.Server.respond ~status:`OK ~body ()
| _ -> Cohttp_eio.Server.respond_string ~status:`Not_found ~body:"" ()

let () =
Eio_main.run @@ fun env ->
Expand Down
7 changes: 6 additions & 1 deletion cohttp-lwt/src/s.ml
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,12 @@ end
(** The [Server] module implements a pipelined HTTP/1.1 server. *)
module type Server = sig
module IO : IO
include Cohttp.Generic.Server.S with type body = Body.t and module IO := IO

include
Cohttp.Generic.Server.S
with type body = Body.t
and module IO := IO
and type response = Http.Response.t * Body.t

val resolve_local_file : docroot:string -> uri:Uri.t -> string
[@@deprecated "Please use Cohttp.Path.resolve_local_file. "]
Expand Down
1 change: 1 addition & 0 deletions cohttp-lwt/src/server.ml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ module Make (IO : S.IO) = struct
module Request = Make.Request (IO)
module Response = Make.Response (IO)

type response = Http.Response.t * Body.t
type body = Body.t

let src = Logs.Src.create "cohttp.lwt.server" ~doc:"Cohttp Lwt server module"
Expand Down
9 changes: 5 additions & 4 deletions cohttp/src/server.ml
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,11 @@ module type S = sig

type body
type conn = IO.conn * Connection.t [@@warning "-3"]
type response

type response_action =
[ `Expert of Http.Response.t * (IO.ic -> IO.oc -> unit IO.t)
| `Response of Http.Response.t * body ]
| `Response of response ]
(** A request handler can respond in two ways:
- Using [`Response], with a {!Response.t} and a {!body}.
Expand Down Expand Up @@ -38,7 +39,7 @@ module type S = sig

val make :
?conn_closed:(conn -> unit) ->
callback:(conn -> Http.Request.t -> body -> (Http.Response.t * body) IO.t) ->
callback:(conn -> Http.Request.t -> body -> response IO.t) ->
unit ->
t

Expand All @@ -48,7 +49,7 @@ module type S = sig
status:Http.Status.t ->
body:body ->
unit ->
(Http.Response.t * body) IO.t
response IO.t
(** [respond ?headers ?flush ~status ~body] will respond to an HTTP request
with the given [status] code and response [body]. If [flush] is true, then
every response chunk will be flushed to the network rather than being
Expand All @@ -64,7 +65,7 @@ module type S = sig
status:Http.Status.t ->
body:string ->
unit ->
(Http.Response.t * body) IO.t
response IO.t

val callback : t -> IO.conn -> IO.ic -> IO.oc -> unit IO.t
end

0 comments on commit 1d9e14b

Please sign in to comment.