Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: running peer exchange periodically if discv5 is disabled #3150

Merged
merged 9 commits into from
Oct 30, 2024
Merged
2 changes: 1 addition & 1 deletion tests/test_waku_keepalive.nim
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ suite "Waku Keepalive":

await node1.connectToNodes(@[node2.switch.peerInfo.toRemotePeerInfo()])

node1.startKeepalive()
node1.startKeepalive(2.seconds)

check:
(await completionFut.withTimeout(5.seconds)) == true
Expand Down
6 changes: 5 additions & 1 deletion waku/factory/node_factory.nim
Original file line number Diff line number Diff line change
Expand Up @@ -422,7 +422,11 @@ proc startNode*(
desiredOutDegree = node.wakuRelay.parameters.d.uint64()
(await node.fetchPeerExchangePeers(desiredOutDegree)).isOkOr:
error "error while fetching peers from peer exchange", error = error
quit(QuitFailure)

# Use px to periodically get peers if discv5 is disabled, as discv5 nodes have their own
# periodic loop to find peers and px returned peers actually come from discv5
if conf.peerExchange and not conf.discv5Discovery:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we have no discv5 but want to use PX, how can we ensure that we get an initial PX service peer?
Doesn't the conf.peerExchangeNode setting ensure it?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I tested it even without setting peerExchangeNode and it worked :))

If there isn't a peerExchangeNode set, then any peer we have in the store with the PX codec is chosen randomly, so we just need to be connected to at least one peer that supports PX

pm.serviceSlots.withValue(proto, serviceSlot):
trace "Got peer from service slots",
peerId = serviceSlot[].peerId, multi = serviceSlot[].addrs[0], protocol = proto
return some(serviceSlot[])
# If not slotted, we select a random peer for the given protocol
if peers.len > 0:
trace "Got peer from peerstore",
peerId = peers[0].peerId, multi = peers[0].addrs[0], protocol = proto
return some(peers[0])
trace "No peer found for protocol", protocol = proto
return none(RemotePeerInfo)

It can be any of our normal bootstrap nodes or a static node also.

Then the peers we receive via PX, if they also support it, will enter the pool from which we choose randomly, so it's just a matter of having a starting point, and luckily that does not necessarily have to be a configured peerExchangeNode :)

node.startPeerExchangeLoop()

# Start keepalive, if enabled
if conf.keepAlive:
Expand Down
36 changes: 27 additions & 9 deletions waku/node/waku_node.nim
Original file line number Diff line number Diff line change
Expand Up @@ -1146,7 +1146,7 @@ proc mountPeerExchange*(
error "failed to mount wakuPeerExchange", error = getCurrentExceptionMsg()

proc fetchPeerExchangePeers*(
node: Wakunode, amount: uint64
node: Wakunode, amount = DefaultPXNumPeersReq
): Future[Result[int, PeerExchangeResponseStatus]] {.async: (raises: []).} =
if node.wakuPeerExchange.isNil():
error "could not get peers from px, waku peer-exchange is nil"
Expand Down Expand Up @@ -1175,6 +1175,20 @@ proc fetchPeerExchangePeers*(
error = pxPeersRes.error
return err(pxPeersRes.error)

proc peerExchangeLoop(
    node: WakuNode, interval: chronos.Duration = 1.minutes
) {.async.} =
  ## Periodically asks peer exchange for new peers.
  ##
  ## `interval` is the pause between two consecutive rounds. It defaults to
  ## one minute, which is the value that used to be hard-coded here.
  ##
  ## The loop has no exit condition of its own.
  while true:
    # Sleep first, so the first request is issued one interval after start.
    await sleepAsync(interval)
    if not node.started:
      # Skip this round while the node is not running, but keep looping.
      continue
    (await node.fetchPeerExchangePeers()).isOkOr:
      # Best effort: a failed round is only logged; the next round retries.
      warn "Cannot fetch peers from peer exchange", cause = error

proc startPeerExchangeLoop*(node: WakuNode) =
  ## Starts the periodic peer exchange loop and stores its future in
  ## `node.wakuPeerExchange.pxLoopHandle`.
  ##
  ## Logs an error and does nothing when peer exchange is not mounted.
  ## Logs a warning and does nothing when a previously started loop is still
  ## running, so a live loop's handle is never overwritten and lost.
  if node.wakuPeerExchange.isNil():
    error "startPeerExchangeLoop: Peer Exchange is not mounted"
    return

  let currentLoop = node.wakuPeerExchange.pxLoopHandle
  if not currentLoop.isNil() and not currentLoop.finished():
    warn "startPeerExchangeLoop: Peer Exchange loop is already running"
    return

  node.wakuPeerExchange.pxLoopHandle = node.peerExchangeLoop()

# TODO: Move to application module (e.g., wakunode2.nim)
proc setPeerExchangePeer*(
node: WakuNode, peer: RemotePeerInfo | MultiAddress | string
Expand Down Expand Up @@ -1217,7 +1231,11 @@ proc mountLibp2pPing*(node: WakuNode) {.async: (raises: []).} =

# TODO: Move this logic to PeerManager
proc keepaliveLoop(node: WakuNode, keepalive: chronos.Duration) {.async.} =
while node.started:
while true:
gabrielmer marked this conversation as resolved.
Show resolved Hide resolved
await sleepAsync(keepalive)
if not node.started:
continue

# Keep connected peers alive while running
# Each node is responsible for keeping its outgoing connections alive
trace "Running keepalive"
Expand All @@ -1235,14 +1253,11 @@ proc keepaliveLoop(node: WakuNode, keepalive: chronos.Duration) {.async.} =
except CatchableError as exc:
waku_node_errors.inc(labelValues = ["keep_alive_failure"])

await sleepAsync(keepalive)

proc startKeepalive*(node: WakuNode) =
let defaultKeepalive = 2.minutes # 20% of the default chronosstream timeout duration
# 2 minutes default - 20% of the default chronosstream timeout duration
proc startKeepalive*(node: WakuNode, keepalive = 2.minutes) =
info "starting keepalive", keepalive = keepalive

info "starting keepalive", keepalive = defaultKeepalive

asyncSpawn node.keepaliveLoop(defaultKeepalive)
asyncSpawn node.keepaliveLoop(keepalive)

proc mountRendezvous*(node: WakuNode) {.async: (raises: []).} =
info "mounting rendezvous discovery protocol"
Expand Down Expand Up @@ -1372,6 +1387,9 @@ proc stop*(node: WakuNode) {.async.} =
if not node.wakuStoreResume.isNil():
await node.wakuStoreResume.stopWait()

if not node.wakuPeerExchange.isNil() and not node.wakuPeerExchange.pxLoopHandle.isNil():
await node.wakuPeerExchange.pxLoopHandle.cancelAndWait()

node.started = false

proc isReady*(node: WakuNode): Future[bool] {.async: (raises: [Exception]).} =
Expand Down
1 change: 1 addition & 0 deletions waku/waku_peer_exchange/protocol.nim
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ type
cluster*: Option[uint16]
# todo: next step: ring buffer; future: implement cache satisfying https://rfc.vac.dev/spec/34/
requestRateLimiter*: RequestRateLimiter
pxLoopHandle*: Future[void]

proc request*(
wpx: WakuPeerExchange, numPeers = DefaultPXNumPeersReq, conn: Connection
Expand Down
Loading