From 23f8fad2856eee906e30e82f31753a89d5ac912a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Edwin=20T=C3=B6r=C3=B6k?= Date: Mon, 24 Jul 2023 16:52:50 +0100 Subject: [PATCH] wip --- ocaml/loadgen/tracer/dune | 2 +- ocaml/loadgen/tracer/tracer.ml | 4 ++ ocaml/loadgen/zero_http/dune | 2 +- ocaml/loadgen/zero_http/zero_http.ml | 68 +++++++++++++++++++++------- 4 files changed, 58 insertions(+), 18 deletions(-) diff --git a/ocaml/loadgen/tracer/dune b/ocaml/loadgen/tracer/dune index 8170c2a9562..a8ff2484e1a 100644 --- a/ocaml/loadgen/tracer/dune +++ b/ocaml/loadgen/tracer/dune @@ -1,4 +1,4 @@ (executable (name tracer) - (libraries opentelemetry runtime_events zero_http unix) + (libraries opentelemetry runtime_events zero_http unix opentelemetry_client_ocurl) ) diff --git a/ocaml/loadgen/tracer/tracer.ml b/ocaml/loadgen/tracer/tracer.ml index ec5216567f1..a8776a71307 100644 --- a/ocaml/loadgen/tracer/tracer.ml +++ b/ocaml/loadgen/tracer/tracer.ml @@ -1,5 +1,9 @@ open Opentelemetry +let () = + Opentelemetry_client +OTLPSpanExporter(endpoint='http://10.71.57.164:4317', + let ns = Zero_http.Zero_events.Timestamp.to_int64 let delta = ref 0L diff --git a/ocaml/loadgen/zero_http/dune b/ocaml/loadgen/zero_http/dune index bc76f74bfef..82b907f3606 100644 --- a/ocaml/loadgen/zero_http/dune +++ b/ocaml/loadgen/zero_http/dune @@ -1,4 +1,4 @@ (library (name zero_http) - (libraries bigstringaf logs runtime_events unix http ptime.clock.os ptime) + (libraries bigstringaf logs runtime_events unix http ptime.clock.os ptime opentelemetry) ) diff --git a/ocaml/loadgen/zero_http/zero_http.ml b/ocaml/loadgen/zero_http/zero_http.ml index 4282e327ff6..2052dbc0333 100644 --- a/ocaml/loadgen/zero_http/zero_http.ml +++ b/ocaml/loadgen/zero_http/zero_http.ml @@ -21,31 +21,67 @@ module Log = (val Logs.src_log src) module Response = struct type state = WaitStatusLine | WaitContentLength | WaitEndOfHeaders | Discard + let next = Atomic.make 0 + + type span_state = + { id: int + ; mutable status_code: int + ; mutable content_length: int + } + type t = { lines: Zero_lines.t + ; span_state: span_state ; mutable state: state - ; mutable status_code: int - ; mutable content_length: int ; mutable discard: int ; mutable headers_size: int } - let request_begin = Ze.register_marshaled_event "request.begin" ~on_process_event:(fun ~domain ~timestamp_unix_ns ~name ~value -> - (* TODO: just parent info *) + (* can't marshal entire 't' due to bigarray *) + + let span_data = Hashtbl.create 47 + + let request_begin = Ze.register_marshaled_event "request.begin" ~on_process_event:(fun ~domain:_ ~timestamp_unix_ns ~name:_ ~value -> + Hashtbl.replace span_data value.id timestamp_unix_ns ) + let to_attr (key, value) = Opentelemetry.Proto.Common.default_key_value ~key ~value:(Some value) () + + + let attr_int i = Opentelemetry.Proto.Common.Int_value (Int64.of_int i) + let attr_str s = Opentelemetry.Proto.Common.String_value s + let request_end = Ze.register_marshaled_event "request.end" ~on_process_event:(fun ~domain ~timestamp_unix_ns ~name ~value -> - (* TODO: this one actually emits *) + let start_time_unix_nano = Hashtbl.find span_data value.id in + let attributes = + List.rev_map to_attr + [ "http.response.status_code", attr_int value.status_code + ; "http.response.body.size", attr_int value.content_length + ; "http.request.method", attr_str "GET" (* TODO *) + ; "network.protocol.version", attr_str "1.1" + ; "server.address", attr_str "perfuk-18-06d.xenrt.citrite.net" + ; "server.port", attr_int 80 + ; "url.full", attr_str "http://perfuk-18-06d.xenrt.citrite.net/" + + ] + in + let span = Opentelemetry.Proto.Trace.default_span + ~start_time_unix_nano + ~end_time_unix_nano:timestamp_unix_ns + ~kind:Opentelemetry.Span.Span_kind_client + ~attributes + () in + Opentelemetry.Trace.emit ~service_name:"loadgen" [span] ) let create zb reader input = let lines = Zero_lines.make zb ~read:reader input in + let id = Atomic.fetch_and_add next 1 in { lines - ; content_length= 0 + ; span_state = { id; status_code = 0; content_length = 0 } ; state= WaitStatusLine ; discard= 0 - ; status_code= 0 ; headers_size= 0 (* TODO: this is not updated yet *) } @@ -66,12 +102,12 @@ module Response = struct Ze.(write http_response_body End); (* reset *) t.state <- WaitStatusLine ; - let content_length = t.content_length + let content_length = t.span_state.content_length and headers_size = t.headers_size - and status_code = t.status_code in - t.status_code <- 0 ; + and status_code = t.span_state.status_code in + t.span_state.status_code <- 0 ; t.headers_size <- 0 ; - t.content_length <- 0 ; + t.span_state.content_length <- 0 ; if is_debug () then Log.debug (fun m -> @@ -84,7 +120,7 @@ module Response = struct callback ~status_code ~content_length ~headers_size let discard_data t view = - if t.discard = t.content_length then + if t.discard = t.span_state.content_length then Ze.(write http_response_body Begin); let available = Zero_buffer.View.size view in let to_discard = Int.min available t.discard in @@ -107,8 +143,8 @@ module Response = struct if Zero_lines.read_line t.lines is_empty_line false () then ( Ze.(write http_response_headers End); if is_debug () then - Log.debug (fun m -> m "Finished parsing header, about to discard %d bytes of body" t.content_length); - t.discard <- t.content_length ; + Log.debug (fun m -> m "Finished parsing header, about to discard %d bytes of body" t.span_state.content_length); + t.discard <- t.span_state.content_length ; (discard [@tailcall]) t ) else (wait_end_of_headers [@tailcall]) t @@ -153,7 +189,7 @@ module Response = struct assert (content_length >= 0) ; if is_debug () then Log.debug (fun m -> m "Parsed Content-Length: %d" content_length); - t.content_length <- content_length ; + t.span_state.content_length <- content_length ; (wait_end_of_headers [@tailcall]) t let http_200 = "HTTP/1.1 200 " @@ -178,7 +214,7 @@ module Response = struct | -1 -> () | status_code when status_code = 200 || status_code = 403 -> - t.status_code <- status_code ; + t.span_state.status_code <- status_code ; if is_debug () then Log.debug (fun m -> m "Parsed HTTP status code %d" status_code); (wait_content_length [@tailcall]) t