From e361c91667e1aeaf5bb06e273f3972ff461c8a9e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Edwin=20T=C3=B6r=C3=B6k?= Date: Wed, 19 Jul 2023 19:17:04 +0100 Subject: [PATCH] wip --- ocaml/loadgen/speculative.ml | 15 +++++----- ocaml/loadgen/zero_http/zero_events.ml | 2 +- ocaml/loadgen/zero_http/zero_events.mli | 2 +- ocaml/loadgen/zero_http/zero_http.ml | 37 +++++++++++++++++++------ ocaml/loadgen/zero_http/zero_http.mli | 5 +++- 5 files changed, 43 insertions(+), 18 deletions(-) diff --git a/ocaml/loadgen/speculative.ml b/ocaml/loadgen/speculative.ml index 94d380f0a1e..e1280cef291 100644 --- a/ocaml/loadgen/speculative.ml +++ b/ocaml/loadgen/speculative.ml @@ -1,10 +1,12 @@ -module Ev = Zero_http.Zero_events +module Ze = Zero_http.Zero_events let () = Sys.set_signal Sys.sigpipe Sys.Signal_ignore +let event_connect = Ze.register_simple_span "http.connect" + let rec connect_start sock addr = let open Unix in - Ev.User.write Ev.connecting addr ; + Ze.(write event_connect Begin); try connect sock addr with | Unix_error ((EINPROGRESS | EAGAIN | EWOULDBLOCK), _, _) -> () @@ -73,7 +75,6 @@ module Connection = struct in let parser = Zero_http.Response.create zb refill socket in let id = Atomic.fetch_and_add ids 1 in - Ev.User.write Ev.request_id id ; connect_start socket addr ; { addr @@ -178,10 +179,9 @@ let do_disconnect t mux conn = let fastpath_handle_event mux fd events t = let conn = t.conntable.(Xapi_stdext_unix.Unixext.int_of_file_descr fd) in - Ev.User.write Ev.request_id conn.Connection.id ; if Polly.Events.(test events out) then ( if conn.Connection.connecting then ( - Ev.User.write Ev.connected conn.Connection.id ; + Ze.(write event_connect End); conn.Connection.connecting <- false ) ; if Connection.send conn t.send_receive_buffer = 0 then ( @@ -267,12 +267,13 @@ let run ?(receive_buffer_size = 16384) t = (float requests /. (t1 -. t0)) requests + let () = let t = init () in let host = "perfuk-18-06d.xenrt.citrite.net" in let addr = (Unix.getaddrinfo host "80" [] |> List.hd).Unix.ai_addr in - Ev.(User.write http_request_url @@ "http://" ^ host ^ "/") ; - Ev.(User.write http_request_method `GET) ; + Ze.(write Zero_http.url_full @@ "http://" ^ host ^ "/") ; + Ze.(write Zero_http.url_method `GET); (* TODO: traceparent... *) for _ = 1 to nconn do let conn = connect t addr in diff --git a/ocaml/loadgen/zero_http/zero_events.ml b/ocaml/loadgen/zero_http/zero_events.ml index 7d1a0da81a6..1bf3d3b9cdd 100644 --- a/ocaml/loadgen/zero_http/zero_events.ml +++ b/ocaml/loadgen/zero_http/zero_events.ml @@ -106,7 +106,7 @@ type Runtime_events.User.tag += SimpleSpan let register_simple_span name = Runtime_events.User.register name SimpleSpan Runtime_events.Type.span -let emit ev value = Runtime_events.User.write ev value +let write ev value = Runtime_events.User.write ev value let register_callbacks ~on_simple_span callbacks = let on_simple_span domain timestamp user value = diff --git a/ocaml/loadgen/zero_http/zero_events.mli b/ocaml/loadgen/zero_http/zero_events.mli index 59c0da452f7..60789c9db97 100644 --- a/ocaml/loadgen/zero_http/zero_events.mli +++ b/ocaml/loadgen/zero_http/zero_events.mli @@ -30,7 +30,7 @@ val register_marshaled_event: string -> on_process_event:'a callback -> 'a event val register_simple_span : string -> span event -val emit: 'a event -> 'a -> unit +val write: 'a event -> 'a -> unit val register_callbacks: on_simple_span:span callback -> Runtime_events.Callbacks.t -> Runtime_events.Callbacks.t \ No newline at end of file diff --git a/ocaml/loadgen/zero_http/zero_http.ml b/ocaml/loadgen/zero_http/zero_http.ml index 44ecfff08e7..4282e327ff6 100644 --- a/ocaml/loadgen/zero_http/zero_http.ml +++ b/ocaml/loadgen/zero_http/zero_http.ml @@ -1,7 +1,19 @@ -module Ev = Zero_events +module Ze = Zero_events module Zero_buffer = Zero_buffer module Zero_events = Zero_events +let url = ref "" +let url_full = Ze.register_marshaled_event "url.full" ~on_process_event:(fun ~domain:_ ~timestamp_unix_ns:_ ~name:_ ~value -> + url := value +) + +let meth : Http.Method.t ref = ref `GET + +let url_method = Ze.register_marshaled_event "method" ~on_process_event:(fun ~domain:_ ~timestamp_unix_ns:_ ~name:_ ~value -> + meth := value +) + + let src = Logs.Src.create ~doc:"zero_http protocol logging" "loadgen.zero_http" module Log = (val Logs.src_log src) @@ -18,6 +30,14 @@ module Response = struct ; mutable headers_size: int } + let request_begin = Ze.register_marshaled_event "request.begin" ~on_process_event:(fun ~domain ~timestamp_unix_ns ~name ~value -> + (* TODO: just parent info *) + ) + + let request_end = Ze.register_marshaled_event "request.end" ~on_process_event:(fun ~domain ~timestamp_unix_ns ~name ~value -> + (* TODO: this one actually emits *) + ) + let create zb reader input = let lines = Zero_lines.make zb ~read:reader input in { @@ -40,8 +60,10 @@ module Response = struct Log.debug (fun m -> m "Cannot parse HTTP line, falling back to slow parser"); raise Fallback + let http_response_body = Ze.register_simple_span "http.response.body" + let finish t callback = - Ev.(User.write http_response_body End); + Ze.(write http_response_body End); (* reset *) t.state <- WaitStatusLine ; let content_length = t.content_length @@ -63,7 +85,7 @@ module Response = struct let discard_data t view = if t.discard = t.content_length then - Ev.(User.write http_response_body Begin); + Ze.(write http_response_body Begin); let available = Zero_buffer.View.size view in let to_discard = Int.min available t.discard in t.discard <- t.discard - to_discard ; @@ -77,11 +99,13 @@ module Response = struct let is_empty_line _ () _ ~eol_len = eol_len = 0 + let http_response_headers = Ze.register_simple_span "http.response.headers" + let rec wait_end_of_headers t = t.state <- WaitEndOfHeaders ; (* resume from here *) if Zero_lines.read_line t.lines is_empty_line false () then ( - Ev.(User.write http_response_headers End); + Ze.(write http_response_headers End); if is_debug () then Log.debug (fun m -> m "Finished parsing header, about to discard %d bytes of body" t.content_length); t.discard <- t.content_length ; @@ -127,7 +151,6 @@ module Response = struct wait_content_length t | content_length -> assert (content_length >= 0) ; - Ev.(User.write http_response_body_size content_length); if is_debug () then Log.debug (fun m -> m "Parsed Content-Length: %d" content_length); t.content_length <- content_length ; @@ -149,19 +172,17 @@ module Response = struct let wait_status_line t = t.state <- WaitStatusLine ; if Zero_lines.is_bol t.lines then - Ev.(User.write http_response_headers Begin); + Ze.(write http_response_headers Begin); (* resume from here *) match Zero_lines.read_line t.lines parse_status_line (-1) () with | -1 -> () | status_code when status_code = 200 || status_code = 403 -> - Ev.(User.write http_response_status_code status_code); t.status_code <- status_code ; if is_debug () then Log.debug (fun m -> m "Parsed HTTP status code %d" status_code); (wait_content_length [@tailcall]) t | status_code -> - Ev.(User.write http_response_status_code status_code); if is_debug () then Log.debug (fun m -> m "HTTP status code: %d, falling back to slow parser" status_code); failwith "TODO: fallback" diff --git a/ocaml/loadgen/zero_http/zero_http.mli b/ocaml/loadgen/zero_http/zero_http.mli index 37839b299c1..8b5dc697337 100644 --- a/ocaml/loadgen/zero_http/zero_http.mli +++ b/ocaml/loadgen/zero_http/zero_http.mli @@ -47,4 +47,7 @@ module Response : sig @param reader is a function to read data from some connection ['a] into the supplied buffer and return the amount of data read @param conn is a parameter for [reader] (e.g. a file descriptor) *) -end \ No newline at end of file +end + +val url_full: string Zero_events.event +val url_method: Http.Method.t Zero_events.event \ No newline at end of file