Skip to content

Commit

Permalink
feat: running periodicaly peer exchange if discv5 is disabled (#3150)
Browse files Browse the repository at this point in the history
  • Loading branch information
gabrielmer authored Oct 30, 2024
1 parent b3656d6 commit 400d7a5
Show file tree
Hide file tree
Showing 4 changed files with 34 additions and 11 deletions.
2 changes: 1 addition & 1 deletion tests/test_waku_keepalive.nim
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ suite "Waku Keepalive":

await node1.connectToNodes(@[node2.switch.peerInfo.toRemotePeerInfo()])

node1.startKeepalive()
node1.startKeepalive(2.seconds)

check:
(await completionFut.withTimeout(5.seconds)) == true
Expand Down
6 changes: 5 additions & 1 deletion waku/factory/node_factory.nim
Original file line number Diff line number Diff line change
Expand Up @@ -422,7 +422,11 @@ proc startNode*(
desiredOutDegree = node.wakuRelay.parameters.d.uint64()
(await node.fetchPeerExchangePeers(desiredOutDegree)).isOkOr:
error "error while fetching peers from peer exchange", error = error
quit(QuitFailure)

# Use px to periodically get peers if discv5 is disabled, as discv5 nodes have their own
# periodic loop to find peers and px returned peers actually come from discv5
if conf.peerExchange and not conf.discv5Discovery:
node.startPeerExchangeLoop()

# Start keepalive, if enabled
if conf.keepAlive:
Expand Down
36 changes: 27 additions & 9 deletions waku/node/waku_node.nim
Original file line number Diff line number Diff line change
Expand Up @@ -1146,7 +1146,7 @@ proc mountPeerExchange*(
error "failed to mount wakuPeerExchange", error = getCurrentExceptionMsg()

proc fetchPeerExchangePeers*(
node: Wakunode, amount: uint64
node: Wakunode, amount = DefaultPXNumPeersReq
): Future[Result[int, PeerExchangeResponseStatus]] {.async: (raises: []).} =
if node.wakuPeerExchange.isNil():
error "could not get peers from px, waku peer-exchange is nil"
Expand Down Expand Up @@ -1175,6 +1175,20 @@ proc fetchPeerExchangePeers*(
error = pxPeersRes.error
return err(pxPeersRes.error)

proc peerExchangeLoop(node: WakuNode) {.async.} =
while true:
await sleepAsync(1.minutes)
if not node.started:
continue
(await node.fetchPeerExchangePeers()).isOkOr:
warn "Cannot fetch peers from peer exchange", cause = error

proc startPeerExchangeLoop*(node: WakuNode) =
if node.wakuPeerExchange.isNil():
error "startPeerExchangeLoop: Peer Exchange is not mounted"
return
node.wakuPeerExchange.pxLoopHandle = node.peerExchangeLoop()

# TODO: Move to application module (e.g., wakunode2.nim)
proc setPeerExchangePeer*(
node: WakuNode, peer: RemotePeerInfo | MultiAddress | string
Expand Down Expand Up @@ -1217,7 +1231,11 @@ proc mountLibp2pPing*(node: WakuNode) {.async: (raises: []).} =

# TODO: Move this logic to PeerManager
proc keepaliveLoop(node: WakuNode, keepalive: chronos.Duration) {.async.} =
while node.started:
while true:
await sleepAsync(keepalive)
if not node.started:
continue

# Keep connected peers alive while running
# Each node is responsible of keeping its outgoing connections alive
trace "Running keepalive"
Expand All @@ -1235,14 +1253,11 @@ proc keepaliveLoop(node: WakuNode, keepalive: chronos.Duration) {.async.} =
except CatchableError as exc:
waku_node_errors.inc(labelValues = ["keep_alive_failure"])

await sleepAsync(keepalive)

proc startKeepalive*(node: WakuNode) =
let defaultKeepalive = 2.minutes # 20% of the default chronosstream timeout duration
# 2 minutes default - 20% of the default chronosstream timeout duration
proc startKeepalive*(node: WakuNode, keepalive = 2.minutes) =
info "starting keepalive", keepalive = keepalive

info "starting keepalive", keepalive = defaultKeepalive

asyncSpawn node.keepaliveLoop(defaultKeepalive)
asyncSpawn node.keepaliveLoop(keepalive)

proc mountRendezvous*(node: WakuNode) {.async: (raises: []).} =
info "mounting rendezvous discovery protocol"
Expand Down Expand Up @@ -1372,6 +1387,9 @@ proc stop*(node: WakuNode) {.async.} =
if not node.wakuStoreResume.isNil():
await node.wakuStoreResume.stopWait()

if not node.wakuPeerExchange.isNil() and not node.wakuPeerExchange.pxLoopHandle.isNil():
await node.wakuPeerExchange.pxLoopHandle.cancelAndWait()

node.started = false

proc isReady*(node: WakuNode): Future[bool] {.async: (raises: [Exception]).} =
Expand Down
1 change: 1 addition & 0 deletions waku/waku_peer_exchange/protocol.nim
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ type
cluster*: Option[uint16]
# todo: next step: ring buffer; future: implement cache satisfying https://rfc.vac.dev/spec/34/
requestRateLimiter*: RequestRateLimiter
pxLoopHandle*: Future[void]

proc request*(
wpx: WakuPeerExchange, numPeers = DefaultPXNumPeersReq, conn: Connection
Expand Down

0 comments on commit 400d7a5

Please sign in to comment.