From 6e47e4a5060f1b32f8c5697b3444d647671013cb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Edwin=20T=C3=B6r=C3=B6k?= Date: Tue, 11 Jul 2023 17:37:29 +0100 Subject: [PATCH] wip --- ocaml/loadgen/speculative.ml | 3 +- ocaml/loadgen/time.lua | 130 +++++++++++++++++++++++------------ 2 files changed, 89 insertions(+), 44 deletions(-) diff --git a/ocaml/loadgen/speculative.ml b/ocaml/loadgen/speculative.ml index 94fda6b121f..5e61f392352 100644 --- a/ocaml/loadgen/speculative.ml +++ b/ocaml/loadgen/speculative.ml @@ -207,7 +207,8 @@ let build_conntable connections = connections ; conntbl -let repeat = 100 +let repeat = 1 +(* TODO: have pipeline and nonpipeline mode so we can have multiple persistent but not pipelined requests *) let nconn = 100 diff --git a/ocaml/loadgen/time.lua b/ocaml/loadgen/time.lua index f240f1e7fac..51f1b55f207 100644 --- a/ocaml/loadgen/time.lua +++ b/ocaml/loadgen/time.lua @@ -69,7 +69,7 @@ end ---@param str string ---@return string local csv_escape = function(str) - return '"' .. string.gsub(str, '"', '""') .. '""' + return '"' .. string.gsub(str, '"', '""') .. '"' end ---prints a CSV line with fields ordered according to [csv_header] @@ -85,7 +85,7 @@ local print_csv_line = function(csv_header, data) line[i] = '' end end - io.write(table.concat(line), '\n') + io.write(table.concat(line, ','), '\n') end ---@param trace_id string @@ -123,33 +123,54 @@ done = function(_, _, _) if id then trace_id = id end - - io.flush() - print("getting spans") - io.flush() - local spans = thread:get("Spans") - print("got spans") - io.flush() - if spans then - ---@cast spans table - first_span = spans + end + local span_threads = {} + for j, thread in ipairs(threads) do + local n = thread:get("Response_counter") + local spans = {} + local default_span = thread:get("Default_span") + for i = 1, n do + local span = {} + for k, v in pairs(default_span) do + span[k] = v + end + local spans_http_response_body_size = thread:get("Spans_http_response_body_size") + local spans_start_time = thread:get("Spans_start_time") + local spans_finish_time = thread:get("Spans_finish_time") + local spans_status_code = thread:get("Spans_status_code") + local spans_body = thread:get("Spans_body") + local spans_http_request_body_size = thread:get("Spans_http_request_body_size") + local spans_traceparent = thread:get("Spans_traceparent") + local spans_http_request_method = thread:get("Spans_http_request_method") + span["http.response.status_code"] = spans_status_code[i] + span["http.response.body_size"] = spans_http_response_body_size[i] + span["start_time"] = spans_start_time[i] + span["finish_time"] = spans_finish_time[i] + span["_body"] = spans_body[i] + span["http.request.body.size"] = spans_http_request_body_size[i] + span["traceparent"] = spans_traceparent[i] + span["http.request.method"] = spans_http_request_method[i] + spans[i] = span end + first_span = spans[1] + span_threads[j] = spans end + if not trace_id then error("No Trace_id found in threads") end io.flush() print("Here") io.flush() - print(first_span) + for k, v in pairs(first_span) do + print(">>", k, v) + end io.flush() -- pairs iteration order is undefined, store headers in a table with numeric index local csv_header = {} local hi = 0 for k, _ in pairs(first_span) do - print("Here2") - io.flush() csv_header[hi] = k hi = hi + 1 end @@ -170,7 +191,7 @@ done = function(_, _, _) } ) - for thread in threads do + for i, thread in ipairs(threads) do local Time_start = thread:get("Time_start") local ThreadId = thread:get("ThreadId") print_csv_line(csv_header, @@ -184,9 +205,8 @@ done = function(_, _, _) thread = ThreadId }) - local spans = thread:get("Spans") - - for span in spans do + local spans = span_threads[i] + for _, span in ipairs(spans) do print_csv_line(csv_header, span) end end @@ -196,9 +216,6 @@ done = function(_, _, _) end end ----@type table -Spans = {} - ---@type table local Requests = {} @@ -213,20 +230,48 @@ local User_agent = 'wrk' ---@type nil|string local Rpc_system +---@type table +Spans_http_response_body_size = {} + +---@type table +Spans_start_time = {} + +---@type table +Spans_finish_time = {} + +--@type table +Spans_status_code = {} + +---@type table +Spans_body = {} + +--@type table +Spans_http_request_body_size = {} + +--@type table +Spans_traceparent = {} + +--@type table +Spans_http_request_method = {} + +--@type table +Default_span = {} + + ---@param n number ---@param status_code number ---@param response_body nil|string ---@param timestamp number local set_response = function(n, status_code, response_body, timestamp) - local span = Spans[n] if response_body then - span["http.response.body.size"] = #response_body + Spans_http_response_body_size[n] = #response_body if status_code >= 400 then -- will decode in python/ocaml post-processing code - span["_body"] = response_body + Spans_body[n] = response_body end end - span.finish_time = timestamp + Spans_status_code[n] = status_code + Spans_finish_time[n] = timestamp end ---@param n number @@ -240,33 +285,32 @@ local build_request = function(n, method, path, body) headers['Host'] = wrk.host headers['traceparent'] = traceparent - local span = {} if body then len = #body headers['Content-Length'] = tostring(len) - span["http.request.body.size"] = len + Spans_http_request_body_size[n] = len end - span.traceparent = traceparent + Spans_traceparent[n] = traceparent + Spans_start_time[n] = 0 + Spans_finish_time[n] = 0 - span["http.request.method"] = method - span["user_agent.original"] = User_agent - span["server.address"] = tostring(wrk.thread.addr) + Spans_http_request_method[n] = method + Default_span["user_agent.original"] = User_agent + Default_span["server.address"] = tostring(wrk.thread.addr) local portstr = "" if wrk.port then - span["server.port"] = wrk.port + Default_span["server.port"] = wrk.port portstr = string.format(":%d", wrk.port) end - span["url.full"] = string.format("%s://%s%s%s", wrk.scheme, wrk.host, portstr, wrk.path) + Default_span["url.full"] = string.format("%s://%s%s%s", wrk.scheme, wrk.host, portstr, wrk.path) if Rpc_system then - span["rpc.system"] = Rpc_system + Default_span["rpc.system"] = Rpc_system if Rpc_system == "jsonrpc" then - span["rpc.jsonrpc.request_id"] = n - span["rpc.jsonrpc.version"] = "1.0" + Default_span["rpc.jsonrpc.request_id"] = n + Default_span["rpc.jsonrpc.version"] = "1.0" end - span["rpc.service"] = "xapi" + Default_span["rpc.service"] = "xapi" end - span.start_time = 0 - Spans[n] = span -- preallocate response set_response(n, 0, nil, 0) @@ -313,7 +357,7 @@ init = function(args) end end else - for i in 1, 99 do + for i = 1, 99 do build_request(i, "GET", "/", nil) end end @@ -326,10 +370,10 @@ end -- all requests should be pre-created in init as needed request = function() - if Request_counter > #Spans then + if Request_counter > #Spans_start_time then wrk.thread:stop() end - Spans[Request_counter].start_time = seconds(clock_gettime()) + Spans_start_time[Request_counter] = seconds(clock_gettime()) local packet = Requests[Request_counter] Request_counter = Request_counter + 1 return packet