Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
edwintorok committed Sep 29, 2023
1 parent acb6942 commit c03ffec
Show file tree
Hide file tree
Showing 4 changed files with 64 additions and 30 deletions.
28 changes: 18 additions & 10 deletions ocaml/loadgen/loadgen.ml
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,8 @@ let write_calls n call uri_path conv =
let n = 1000

let t = Speculative.init ()
let host = "perfuk-18-06d.xenrt.citrite.net"
let host = "perfuk-18-04d.xenrt.citrite.net"
let addr = (Unix.getaddrinfo host "80" [] |> List.hd).Unix.ai_addr
let conn = Speculative.connect t addr

(*module I =
struct
Expand All @@ -60,12 +59,17 @@ module M = struct
let (>>|) _ _ = Blocked
end

let rpc call : Rpc.response M.t =
let trace_id = Opentelemetry.Trace_id.create ()

let rpc conn call : Rpc.response M.t =
(* write_calls n call "jsonrpc" Jsonrpc.string_of_call;
write_calls n call "RPC2" Xmlrpc.string_of_call; *)
let str = Jsonrpc.string_of_call call in
let buf = Buffer.create 1024 in
Printf.bprintf buf "POST /jsonrpc HTTP/1.1\r\nHost: %s\r\nContent-Length: %d\r\n\r\n" host (String.length str);
let parent_id = Opentelemetry.Span_id.create () in
let tp = Opentelemetry.Trace_context.Traceparent.to_value ~trace_id ~parent_id () in
Printf.bprintf buf "POST /jsonrpc HTTP/1.1\r\nHost: %s\r\ntraceparent: %s\r\nContent-Length: %d\r\n\r\n" host tp (String.length str);
Speculative.Connection.request_begin conn tp;
Speculative.Connection.write conn (Buffer.contents buf);
Speculative.Connection.write conn str;
M.Blocked
Expand All @@ -87,11 +91,15 @@ let () =
let version = Xapi_version.version in
Ze.(write Zero_http.url_full @@ "http://" ^ host ^ "/") ;
Ze.(write Zero_http.url_method `POST);
for i = 1 to 100 do
let (_: _ M.t) =
C.Session.login_with_password ~rpc ~uname:"root" ~pwd ~version
~originator:__FILE__
in
()
for conni = 1 to 16 do
let conn = Speculative.connect t addr in
let rpc = rpc conn in
for i = 1 to 10 do
let (_: _ M.t) =
C.Session.login_with_password ~rpc ~uname:"root" ~pwd ~version
~originator:__FILE__
in
()
done;
done;
Speculative.run t
27 changes: 20 additions & 7 deletions ocaml/loadgen/speculative.ml
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,13 @@ let rec connect_start sock addr =

let responses = ref 0

type send = Data of string | Traceparent of string

module Connection = struct
type t = {
addr: Unix.sockaddr
; socket: Unix.file_descr
; send_buffer: string Queue.t
; send_buffer: send Queue.t
; zb: Zero_http.Zero_buffer.t
; parser: Zero_http.Response.t
; id: int
Expand Down Expand Up @@ -94,22 +96,33 @@ module Connection = struct

let disconnect t = t.closed <- true

let request_begin conn tp =
assert (not conn.closed);
Queue.push (Traceparent tp) conn.send_buffer

let write conn str =
assert (not conn.closed) ;
Queue.push str conn.send_buffer
Queue.push (Data str) conn.send_buffer

let flush _conn = ()

let sum acc s = acc + String.length s
let sum acc s =
match s with
| Data s ->
acc + String.length s
| Traceparent _ -> acc

let sum_queued_bytes acc t = Queue.fold sum acc t.send_buffer

let serialize into off t =
let dst_off = ref off in
let serialize_entry str =
let len = String.length str in
Bigstringaf.blit_from_string str ~src_off:0 into ~dst_off:!dst_off ~len ;
dst_off := !dst_off + len
let serialize_entry = function
| Data str ->
let len = String.length str in
Bigstringaf.blit_from_string str ~src_off:0 into ~dst_off:!dst_off ~len;
dst_off := !dst_off + len
| Traceparent tp ->
Zero_http.Response.write_request_begin t.parser tp;
in
Queue.iter serialize_entry t.send_buffer ;
t.off <- off ;
Expand Down
35 changes: 23 additions & 12 deletions ocaml/loadgen/zero_http/zero_http.ml
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,6 @@ let url_method = Ze.register_marshaled_event "method" ~on_process_event:(fun ~do
meth := value
)

let tp = ref None
let traceparent = Ze.register_marshaled_event "traceparent" ~on_process_event:(fun ~domain:_ ~timestamp_unix_ns:_ ~name:_ ~value ->
tp := Some value
)


let src = Logs.Src.create ~doc:"zero_http protocol logging" "loadgen.zero_http"

module Log = (val Logs.src_log src)
Expand All @@ -32,6 +26,7 @@ module Response = struct
{ id: int
; mutable status_code: int
; mutable content_length: int
; mutable tp: string
}

type t = {
Expand All @@ -45,11 +40,24 @@ module Response = struct
(* can't marshal entire 't' due to bigarray *)

let span_data = Hashtbl.create 47
let traceparent = Hashtbl.create 47

let request_begin = Ze.register_marshaled_event "request.begin" ~on_process_event:(fun ~domain:_ ~timestamp_unix_ns ~name:_ ~value ->
Hashtbl.replace span_data value.id timestamp_unix_ns
Hashtbl.replace span_data value.id timestamp_unix_ns;
let q =
match Hashtbl.find_opt traceparent value.id with
| None ->
let q = Queue.create () in
Hashtbl.replace traceparent value.id q;
q
| Some q -> q in
Queue.push value.tp q
)

let write_request_begin t tp =
t.span_state.tp <- tp;
Ze.(write request_begin t.span_state)

let to_attr (key, value) =
Opentelemetry.Proto.Common.default_key_value ~key ~value:(Some value) ()

Expand All @@ -58,17 +66,20 @@ module Response = struct

let request_end = Ze.register_marshaled_event "request.end" ~on_process_event:(fun ~domain:_ ~timestamp_unix_ns ~name:_ ~value ->
let start_time = Hashtbl.find span_data value.id in
let tp = Hashtbl.find traceparent value.id in
let trp = Queue.pop tp in
let attrs =
[ "http.response.status_code", attr_int value.status_code
; "http.response.body.size", attr_int value.content_length
; "http.request.method", attr_str "GET" (* TODO *)
; "network.protocol.version", attr_str "1.1"
; "server.address", attr_str "perfuk-18-06d.xenrt.citrite.net"
(* TODO *)
; "server.address", attr_str "perfuk-18-04d.xenrt.citrite.net"
; "server.port", attr_int 80
; "url.full", attr_str "http://perfuk-18-06d.xenrt.citrite.net/"
; "url.full", attr_str "http://perfuk-18-04d.xenrt.citrite.net/"
]
in
let trace_id, id = Opentelemetry.Trace_context.Traceparent.of_value (Option.get !tp) |> Result.get_ok in
let trace_id, id = Opentelemetry.Trace_context.Traceparent.of_value trp |> Result.get_ok in
(* let trace_id = (Opentelemetry.Scope.get_surrounding () |> Option.get).trace_id in*)
let span, _ = Opentelemetry.Span.create
~trace_id
Expand All @@ -86,7 +97,7 @@ module Response = struct
let id = Atomic.fetch_and_add next 1 in
{
lines
; span_state = { id; status_code = 0; content_length = 0 }
; span_state = { id; status_code = 0; content_length = 0; tp= ""}
; state= WaitStatusLine
; discard= 0
; headers_size= 0 (* TODO: this is not updated yet *)
Expand Down Expand Up @@ -219,7 +230,7 @@ module Response = struct
t.state <- WaitStatusLine ;
if Zero_lines.is_bol t.lines then begin
Ze.(write http_response_headers Begin);
Ze.(write request_begin t.span_state);
(* Ze.(write request_begin t.span_state); *)
end;
(* resume from here *)
match Zero_lines.read_line t.lines parse_status_line (-1) () with
Expand Down
4 changes: 3 additions & 1 deletion ocaml/loadgen/zero_http/zero_http.mli
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@ module Response : sig
val create : Zero_buffer.t -> ('a Zero_buffer.refill) -> 'a -> t
(** [create buff callback] creates new HTTP response parser state that invokes [callback] when a response has been parsed. *)

val write_request_begin: t -> string -> unit
(** [write_request_begin t traceparent] records that a new request has been created. *)

val read: t -> (status_code:int -> content_length:int -> headers_size:int -> unit) -> unit
(** [read t callback] reads data using [reader] and parses a potentially partial HTTP response stream.
This is a fastpath if the following conditions are met:
Expand All @@ -51,4 +54,3 @@ end

val url_full: string Zero_events.event
val url_method: Http.Method.t Zero_events.event
val traceparent: string Zero_events.event

0 comments on commit c03ffec

Please sign in to comment.