Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
edwintorok committed Jul 11, 2023
1 parent b975fe4 commit 6e47e4a
Show file tree
Hide file tree
Showing 2 changed files with 89 additions and 44 deletions.
3 changes: 2 additions & 1 deletion ocaml/loadgen/speculative.ml
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,8 @@ let build_conntable connections =
connections ;
conntbl

let repeat = 100
let repeat = 1
(* TODO: have pipeline and nonpipeline mode so we can have multiple persistent but not pipelined requests *)

let nconn = 100

Expand Down
130 changes: 87 additions & 43 deletions ocaml/loadgen/time.lua
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ end
---@param str string
---@return string
local csv_escape = function(str)
return '"' .. string.gsub(str, '"', '""') .. '""'
return '"' .. string.gsub(str, '"', '""') .. '"'
end

---prints a CSV line with fields ordered according to [csv_header]
Expand All @@ -85,7 +85,7 @@ local print_csv_line = function(csv_header, data)
line[i] = ''
end
end
io.write(table.concat(line), '\n')
io.write(table.concat(line, ','), '\n')
end

---@param trace_id string
Expand Down Expand Up @@ -123,33 +123,54 @@ done = function(_, _, _)
if id then
trace_id = id
end

io.flush()
print("getting spans")
io.flush()
local spans = thread:get("Spans")
print("got spans")
io.flush()
if spans then
---@cast spans table
first_span = spans
end
local span_threads = {}
for j, thread in ipairs(threads) do
local n = thread:get("Response_counter")
local spans = {}
local default_span = thread:get("Default_span")
for i = 1, n do
local span = {}
for k, v in pairs(default_span) do
span[k] = v
end
local spans_http_response_body_size = thread:get("Spans_http_response_body_size")
local spans_start_time = thread:get("Spans_start_time")
local spans_finish_time = thread:get("Spans_finish_time")
local spans_status_code = thread:get("Spans_status_code")
local spans_body = thread:get("Spans_body")
local spans_http_request_body_size = thread:get("Spans_http_request_body_size")
local spans_traceparent = thread:get("Spans_traceparent")
local spans_http_request_method = thread:get("Spans_http_request_method")
span["http.response.status_code"] = spans_status_code[i]
span["http.response.body_size"] = spans_http_response_body_size[i]
span["start_time"] = spans_start_time[i]
span["finish_time"] = spans_finish_time[i]
span["_body"] = spans_body[i]
span["http.request.body.size"] = spans_http_request_body_size[i]
span["traceparent"] = spans_traceparent[i]
span["http.request.method"] = spans_http_request_method[i]
spans[i] = span
end
first_span = spans[1]
span_threads[j] = spans
end

if not trace_id then
error("No Trace_id found in threads")
end
io.flush()
print("Here")
io.flush()
print(first_span)
for k, v in pairs(first_span) do
print(">>", k, v)
end
io.flush()

-- pairs iteration order is undefined, store headers in a table with numeric index
local csv_header = {}
local hi = 0
for k, _ in pairs(first_span) do
print("Here2")
io.flush()
csv_header[hi] = k
hi = hi + 1
end
Expand All @@ -170,7 +191,7 @@ done = function(_, _, _)
}
)

for thread in threads do
for i, thread in ipairs(threads) do
local Time_start = thread:get("Time_start")
local ThreadId = thread:get("ThreadId")
print_csv_line(csv_header,
Expand All @@ -184,9 +205,8 @@ done = function(_, _, _)
thread = ThreadId
})

local spans = thread:get("Spans")

for span in spans do
local spans = span_threads[i]
for _, span in ipairs(spans) do
print_csv_line(csv_header, span)
end
end
Expand All @@ -196,9 +216,6 @@ done = function(_, _, _)
end
end

---@type table<number, table>
Spans = {}

---@type table<number, string>
local Requests = {}

Expand All @@ -213,20 +230,48 @@ local User_agent = 'wrk'
---@type nil|string
local Rpc_system

---@type table<number, number>
Spans_http_response_body_size = {}

---@type table<number, number>
Spans_start_time = {}

---@type table<number, number>
Spans_finish_time = {}

--@type table<number, number>
Spans_status_code = {}

---@type table<number, string>
Spans_body = {}

--@type table<number, int>
Spans_http_request_body_size = {}

--@type table<number, string>
Spans_traceparent = {}

--@type table<number, method>
Spans_http_request_method = {}

--@type table<string, string>
Default_span = {}


---@param n number
---@param status_code number
---@param response_body nil|string
---@param timestamp number
local set_response = function(n, status_code, response_body, timestamp)
local span = Spans[n]
if response_body then
span["http.response.body.size"] = #response_body
Spans_http_response_body_size[n] = #response_body
if status_code >= 400 then
-- will decode in python/ocaml post-processing code
span["_body"] = response_body
Spans_body[n] = response_body
end
end
span.finish_time = timestamp
Spans_status_code[n] = status_code
Spans_finish_time[n] = timestamp
end

---@param n number
Expand All @@ -240,33 +285,32 @@ local build_request = function(n, method, path, body)

headers['Host'] = wrk.host
headers['traceparent'] = traceparent
local span = {}
if body then
len = #body
headers['Content-Length'] = tostring(len)
span["http.request.body.size"] = len
Spans_http_request_body_size[n] = len
end
span.traceparent = traceparent
Spans_traceparent[n] = traceparent
Spans_start_time[n] = 0
Spans_finish_time[n] = 0

span["http.request.method"] = method
span["user_agent.original"] = User_agent
span["server.address"] = tostring(wrk.thread.addr)
Spans_http_request_method[n] = method
Default_span["user_agent.original"] = User_agent
Default_span["server.address"] = tostring(wrk.thread.addr)
local portstr = ""
if wrk.port then
span["server.port"] = wrk.port
Default_span["server.port"] = wrk.port
portstr = string.format(":%d", wrk.port)
end
span["url.full"] = string.format("%s://%s%s%s", wrk.scheme, wrk.host, portstr, wrk.path)
Default_span["url.full"] = string.format("%s://%s%s%s", wrk.scheme, wrk.host, portstr, wrk.path)
if Rpc_system then
span["rpc.system"] = Rpc_system
Default_span["rpc.system"] = Rpc_system
if Rpc_system == "jsonrpc" then
span["rpc.jsonrpc.request_id"] = n
span["rpc.jsonrpc.version"] = "1.0"
Default_span["rpc.jsonrpc.request_id"] = n
Default_span["rpc.jsonrpc.version"] = "1.0"
end
span["rpc.service"] = "xapi"
Default_span["rpc.service"] = "xapi"
end
span.start_time = 0
Spans[n] = span
-- preallocate response
set_response(n, 0, nil, 0)

Expand Down Expand Up @@ -313,7 +357,7 @@ init = function(args)
end
end
else
for i in 1, 99 do
for i = 1, 99 do
build_request(i, "GET", "/", nil)
end
end
Expand All @@ -326,10 +370,10 @@ end
-- all requests should be pre-created in init as needed

request = function()
if Request_counter > #Spans then
if Request_counter > #Spans_start_time then
wrk.thread:stop()
end
Spans[Request_counter].start_time = seconds(clock_gettime())
Spans_start_time[Request_counter] = seconds(clock_gettime())
local packet = Requests[Request_counter]
Request_counter = Request_counter + 1
return packet
Expand Down

0 comments on commit 6e47e4a

Please sign in to comment.