Skip to content

Commit

Permalink
chore: libwaku - better error handling and better waku thread destroy…
Browse files Browse the repository at this point in the history
… handling (#3167)
  • Loading branch information
Ivansete-status authored Nov 8, 2024
1 parent 3cb8ebd commit 294dd03
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 25 deletions.
3 changes: 1 addition & 2 deletions apps/networkmonitor/networkmonitor_metrics.nim
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,7 @@ declarePublicGauge networkmonitor_peer_user_agents,

declarePublicHistogram networkmonitor_peer_ping,
"Histogram tracking ping durations for discovered peers",
buckets =
[10.0, 20.0, 50.0, 100.0, 200.0, 300.0, 500.0, 800.0, 1000.0, 2000.0, Inf]
buckets = [10.0, 20.0, 50.0, 100.0, 200.0, 300.0, 500.0, 800.0, 1000.0, 2000.0, Inf]

declarePublicGauge networkmonitor_peer_count,
"Number of discovered peers", labels = ["connected"]
Expand Down
2 changes: 1 addition & 1 deletion library/libwaku.nim
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ proc waku_destroy(
): cint {.dynlib, exportc.} =
checkLibwakuParams(ctx, callback, userData)

waku_thread.stopWakuThread(ctx).handleRes(callback, userData)
waku_thread.destroyWakuThread(ctx).handleRes(callback, userData)

proc waku_version(
ctx: ptr WakuContext, callback: WakuCallBack, userData: pointer
Expand Down
57 changes: 35 additions & 22 deletions library/waku_thread/waku_thread.nim
Original file line number Diff line number Diff line change
Expand Up @@ -18,36 +18,45 @@ type WakuContext* = object
userData*: pointer
eventCallback*: pointer
eventUserdata*: pointer
running: Atomic[bool] # To control when the thread is running

const git_version* {.strdefine.} = "n/a"
const versionString = "version / git commit hash: " & waku.git_version

# To control when the thread is running
# TODO: this should be part of the context so multiple instances can be executed
var running: Atomic[bool]

proc runWaku(ctx: ptr WakuContext) {.async.} =
## This is the worker body. This runs the Waku node
## and attends library user requests (stop, connect_to, etc.)
info "Starting Waku", version = versionString

var waku: Waku

while running.load == true:
while true:
await ctx.reqSignal.wait()

# Trying to get a request from the libwaku main thread
if ctx.running.load == false:
break

## Trying to get a request from the libwaku requestor thread
var request: ptr InterThreadRequest
let recvOk = ctx.reqChannel.tryRecv(request)
if recvOk == true:
let resultResponse = waitFor InterThreadRequest.process(request, addr waku)
if not recvOk:
error "waku thread could not receive a request"
continue

## Handle the request
let resultResponse = waitFor InterThreadRequest.process(request, addr waku)

## Converting a `Result` into a thread-safe transferable response type
let threadSafeResp = InterThreadResponse.createShared(resultResponse)

## Converting a `Result` into a thread-safe transferable response type
let threadSafeResp = InterThreadResponse.createShared(resultResponse)
## Send the response back to the thread that sent the request
let sentOk = ctx.respChannel.trySend(threadSafeResp)
if not sentOk:
error "could not send a request to the requester thread",
original_request = $request[]

## The error-handling is performed in the main thread
discard ctx.respChannel.trySend(threadSafeResp)
discard ctx.respSignal.fireSync()
let fireRes = ctx.respSignal.fireSync()
if fireRes.isErr():
error "could not fireSync back to requester thread", error = fireRes.error

proc run(ctx: ptr WakuContext) {.thread.} =
## Launch waku worker
Expand All @@ -62,7 +71,7 @@ proc createWakuThread*(): Result[ptr WakuContext, string] =
ctx.respSignal = ThreadSignalPtr.new().valueOr:
return err("couldn't create respSignal ThreadSignalPtr")

running.store(true)
ctx.running.store(true)

try:
createThread(ctx.thread, run, ctx)
Expand All @@ -74,15 +83,19 @@ proc createWakuThread*(): Result[ptr WakuContext, string] =

return ok(ctx)

proc stopWakuThread*(ctx: ptr WakuContext): Result[void, string] =
running.store(false)
let fireRes = ctx.reqSignal.fireSync()
if fireRes.isErr():
return err("error in stopWakuThread: " & $fireRes.error)
discard ctx.reqSignal.close()
discard ctx.respSignal.close()
proc destroyWakuThread*(ctx: ptr WakuContext): Result[void, string] =
ctx.running.store(false)

let signaledOnTime = ctx.reqSignal.fireSync().valueOr:
return err("error in destroyWakuThread: " & $error)
if not signaledOnTime:
return err("failed to signal reqSignal on time in destroyWakuThread")

joinThread(ctx.thread)
?ctx.reqSignal.close()
?ctx.respSignal.close()
freeShared(ctx)

return ok()

proc sendRequestToWakuThread*(
Expand Down

0 comments on commit 294dd03

Please sign in to comment.