Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
edwintorok committed Jul 19, 2023
1 parent 8b433d1 commit e361c91
Show file tree
Hide file tree
Showing 5 changed files with 43 additions and 18 deletions.
15 changes: 8 additions & 7 deletions ocaml/loadgen/speculative.ml
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
module Ev = Zero_http.Zero_events
module Ze = Zero_http.Zero_events

let () = Sys.set_signal Sys.sigpipe Sys.Signal_ignore

let event_connect = Ze.register_simple_span "http.connect"

let rec connect_start sock addr =
let open Unix in
Ev.User.write Ev.connecting addr ;
Ze.(write event_connect Begin);
try connect sock addr with
| Unix_error ((EINPROGRESS | EAGAIN | EWOULDBLOCK), _, _) ->
()
Expand Down Expand Up @@ -73,7 +75,6 @@ module Connection = struct
in
let parser = Zero_http.Response.create zb refill socket in
let id = Atomic.fetch_and_add ids 1 in
Ev.User.write Ev.request_id id ;
connect_start socket addr ;
{
addr
Expand Down Expand Up @@ -178,10 +179,9 @@ let do_disconnect t mux conn =

let fastpath_handle_event mux fd events t =
let conn = t.conntable.(Xapi_stdext_unix.Unixext.int_of_file_descr fd) in
Ev.User.write Ev.request_id conn.Connection.id ;
if Polly.Events.(test events out) then (
if conn.Connection.connecting then (
Ev.User.write Ev.connected conn.Connection.id ;
Ze.(write event_connect End);
conn.Connection.connecting <- false
) ;
if Connection.send conn t.send_receive_buffer = 0 then (
Expand Down Expand Up @@ -267,12 +267,13 @@ let run ?(receive_buffer_size = 16384) t =
(float requests /. (t1 -. t0))
requests


let () =
let t = init () in
let host = "perfuk-18-06d.xenrt.citrite.net" in
let addr = (Unix.getaddrinfo host "80" [] |> List.hd).Unix.ai_addr in
Ev.(User.write http_request_url @@ "http://" ^ host ^ "/") ;
Ev.(User.write http_request_method `GET) ;
Ze.(write Zero_http.url_full @@ "http://" ^ host ^ "/") ;
Ze.(write Zero_http.url_method `GET);
(* TODO: traceparent... *)
for _ = 1 to nconn do
let conn = connect t addr in
Expand Down
2 changes: 1 addition & 1 deletion ocaml/loadgen/zero_http/zero_events.ml
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ type Runtime_events.User.tag += SimpleSpan
let register_simple_span name =
Runtime_events.User.register name SimpleSpan Runtime_events.Type.span

let emit ev value = Runtime_events.User.write ev value
let write ev value = Runtime_events.User.write ev value

let register_callbacks ~on_simple_span callbacks =
let on_simple_span domain timestamp user value =
Expand Down
2 changes: 1 addition & 1 deletion ocaml/loadgen/zero_http/zero_events.mli
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ val register_marshaled_event: string -> on_process_event:'a callback -> 'a event

val register_simple_span : string -> span event

val emit: 'a event -> 'a -> unit
val write: 'a event -> 'a -> unit

val register_callbacks:
on_simple_span:span callback -> Runtime_events.Callbacks.t -> Runtime_events.Callbacks.t
37 changes: 29 additions & 8 deletions ocaml/loadgen/zero_http/zero_http.ml
Original file line number Diff line number Diff line change
@@ -1,7 +1,19 @@
module Ev = Zero_events
module Ze = Zero_events
module Zero_buffer = Zero_buffer
module Zero_events = Zero_events

let url = ref ""
let url_full = Ze.register_marshaled_event "url.full" ~on_process_event:(fun ~domain:_ ~timestamp_unix_ns:_ ~name:_ ~value ->
url := value
)

let meth : Http.Method.t ref = ref `GET

let url_method = Ze.register_marshaled_event "method" ~on_process_event:(fun ~domain:_ ~timestamp_unix_ns:_ ~name:_ ~value ->
meth := value
)


let src = Logs.Src.create ~doc:"zero_http protocol logging" "loadgen.zero_http"

module Log = (val Logs.src_log src)
Expand All @@ -18,6 +30,14 @@ module Response = struct
; mutable headers_size: int
}

let request_begin = Ze.register_marshaled_event "request.begin" ~on_process_event:(fun ~domain ~timestamp_unix_ns ~name ~value ->
(* TODO: just parent info *)
)

let request_end = Ze.register_marshaled_event "request.end" ~on_process_event:(fun ~domain ~timestamp_unix_ns ~name ~value ->
(* TODO: this one actually emits *)
)

let create zb reader input =
let lines = Zero_lines.make zb ~read:reader input in
{
Expand All @@ -40,8 +60,10 @@ module Response = struct
Log.debug (fun m -> m "Cannot parse HTTP line, falling back to slow parser");
raise Fallback

let http_response_body = Ze.register_simple_span "http.response.body"

let finish t callback =
Ev.(User.write http_response_body End);
Ze.(write http_response_body End);
(* reset *)
t.state <- WaitStatusLine ;
let content_length = t.content_length
Expand All @@ -63,7 +85,7 @@ module Response = struct

let discard_data t view =
if t.discard = t.content_length then
Ev.(User.write http_response_body Begin);
Ze.(write http_response_body Begin);
let available = Zero_buffer.View.size view in
let to_discard = Int.min available t.discard in
t.discard <- t.discard - to_discard ;
Expand All @@ -77,11 +99,13 @@ module Response = struct

let is_empty_line _ () _ ~eol_len = eol_len = 0

let http_response_headers = Ze.register_simple_span "http.response.headers"

let rec wait_end_of_headers t =
t.state <- WaitEndOfHeaders ;
(* resume from here *)
if Zero_lines.read_line t.lines is_empty_line false () then (
Ev.(User.write http_response_headers End);
Ze.(write http_response_headers End);
if is_debug () then
Log.debug (fun m -> m "Finished parsing header, about to discard %d bytes of body" t.content_length);
t.discard <- t.content_length ;
Expand Down Expand Up @@ -127,7 +151,6 @@ module Response = struct
wait_content_length t
| content_length ->
assert (content_length >= 0) ;
Ev.(User.write http_response_body_size content_length);
if is_debug () then
Log.debug (fun m -> m "Parsed Content-Length: %d" content_length);
t.content_length <- content_length ;
Expand All @@ -149,19 +172,17 @@ module Response = struct
let wait_status_line t =
t.state <- WaitStatusLine ;
if Zero_lines.is_bol t.lines then
Ev.(User.write http_response_headers Begin);
Ze.(write http_response_headers Begin);
(* resume from here *)
match Zero_lines.read_line t.lines parse_status_line (-1) () with
| -1 ->
()
| status_code when status_code = 200 || status_code = 403 ->
Ev.(User.write http_response_status_code status_code);
t.status_code <- status_code ;
if is_debug () then
Log.debug (fun m -> m "Parsed HTTP status code %d" status_code);
(wait_content_length [@tailcall]) t
| status_code ->
Ev.(User.write http_response_status_code status_code);
if is_debug () then
Log.debug (fun m -> m "HTTP status code: %d, falling back to slow parser" status_code);
failwith "TODO: fallback"
Expand Down
5 changes: 4 additions & 1 deletion ocaml/loadgen/zero_http/zero_http.mli
Original file line number Diff line number Diff line change
Expand Up @@ -47,4 +47,7 @@ module Response : sig
@param reader is a function to read data from some connection ['a] into the supplied buffer and return the amount of data read
@param conn is a parameter for [reader] (e.g. a file descriptor)
*)
end
end

val url_full: string Zero_events.event
val url_method: Http.Method.t Zero_events.event

0 comments on commit e361c91

Please sign in to comment.