Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Added simple, configurable rate limit for lightpush and store-query #2390

Merged
merged 4 commits into from
Apr 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions tests/waku_lightpush/lightpush_utils.nim
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,13 @@ import
../testlib/[common, wakucore]

proc newTestWakuLightpushNode*(
switch: Switch, handler: PushMessageHandler
switch: Switch,
handler: PushMessageHandler,
rateLimitSetting: Option[RateLimitSetting] = none[RateLimitSetting](),
): Future[WakuLightPush] {.async.} =
let
peerManager = PeerManager.new(switch)
proto = WakuLightPush.new(peerManager, rng, handler)
proto = WakuLightPush.new(peerManager, rng, handler, rateLimitSetting)

await proto.start()
switch.mount(proto)
Expand Down
2 changes: 1 addition & 1 deletion tests/waku_lightpush/test_all.nim
Original file line number Diff line number Diff line change
@@ -1 +1 @@
import ./test_client
import ./test_client, ./test_ratelimit
2 changes: 1 addition & 1 deletion tests/waku_lightpush/test_client.nim
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ suite "Waku Lightpush Client":
# 1KiB
message2 = fakeWakuMessage(
contentTopic = contentTopic, payload = getByteSequence(10 * 1024)
) # 10KiB
) # 10KiB
message3 = fakeWakuMessage(
contentTopic = contentTopic, payload = getByteSequence(100 * 1024)
) # 100KiB
Expand Down
151 changes: 151 additions & 0 deletions tests/waku_lightpush/test_ratelimit.nim
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
{.used.}

import
std/[options, strscans],
testutils/unittests,
chronicles,
chronos,
libp2p/crypto/crypto

import
../../waku/[
node/peer_manager,
common/ratelimit,
waku_core,
waku_lightpush,
waku_lightpush/client,
waku_lightpush/common,
waku_lightpush/protocol_metrics,
waku_lightpush/rpc,
waku_lightpush/rpc_codec,
],
../testlib/[assertions, wakucore, testasync, futures, testutils],
./lightpush_utils,
../resources/[pubsub_topics, content_topics, payloads]

suite "Rate limited push service":
asyncTest "push message with rate limit not violated":
## Setup
let
serverSwitch = newTestSwitch()
clientSwitch = newTestSwitch()

await allFutures(serverSwitch.start(), clientSwitch.start())

## Given
var handlerFuture = newFuture[(string, WakuMessage)]()
let handler: PushMessageHandler = proc(
peer: PeerId, pubsubTopic: PubsubTopic, message: WakuMessage
): Future[WakuLightPushResult[void]] {.async.} =
handlerFuture.complete((pubsubTopic, message))
return ok()

let
tokenPeriod = 500.millis
server =
await newTestWakuLightpushNode(serverSwitch, handler, some((3, tokenPeriod)))
client = newTestWakuLightpushClient(clientSwitch)

let serverPeerId = serverSwitch.peerInfo.toRemotePeerInfo()

let sendMsgProc = proc(): Future[void] {.async.} =
let message = fakeWakuMessage()

handlerFuture = newFuture[(string, WakuMessage)]()
let requestRes =
await client.publish(DefaultPubsubTopic, message, peer = serverPeerId)

check await handlerFuture.withTimeout(50.millis)

assert requestRes.isOk(), requestRes.error
check handlerFuture.finished()

let (handledMessagePubsubTopic, handledMessage) = handlerFuture.read()

check:
handledMessagePubsubTopic == DefaultPubsubTopic
handledMessage == message

let waitInBetweenFor = 20.millis

# Test cannot be too explicit about the time when the TokenBucket resets
# the internal timer, although in normal use there is no use case to care about it.
var firstWaitExtend = 300.millis

for runCnt in 0 ..< 3:
let startTime = Moment.now()
for testCnt in 0 ..< 3:
await sendMsgProc()
await sleepAsync(20.millis)

var endTime = Moment.now()
var elapsed: Duration = (endTime - startTime)
await sleepAsync(tokenPeriod - elapsed + firstWaitExtend)
firstWaitEXtend = 100.millis

## Cleanup
await allFutures(clientSwitch.stop(), serverSwitch.stop())

asyncTest "push message with rate limit reject":
## Setup
let
serverSwitch = newTestSwitch()
clientSwitch = newTestSwitch()

await allFutures(serverSwitch.start(), clientSwitch.start())

## Given
var handlerFuture = newFuture[(string, WakuMessage)]()
let handler = proc(
peer: PeerId, pubsubTopic: PubsubTopic, message: WakuMessage
): Future[WakuLightPushResult[void]] {.async.} =
handlerFuture.complete((pubsubTopic, message))
return ok()

let
server =
await newTestWakuLightpushNode(serverSwitch, handler, some((3, 500.millis)))
client = newTestWakuLightpushClient(clientSwitch)

let serverPeerId = serverSwitch.peerInfo.toRemotePeerInfo()
let topic = DefaultPubsubTopic

let successProc = proc(): Future[void] {.async.} =
let message = fakeWakuMessage()
handlerFuture = newFuture[(string, WakuMessage)]()
let requestRes =
await client.publish(DefaultPubsubTopic, message, peer = serverPeerId)
discard await handlerFuture.withTimeout(10.millis)

check:
requestRes.isOk()
handlerFuture.finished()
let (handledMessagePubsubTopic, handledMessage) = handlerFuture.read()
check:
handledMessagePubsubTopic == DefaultPubsubTopic
handledMessage == message

let rejectProc = proc(): Future[void] {.async.} =
let message = fakeWakuMessage()
handlerFuture = newFuture[(string, WakuMessage)]()
let requestRes =
await client.publish(DefaultPubsubTopic, message, peer = serverPeerId)
discard await handlerFuture.withTimeout(10.millis)

check:
requestRes.isErr()
requestRes.error == "TOO_MANY_REQUESTS"

for testCnt in 0 .. 2:
await successProc()
await sleepAsync(20.millis)

await rejectProc()

await sleepAsync(500.millis)

## next one shall succeed due to the rate limit time window has passed
await successProc()

## Cleanup
await allFutures(clientSwitch.stop(), serverSwitch.stop())
98 changes: 95 additions & 3 deletions tests/waku_store/test_wakunode_store.nim
Original file line number Diff line number Diff line change
Expand Up @@ -244,9 +244,7 @@ procSuite "WakuNode - Store":

server.wakuFilterClient.registerPushHandler(filterHandler)
let resp = waitFor server.filterSubscribe(
some(DefaultPubsubTopic),
DefaultContentTopic,
peer = filterSourcePeer,
some(DefaultPubsubTopic), DefaultContentTopic, peer = filterSourcePeer
)

waitFor sleepAsync(100.millis)
Expand Down Expand Up @@ -319,3 +317,97 @@ procSuite "WakuNode - Store":

# Cleanup
waitFor allFutures(client.stop(), server.stop())

test "Store protocol queries does not violate request rate limitation":
## Setup
let
serverKey = generateSecp256k1Key()
server = newTestWakuNode(serverKey, parseIpAddress("0.0.0.0"), Port(0))
clientKey = generateSecp256k1Key()
client = newTestWakuNode(clientKey, parseIpAddress("0.0.0.0"), Port(0))

waitFor allFutures(client.start(), server.start())

let mountArchiveRes = server.mountArchive(archiveA)
assert mountArchiveRes.isOk(), mountArchiveRes.error

waitFor server.mountStore((4, 500.millis))

client.mountStoreClient()

## Given
let req = HistoryQuery(contentTopics: @[DefaultContentTopic])
let serverPeer = server.peerInfo.toRemotePeerInfo()

let requestProc = proc() {.async.} =
let queryRes = waitFor client.query(req, peer = serverPeer)

assert queryRes.isOk(), queryRes.error

let response = queryRes.get()
check:
response.messages == msgListA

for count in 0 ..< 4:
waitFor requestProc()
waitFor sleepAsync(20.millis)

waitFor sleepAsync(500.millis)

for count in 0 ..< 4:
waitFor requestProc()
waitFor sleepAsync(20.millis)

# Cleanup
waitFor allFutures(client.stop(), server.stop())

test "Store protocol queries overrun request rate limitation":
## Setup
let
serverKey = generateSecp256k1Key()
server = newTestWakuNode(serverKey, parseIpAddress("0.0.0.0"), Port(0))
clientKey = generateSecp256k1Key()
client = newTestWakuNode(clientKey, parseIpAddress("0.0.0.0"), Port(0))

waitFor allFutures(client.start(), server.start())

let mountArchiveRes = server.mountArchive(archiveA)
assert mountArchiveRes.isOk(), mountArchiveRes.error

waitFor server.mountStore((3, 500.millis))

client.mountStoreClient()

## Given
let req = HistoryQuery(contentTopics: @[DefaultContentTopic])
let serverPeer = server.peerInfo.toRemotePeerInfo()

let successProc = proc() {.async.} =
let queryRes = waitFor client.query(req, peer = serverPeer)

check queryRes.isOk()

let response = queryRes.get()
check:
response.messages == msgListA

let failsProc = proc() {.async.} =
let queryRes = waitFor client.query(req, peer = serverPeer)

check queryRes.isErr()
check queryRes.error == "TOO_MANY_REQUESTS"

for count in 0 ..< 3:
waitFor successProc()
waitFor sleepAsync(20.millis)

waitFor failsProc()

waitFor sleepAsync(500.millis)

for count in 0 ..< 3:
waitFor successProc()
waitFor sleepAsync(20.millis)

# Cleanup
waitFor allFutures(client.stop(), server.stop())
78 changes: 75 additions & 3 deletions tests/wakunode_rest/test_rest_lightpush.nim
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,10 @@ import
../../waku/waku_api/rest/lightpush/handlers as lightpush_api,
../../waku/waku_api/rest/lightpush/client as lightpush_api_client,
../../waku/waku_relay,
../../waku/common/ratelimit,
../testlib/wakucore,
../testlib/wakunode
../testlib/wakunode,
../testlib/testutils

proc testWakuNode(): WakuNode =
let
Expand All @@ -41,7 +43,9 @@ type RestLightPushTest = object
restServer: WakuRestServerRef
client: RestClientRef

proc init(T: type RestLightPushTest): Future[T] {.async.} =
proc init(
T: type RestLightPushTest, rateLimit: RateLimitSetting = (0, 0.millis)
): Future[T] {.async.} =
var testSetup = RestLightPushTest()
testSetup.serviceNode = testWakuNode()
testSetup.pushNode = testWakuNode()
Expand All @@ -55,7 +59,7 @@ proc init(T: type RestLightPushTest): Future[T] {.async.} =

await testSetup.consumerNode.mountRelay()
await testSetup.serviceNode.mountRelay()
await testSetup.serviceNode.mountLightPush()
await testSetup.serviceNode.mountLightPush(rateLimit)
testSetup.pushNode.mountLightPushClient()

testSetup.serviceNode.peerManager.addServicePeer(
Expand Down Expand Up @@ -178,6 +182,74 @@ suite "Waku v2 Rest API - lightpush":

await restLightPushTest.shutdown()

# disabled due to this bug in nim-chronos https://github.com/status-im/nim-chronos/issues/500
xasyncTest "Request rate limit push message":
# Given
let budgetCap = 3
let tokenPeriod = 500.millis
let restLightPushTest = await RestLightPushTest.init((budgetCap, tokenPeriod))

restLightPushTest.consumerNode.subscribe(
(kind: PubsubSub, topic: DefaultPubsubTopic)
)
restLightPushTest.serviceNode.subscribe(
(kind: PubsubSub, topic: DefaultPubsubTopic)
)
require:
toSeq(restLightPushTest.serviceNode.wakuRelay.subscribedTopics).len == 1

# When
let pushProc = proc() {.async.} =
let message: RelayWakuMessage = fakeWakuMessage(
contentTopic = DefaultContentTopic, payload = toBytes("TEST-1")
)
.toRelayWakuMessage()

let requestBody =
PushRequest(pubsubTopic: some(DefaultPubsubTopic), message: message)
let response = await restLightPushTest.client.sendPushRequest(requestBody)

echo "response", $response

# Then
check:
response.status == 200
$response.contentType == $MIMETYPE_TEXT

let pushRejectedProc = proc() {.async.} =
let message: RelayWakuMessage = fakeWakuMessage(
contentTopic = DefaultContentTopic, payload = toBytes("TEST-1")
)
.toRelayWakuMessage()

let requestBody =
PushRequest(pubsubTopic: some(DefaultPubsubTopic), message: message)
let response = await restLightPushTest.client.sendPushRequest(requestBody)

echo "response", $response

# Then
check:
response.status == 429

await pushProc()
await pushProc()
await pushProc()
await pushRejectedProc()

await sleepAsync(tokenPeriod)

for runCnt in 0 ..< 3:
let startTime = Moment.now()
for sendCnt in 0 ..< budgetCap:
await pushProc()

let endTime = Moment.now()
let elapsed: Duration = (endTime - startTime)
await sleepAsync(tokenPeriod - elapsed)

await restLightPushTest.shutdown()

## TODO: Re-work this test when lightpush protocol change is done: https://github.com/waku-org/pm/issues/93
## This test is similar when no available peer exists for publish. Currently it is returning success,
## that makes this test not useful.
Expand Down
Loading
Loading