Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
edwintorok committed Jul 24, 2023
1 parent e361c91 commit 23f8fad
Show file tree
Hide file tree
Showing 4 changed files with 58 additions and 18 deletions.
2 changes: 1 addition & 1 deletion ocaml/loadgen/tracer/dune
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
(executable
(name tracer)
(libraries opentelemetry runtime_events zero_http unix)
(libraries opentelemetry runtime_events zero_http unix opentelemetry_client_ocurl)
)
4 changes: 4 additions & 0 deletions ocaml/loadgen/tracer/tracer.ml
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
open Opentelemetry

let () =
Opentelemetry_client
OTLPSpanExporter(endpoint='http://10.71.57.164:4317',

let ns = Zero_http.Zero_events.Timestamp.to_int64

let delta = ref 0L
Expand Down
2 changes: 1 addition & 1 deletion ocaml/loadgen/zero_http/dune
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
(library
(name zero_http)
(libraries bigstringaf logs runtime_events unix http ptime.clock.os ptime)
(libraries bigstringaf logs runtime_events unix http ptime.clock.os ptime opentelemetry)
)
68 changes: 52 additions & 16 deletions ocaml/loadgen/zero_http/zero_http.ml
Original file line number Diff line number Diff line change
Expand Up @@ -21,31 +21,67 @@ module Log = (val Logs.src_log src)
module Response = struct
type state = WaitStatusLine | WaitContentLength | WaitEndOfHeaders | Discard

let next = Atomic.make 0

type span_state =
{ id: int
; mutable status_code: int
; mutable content_length: int
}

type t = {
lines: Zero_lines.t
; span_state: span_state
; mutable state: state
; mutable status_code: int
; mutable content_length: int
; mutable discard: int
; mutable headers_size: int
}

let request_begin = Ze.register_marshaled_event "request.begin" ~on_process_event:(fun ~domain ~timestamp_unix_ns ~name ~value ->
(* TODO: just parent info *)
(* can't marshal entire 't' due to bigarray *)

let span_data = Hashtbl.create 47

let request_begin = Ze.register_marshaled_event "request.begin" ~on_process_event:(fun ~domain:_ ~timestamp_unix_ns ~name:_ ~value ->
Hashtbl.replace span_data value.id timestamp_unix_ns
)

let to_attr (key, value) = Opentelemetry.Proto.Common.default_key_value ~key ~value:(Some value) ()


let attr_int i = Opentelemetry.Proto.Common.Int_value (Int64.of_int i)
let attr_str s = Opentelemetry.Proto.Common.String_value s

let request_end = Ze.register_marshaled_event "request.end" ~on_process_event:(fun ~domain ~timestamp_unix_ns ~name ~value ->
(* TODO: this one actually emits *)
let start_time_unix_nano = Hashtbl.find span_data value.id in
let attributes =
List.rev_map to_attr
[ "http.response.status_code", attr_int value.status_code
; "http.response.body.size", attr_int value.content_length
; "http.request.method", attr_str "GET" (* TODO *)
; "network.protocol.version", attr_str "1.1"
; "server.address", attr_str "perfuk-18-06d.xenrt.citrite.net"
; "server.port", attr_int 80
; "url.full", attr_str "http://perfuk-18-06d.xenrt.citrite.net/"

]
in
let span = Opentelemetry.Proto.Trace.default_span
~start_time_unix_nano
~end_time_unix_nano:timestamp_unix_ns
~kind:Opentelemetry.Span.Span_kind_client
~attributes
() in
Opentelemetry.Trace.emit ~service_name:"loadgen" [span]
)

let create zb reader input =
let lines = Zero_lines.make zb ~read:reader input in
let id = Atomic.fetch_and_add next 1 in
{
lines
; content_length= 0
; span_state = { id; status_code = 0; content_length = 0 }
; state= WaitStatusLine
; discard= 0
; status_code= 0
; headers_size= 0 (* TODO: this is not updated yet *)
}

Expand All @@ -66,12 +102,12 @@ module Response = struct
Ze.(write http_response_body End);
(* reset *)
t.state <- WaitStatusLine ;
let content_length = t.content_length
let content_length = t.span_state.content_length
and headers_size = t.headers_size
and status_code = t.status_code in
t.status_code <- 0 ;
and status_code = t.span_state.status_code in
t.span_state.status_code <- 0 ;
t.headers_size <- 0 ;
t.content_length <- 0 ;
t.span_state.content_length <- 0 ;

if is_debug () then
Log.debug (fun m ->
Expand All @@ -84,7 +120,7 @@ module Response = struct
callback ~status_code ~content_length ~headers_size

let discard_data t view =
if t.discard = t.content_length then
if t.discard = t.span_state.content_length then
Ze.(write http_response_body Begin);
let available = Zero_buffer.View.size view in
let to_discard = Int.min available t.discard in
Expand All @@ -107,8 +143,8 @@ module Response = struct
if Zero_lines.read_line t.lines is_empty_line false () then (
Ze.(write http_response_headers End);
if is_debug () then
Log.debug (fun m -> m "Finished parsing header, about to discard %d bytes of body" t.content_length);
t.discard <- t.content_length ;
Log.debug (fun m -> m "Finished parsing header, about to discard %d bytes of body" t.span_state.content_length);
t.discard <- t.span_state.content_length ;
(discard [@tailcall]) t
) else
(wait_end_of_headers [@tailcall]) t
Expand Down Expand Up @@ -153,7 +189,7 @@ module Response = struct
assert (content_length >= 0) ;
if is_debug () then
Log.debug (fun m -> m "Parsed Content-Length: %d" content_length);
t.content_length <- content_length ;
t.span_state.content_length <- content_length ;
(wait_end_of_headers [@tailcall]) t

let http_200 = "HTTP/1.1 200 "
Expand All @@ -178,7 +214,7 @@ module Response = struct
| -1 ->
()
| status_code when status_code = 200 || status_code = 403 ->
t.status_code <- status_code ;
t.span_state.status_code <- status_code ;
if is_debug () then
Log.debug (fun m -> m "Parsed HTTP status code %d" status_code);
(wait_content_length [@tailcall]) t
Expand Down

0 comments on commit 23f8fad

Please sign in to comment.