rlqs: Implement RLQS stream restarts if the stream goes down mid-use. (envoyproxy#36170)

Commit Message: Implement RLQS stream restarts if the stream goes down
mid-use.
Additional Description: Stream restarts are done during periodic usage
reporting, which limits retry spam while backends are offline (see the
sketch after the changed-file summary below).
Risk Level:
Testing: Integration testing updated to exercise the filter before and
after stream closure.
Docs Changes:
Release Notes:
Platform Specific Features:

---------

Signed-off-by: Brian Surber <[email protected]>
bsurber authored Sep 17, 2024
1 parent a1e12cd commit cea046f
Showing 8 changed files with 95 additions and 49 deletions.
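Conceptually, the change makes the RLQS client re-open its gRPC stream lazily the next time a usage report needs to go out, rather than reconnecting the moment the remote side closes it. The standalone sketch below illustrates that pattern only; the types (RlqsClientSketch, Stream, UsageReport) are placeholders and not the actual Envoy classes touched in this commit.

    // Hypothetical, simplified model of the restart-on-report behavior added by this commit.
    #include <cstddef>
    #include <iostream>
    #include <memory>
    #include <optional>

    struct UsageReport {
      std::optional<size_t> bucket_id;  // empty for the periodic report case
    };

    struct Stream {
      void sendMessage(const UsageReport&, bool /*end_stream*/) { std::cout << "report sent\n"; }
    };

    class RlqsClientSketch {
    public:
      // Called from the periodic report timer and from the first request on a bucket.
      void sendUsageReport(std::optional<size_t> bucket_id) {
        if (stream_ == nullptr) {
          // The stream was closed remotely earlier; retry now, i.e. at most once per
          // reporting interval instead of spamming reconnects while the backend is offline.
          stream_ = tryStartStream();
          if (stream_ == nullptr) {
            std::cerr << "Failed to start the stream to send reports.\n";
            return;  // Drop this report; the next interval retries.
          }
        }
        stream_->sendMessage(UsageReport{bucket_id}, /*end_stream=*/false);
      }

      // A remote close only clears the stream; no immediate reconnect happens here.
      void onRemoteClose() { stream_.reset(); }

    private:
      std::unique_ptr<Stream> tryStartStream() { return std::make_unique<Stream>(); }

      std::unique_ptr<Stream> stream_;
    };

    int main() {
      RlqsClientSketch client;
      client.sendUsageReport(std::nullopt);  // periodic report opens the stream lazily
      client.onRemoteClose();                // backend drops the stream mid-use
      client.sendUsageReport(42);            // next report triggers a single restart attempt
      return 0;
    }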
2 changes: 1 addition & 1 deletion source/extensions/filters/http/rate_limit_quota/client.h
@@ -37,7 +37,7 @@ class RateLimitClient {
public:
virtual ~RateLimitClient() = default;

virtual absl::Status startStream(const StreamInfo::StreamInfo& stream_info) PURE;
virtual absl::Status startStream(const StreamInfo::StreamInfo* stream_info) PURE;
virtual void closeStream() PURE;
virtual void sendUsageReport(absl::optional<size_t> bucket_id) PURE;

36 changes: 23 additions & 13 deletions source/extensions/filters/http/rate_limit_quota/client_impl.cc
@@ -64,14 +64,17 @@ RateLimitQuotaUsageReports RateLimitClientImpl::buildReport(absl::optional<size_
// This function covers both periodical report and immediate report case, with the difference that
// bucket id in periodical report case is empty.
void RateLimitClientImpl::sendUsageReport(absl::optional<size_t> bucket_id) {
if (stream_ != nullptr) {
// Build the report and then send the report to RLQS server.
// `end_stream` should always be set to false as we don't want to close the stream locally.
stream_->sendMessage(buildReport(bucket_id), /*end_stream=*/false);
} else {
// Don't send any reports if stream has already been closed.
ENVOY_LOG(debug, "The stream has already been closed; no reports will be sent.");
if (stream_ == nullptr) {
ENVOY_LOG(debug, "The RLQS stream has been closed and must be restarted to send reports.");
if (absl::Status err = startStream(nullptr); !err.ok()) {
ENVOY_LOG(error, "Failed to start the stream to send reports.");
return;
}
}

// Build the report and then send the report to RLQS server.
// `end_stream` should always be set to false as we don't want to close the stream locally.
stream_->sendMessage(buildReport(bucket_id), /*end_stream=*/false);
}

void RateLimitClientImpl::onReceiveMessage(RateLimitQuotaResponsePtr&& response) {
@@ -165,20 +168,27 @@ void RateLimitClientImpl::onRemoteClose(Grpc::Status::GrpcStatus status,
stream_ = nullptr;
}

absl::Status RateLimitClientImpl::startStream(const StreamInfo::StreamInfo& stream_info) {
absl::Status RateLimitClientImpl::startStream(const StreamInfo::StreamInfo* stream_info) {
// Starts stream if it has not been opened yet.
if (stream_ == nullptr) {
ENVOY_LOG(debug, "Trying to start the new gRPC stream");
auto stream_options = Http::AsyncClient::RequestOptions();
if (stream_info) {
stream_options.setParentContext(Http::AsyncClient::ParentContext{stream_info});
}
stream_ = aync_client_.start(
*Protobuf::DescriptorPool::generated_pool()->FindMethodByName(
"envoy.service.rate_limit_quota.v3.RateLimitQuotaService.StreamRateLimitQuotas"),
*this,
Http::AsyncClient::RequestOptions().setParentContext(
Http::AsyncClient::ParentContext{&stream_info}));
*this, stream_options);
}

// If still null after attempting a start.
if (stream_ == nullptr) {
return absl::InternalError("Failed to start the stream");
}

// Returns error status if start failed (i.e., stream_ is nullptr).
return stream_ == nullptr ? absl::InternalError("Failed to start the stream") : absl::OkStatus();
ENVOY_LOG(debug, "gRPC stream has been started");
return absl::OkStatus();
}

} // namespace RateLimitQuota
@@ -45,7 +45,7 @@ class RateLimitClientImpl : public RateLimitClient,
void onRemoteClose(Grpc::Status::GrpcStatus status, const std::string& message) override;

// RateLimitClient methods.
absl::Status startStream(const StreamInfo::StreamInfo& stream_info) override;
absl::Status startStream(const StreamInfo::StreamInfo* stream_info) override;
void closeStream() override;
// Send the usage report to RLQS server
void sendUsageReport(absl::optional<size_t> bucket_id) override;
2 changes: 1 addition & 1 deletion source/extensions/filters/http/rate_limit_quota/filter.cc
@@ -185,7 +185,7 @@ RateLimitQuotaFilter::sendImmediateReport(const size_t bucket_id,

// Start the streaming on the first request.
// It will be a no-op if the stream is already active.
auto status = client_.rate_limit_client->startStream(callbacks_->streamInfo());
auto status = client_.rate_limit_client->startStream(&callbacks_->streamInfo());
if (!status.ok()) {
ENVOY_LOG(error, "Failed to start the gRPC stream: ", status.message());
// TODO(tyxia) Check `NoAssignmentBehavior` behavior instead of fail-open here.
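The signature change above (startStream taking a const StreamInfo::StreamInfo* instead of a reference) is what allows restarts outside of a request: the filter still passes the request's StreamInfo as parent context, while a restart triggered from the periodic report path has no active request to borrow one from, so the client passes nullptr. Roughly, with the two call sites paraphrased rather than quoted verbatim:

    // On the data path, the first request on a bucket supplies parent context.
    absl::Status status = client->startStream(&callbacks->streamInfo());

    // During a periodic report there is no active request, so restart without one.
    absl::Status restart_status = client->startStream(nullptr);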
33 changes: 30 additions & 3 deletions test/extensions/filters/http/rate_limit_quota/client_test.cc
@@ -18,7 +18,7 @@ class RateLimitClientTest : public testing::Test {
};

TEST_F(RateLimitClientTest, OpenAndCloseStream) {
EXPECT_OK(test_client.client_->startStream(test_client.stream_info_));
EXPECT_OK(test_client.client_->startStream(&test_client.stream_info_));
EXPECT_CALL(test_client.stream_, closeStream());
EXPECT_CALL(test_client.stream_, resetStream());
test_client.client_->closeStream();
@@ -27,7 +27,7 @@ TEST_F(RateLimitClientTest, OpenAndCloseStream) {
TEST_F(RateLimitClientTest, SendUsageReport) {
::envoy::service::rate_limit_quota::v3::BucketId bucket_id;
TestUtility::loadFromYaml(SingleBukcetId, bucket_id);
EXPECT_OK(test_client.client_->startStream(test_client.stream_info_));
EXPECT_OK(test_client.client_->startStream(&test_client.stream_info_));
bool end_stream = false;
// Send quota usage report and ensure that we get it.
EXPECT_CALL(test_client.stream_, sendMessageRaw_(_, end_stream));
@@ -39,7 +39,7 @@ TEST_F(RateLimitClientTest, SendUsageReport) {
}

TEST_F(RateLimitClientTest, SendRequestAndReceiveResponse) {
EXPECT_OK(test_client.client_->startStream(test_client.stream_info_));
EXPECT_OK(test_client.client_->startStream(&test_client.stream_info_));
ASSERT_NE(test_client.stream_callbacks_, nullptr);

auto empty_request_headers = Http::RequestHeaderMapImpl::create();
@@ -66,6 +66,33 @@ TEST_F(RateLimitClientTest, SendRequestAndReceiveResponse) {
test_client.client_->onRemoteClose(0, "");
}

TEST_F(RateLimitClientTest, RestartStreamWhileInUse) {
::envoy::service::rate_limit_quota::v3::BucketId bucket_id;
TestUtility::loadFromYaml(SingleBukcetId, bucket_id);
EXPECT_OK(test_client.client_->startStream(&test_client.stream_info_));

bool end_stream = false;
// Send quota usage report and ensure that we get it.
EXPECT_CALL(test_client.stream_, sendMessageRaw_(_, end_stream));
const size_t bucket_id_hash = MessageUtil::hash(bucket_id);
test_client.client_->sendUsageReport(bucket_id_hash);
EXPECT_CALL(test_client.stream_, closeStream());
EXPECT_CALL(test_client.stream_, resetStream());
test_client.client_->closeStream();

// Expect the stream to reopen while trying to send the next usage report.
EXPECT_CALL(test_client.stream_, sendMessageRaw_(_, end_stream));
test_client.client_->sendUsageReport(bucket_id_hash);
EXPECT_CALL(test_client.stream_, closeStream());
EXPECT_CALL(test_client.stream_, resetStream());
test_client.client_->closeStream();

// Expect the client to handle a restart failure.
EXPECT_CALL(*test_client.async_client_, startRaw(_, _, _, _)).WillOnce(testing::Return(nullptr));
WAIT_FOR_LOG_CONTAINS("error", "Failed to start the stream to send reports.",
{ test_client.client_->sendUsageReport(bucket_id_hash); });
}

} // namespace
} // namespace RateLimitQuota
} // namespace HttpFilters
@@ -70,12 +70,12 @@ class RateLimitTestClient {
}

Grpc::RawAsyncClientSharedPtr mockCreateAsyncClient(Unused, Unused, Unused) {
auto async_client = std::make_shared<Grpc::MockAsyncClient>();
EXPECT_CALL(*async_client, startRaw("envoy.service.rate_limit_quota.v3.RateLimitQuotaService",
"StreamRateLimitQuotas", _, _))
.WillOnce(Invoke(this, &RateLimitTestClient::mockStartRaw));
async_client_ = std::make_shared<Grpc::MockAsyncClient>();
EXPECT_CALL(*async_client_, startRaw("envoy.service.rate_limit_quota.v3.RateLimitQuotaService",
"StreamRateLimitQuotas", _, _))
.WillRepeatedly(Invoke(this, &RateLimitTestClient::mockStartRaw));

return async_client;
return async_client_;
}

Grpc::RawAsyncStream* mockStartRaw(Unused, Unused, Grpc::RawAsyncStreamCallbacks& callbacks,
@@ -97,7 +97,7 @@ class RateLimitTestClient {
Grpc::RawAsyncStreamCallbacks* stream_callbacks_;
Grpc::Status::GrpcStatus grpc_status_ = Grpc::Status::WellKnownGrpcStatus::Ok;
RateLimitClientPtr client_;
// std::unique_ptr<RateLimitClient> client_;
std::shared_ptr<Grpc::MockAsyncClient> async_client_ = nullptr;
MockRateLimitQuotaCallbacks callbacks_;
bool external_inited_ = false;
bool start_failed_ = false;
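The test helper change above stores the mock async client in a member (async_client_) and switches the startRaw expectation to WillRepeatedly, so a single unit test can open the stream more than once and can later override the expectation to simulate a failed restart. A paraphrased fragment of how RestartStreamWhileInUse uses that hook:

    // Make the next restart attempt fail by handing back a null raw stream.
    EXPECT_CALL(*test_client.async_client_, startRaw(_, _, _, _))
        .WillOnce(testing::Return(nullptr));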
55 changes: 32 additions & 23 deletions test/extensions/filters/http/rate_limit_quota/integration_test.cc
@@ -787,42 +787,51 @@ TEST_P(RateLimitQuotaIntegrationTest, BasicFlowPeriodicalReportWithStreamClosed)
EXPECT_TRUE(response_->complete());
EXPECT_EQ(response_->headers().getStatusValue(), "200");

// ValidMatcherConfig.
int report_interval_sec = 60;
// Trigger the report periodically.
for (int i = 0; i < 6; ++i) {
if (i == 2) {
// Close the stream.
rlqs_stream_->finishGrpcStream(Grpc::Status::Ok);
WAIT_FOR_LOG_CONTAINS("debug", "gRPC stream closed remotely with status",
{ rlqs_stream_->finishGrpcStream(Grpc::Status::Canceled); });
ASSERT_TRUE(rlqs_stream_->waitForReset());
}

// Advance the time by report_interval.
simTime().advanceTimeWait(std::chrono::milliseconds(report_interval_sec * 1000));

// Only perform rlqs server check and response before stream is remotely closed.
if (i < 2) {
// Checks that the rate limit server has received the periodical reports.
ASSERT_TRUE(rlqs_stream_->waitForGrpcMessage(*dispatcher_, reports));
if (i == 2) {
// Stream should be restarted when next required for usage reporting.
ASSERT_TRUE(rlqs_connection_->waitForNewStream(*dispatcher_, rlqs_stream_));
rlqs_stream_->startGrpcStream();
}

// Verify the usage report content.
ASSERT_THAT(reports.bucket_quota_usages_size(), 1);
const auto& usage = reports.bucket_quota_usages(0);
// Report only represents the usage since last report.
// In the periodical report case here, the number of requests allowed and denied is 0 since no
// new requests come in.
EXPECT_EQ(usage.num_requests_allowed(), 0);
EXPECT_EQ(usage.num_requests_denied(), 0);
// time_elapsed equals to periodical reporting interval.
EXPECT_EQ(Protobuf::util::TimeUtil::DurationToSeconds(usage.time_elapsed()),
report_interval_sec);
// Only perform rlqs server check and response before stream is remotely
// closed. Checks that the rate limit server has received the periodical
// reports.
ASSERT_TRUE(rlqs_stream_->waitForGrpcMessage(*dispatcher_, reports));

// Verify the usage report content.
ASSERT_THAT(reports.bucket_quota_usages_size(), 1);
const auto& usage = reports.bucket_quota_usages(0);
// Report only represents the usage since last report.
// In the periodical report case here, the number of requests allowed and
// denied is 0 since no new requests come in.
EXPECT_EQ(usage.num_requests_allowed(), 0);
EXPECT_EQ(usage.num_requests_denied(), 0);
// time_elapsed equals to periodical reporting interval.
EXPECT_EQ(Protobuf::util::TimeUtil::DurationToSeconds(usage.time_elapsed()),
report_interval_sec);

// Build the rlqs server response.
envoy::service::rate_limit_quota::v3::RateLimitQuotaResponse rlqs_response2;
auto* bucket_action2 = rlqs_response2.add_bucket_action();
// Build the rlqs server response.
envoy::service::rate_limit_quota::v3::RateLimitQuotaResponse rlqs_response2;
auto* bucket_action2 = rlqs_response2.add_bucket_action();

for (const auto& [key, value] : custom_headers_cpy) {
(*bucket_action2->mutable_bucket_id()->mutable_bucket()).insert({key, value});
}
rlqs_stream_->sendGrpcMessage(rlqs_response2);
for (const auto& [key, value] : custom_headers_cpy) {
(*bucket_action2->mutable_bucket_id()->mutable_bucket()).insert({key, value});
}
rlqs_stream_->sendGrpcMessage(rlqs_response2);
}
}

2 changes: 1 addition & 1 deletion test/extensions/filters/http/rate_limit_quota/mocks.h
@@ -28,7 +28,7 @@ class MockRateLimitClient : public RateLimitClient {
MockRateLimitClient() = default;
~MockRateLimitClient() override = default;

MOCK_METHOD(absl::Status, startStream, (const StreamInfo::StreamInfo&));
MOCK_METHOD(absl::Status, startStream, (const StreamInfo::StreamInfo*));
MOCK_METHOD(void, closeStream, ());
MOCK_METHOD(void, sendUsageReport, (absl::optional<size_t>));

