From cea046f058c29d727ada8aea92316089e6451694 Mon Sep 17 00:00:00 2001 From: bsurber <73970703+bsurber@users.noreply.github.com> Date: Tue, 17 Sep 2024 12:54:19 -0700 Subject: [PATCH] rlqs: Implement RLQS stream restarts if the stream goes down mid-use. (#36170) Commit Message: Implement RLQS stream restarts if the stream goes down mid-use. Additional Description: Stream restarts are done during periodic usage reporting, which limits retry spam while backends are offline. Risk Level: Testing: Integration testing updated to exercise the filter before and after stream closure. Docs Changes: Release Notes: Platform Specific Features: --------- Signed-off-by: Brian Surber --- .../filters/http/rate_limit_quota/client.h | 2 +- .../http/rate_limit_quota/client_impl.cc | 36 +++++++----- .../http/rate_limit_quota/client_impl.h | 2 +- .../filters/http/rate_limit_quota/filter.cc | 2 +- .../http/rate_limit_quota/client_test.cc | 33 ++++++++++- .../http/rate_limit_quota/client_test_utils.h | 12 ++-- .../http/rate_limit_quota/integration_test.cc | 55 +++++++++++-------- .../filters/http/rate_limit_quota/mocks.h | 2 +- 8 files changed, 95 insertions(+), 49 deletions(-) diff --git a/source/extensions/filters/http/rate_limit_quota/client.h b/source/extensions/filters/http/rate_limit_quota/client.h index a8d31c39b06e..198b919d5bd2 100644 --- a/source/extensions/filters/http/rate_limit_quota/client.h +++ b/source/extensions/filters/http/rate_limit_quota/client.h @@ -37,7 +37,7 @@ class RateLimitClient { public: virtual ~RateLimitClient() = default; - virtual absl::Status startStream(const StreamInfo::StreamInfo& stream_info) PURE; + virtual absl::Status startStream(const StreamInfo::StreamInfo* stream_info) PURE; virtual void closeStream() PURE; virtual void sendUsageReport(absl::optional bucket_id) PURE; diff --git a/source/extensions/filters/http/rate_limit_quota/client_impl.cc b/source/extensions/filters/http/rate_limit_quota/client_impl.cc index 7d886a49e7b3..876060592eb3 100644 
--- a/source/extensions/filters/http/rate_limit_quota/client_impl.cc +++ b/source/extensions/filters/http/rate_limit_quota/client_impl.cc @@ -64,14 +64,17 @@ RateLimitQuotaUsageReports RateLimitClientImpl::buildReport(absl::optional bucket_id) { - if (stream_ != nullptr) { - // Build the report and then send the report to RLQS server. - // `end_stream` should always be set to false as we don't want to close the stream locally. - stream_->sendMessage(buildReport(bucket_id), /*end_stream=*/false); - } else { - // Don't send any reports if stream has already been closed. - ENVOY_LOG(debug, "The stream has already been closed; no reports will be sent."); + if (stream_ == nullptr) { + ENVOY_LOG(debug, "The RLQS stream has been closed and must be restarted to send reports."); + if (absl::Status err = startStream(nullptr); !err.ok()) { + ENVOY_LOG(error, "Failed to start the stream to send reports."); + return; + } } + + // Build the report and then send the report to RLQS server. + // `end_stream` should always be set to false as we don't want to close the stream locally. + stream_->sendMessage(buildReport(bucket_id), /*end_stream=*/false); } void RateLimitClientImpl::onReceiveMessage(RateLimitQuotaResponsePtr&& response) { @@ -165,20 +168,27 @@ void RateLimitClientImpl::onRemoteClose(Grpc::Status::GrpcStatus status, stream_ = nullptr; } -absl::Status RateLimitClientImpl::startStream(const StreamInfo::StreamInfo& stream_info) { +absl::Status RateLimitClientImpl::startStream(const StreamInfo::StreamInfo* stream_info) { // Starts stream if it has not been opened yet. 
if (stream_ == nullptr) { ENVOY_LOG(debug, "Trying to start the new gRPC stream"); + auto stream_options = Http::AsyncClient::RequestOptions(); + if (stream_info) { + stream_options.setParentContext(Http::AsyncClient::ParentContext{stream_info}); + } stream_ = aync_client_.start( *Protobuf::DescriptorPool::generated_pool()->FindMethodByName( "envoy.service.rate_limit_quota.v3.RateLimitQuotaService.StreamRateLimitQuotas"), - *this, - Http::AsyncClient::RequestOptions().setParentContext( - Http::AsyncClient::ParentContext{&stream_info})); + *this, stream_options); + } + + // If still null after attempting a start. + if (stream_ == nullptr) { + return absl::InternalError("Failed to start the stream"); } - // Returns error status if start failed (i.e., stream_ is nullptr). - return stream_ == nullptr ? absl::InternalError("Failed to start the stream") : absl::OkStatus(); + ENVOY_LOG(debug, "gRPC stream has been started"); + return absl::OkStatus(); } } // namespace RateLimitQuota diff --git a/source/extensions/filters/http/rate_limit_quota/client_impl.h b/source/extensions/filters/http/rate_limit_quota/client_impl.h index b471755d2420..c4584c26b8e2 100644 --- a/source/extensions/filters/http/rate_limit_quota/client_impl.h +++ b/source/extensions/filters/http/rate_limit_quota/client_impl.h @@ -45,7 +45,7 @@ class RateLimitClientImpl : public RateLimitClient, void onRemoteClose(Grpc::Status::GrpcStatus status, const std::string& message) override; // RateLimitClient methods. 
- absl::Status startStream(const StreamInfo::StreamInfo& stream_info) override; + absl::Status startStream(const StreamInfo::StreamInfo* stream_info) override; void closeStream() override; // Send the usage report to RLQS server void sendUsageReport(absl::optional bucket_id) override; diff --git a/source/extensions/filters/http/rate_limit_quota/filter.cc b/source/extensions/filters/http/rate_limit_quota/filter.cc index 65535753c3e3..e5671033ccce 100644 --- a/source/extensions/filters/http/rate_limit_quota/filter.cc +++ b/source/extensions/filters/http/rate_limit_quota/filter.cc @@ -185,7 +185,7 @@ RateLimitQuotaFilter::sendImmediateReport(const size_t bucket_id, // Start the streaming on the first request. // It will be a no-op if the stream is already active. - auto status = client_.rate_limit_client->startStream(callbacks_->streamInfo()); + auto status = client_.rate_limit_client->startStream(&callbacks_->streamInfo()); if (!status.ok()) { ENVOY_LOG(error, "Failed to start the gRPC stream: ", status.message()); // TODO(tyxia) Check `NoAssignmentBehavior` behavior instead of fail-open here. 
diff --git a/test/extensions/filters/http/rate_limit_quota/client_test.cc b/test/extensions/filters/http/rate_limit_quota/client_test.cc index 1fee6b6f4683..336b47b7fff5 100644 --- a/test/extensions/filters/http/rate_limit_quota/client_test.cc +++ b/test/extensions/filters/http/rate_limit_quota/client_test.cc @@ -18,7 +18,7 @@ class RateLimitClientTest : public testing::Test { }; TEST_F(RateLimitClientTest, OpenAndCloseStream) { - EXPECT_OK(test_client.client_->startStream(test_client.stream_info_)); + EXPECT_OK(test_client.client_->startStream(&test_client.stream_info_)); EXPECT_CALL(test_client.stream_, closeStream()); EXPECT_CALL(test_client.stream_, resetStream()); test_client.client_->closeStream(); @@ -27,7 +27,7 @@ TEST_F(RateLimitClientTest, OpenAndCloseStream) { TEST_F(RateLimitClientTest, SendUsageReport) { ::envoy::service::rate_limit_quota::v3::BucketId bucket_id; TestUtility::loadFromYaml(SingleBukcetId, bucket_id); - EXPECT_OK(test_client.client_->startStream(test_client.stream_info_)); + EXPECT_OK(test_client.client_->startStream(&test_client.stream_info_)); bool end_stream = false; // Send quota usage report and ensure that we get it. 
EXPECT_CALL(test_client.stream_, sendMessageRaw_(_, end_stream)); @@ -39,7 +39,7 @@ TEST_F(RateLimitClientTest, SendUsageReport) { } TEST_F(RateLimitClientTest, SendRequestAndReceiveResponse) { - EXPECT_OK(test_client.client_->startStream(test_client.stream_info_)); + EXPECT_OK(test_client.client_->startStream(&test_client.stream_info_)); ASSERT_NE(test_client.stream_callbacks_, nullptr); auto empty_request_headers = Http::RequestHeaderMapImpl::create(); @@ -66,6 +66,33 @@ TEST_F(RateLimitClientTest, SendRequestAndReceiveResponse) { test_client.client_->onRemoteClose(0, ""); } +TEST_F(RateLimitClientTest, RestartStreamWhileInUse) { + ::envoy::service::rate_limit_quota::v3::BucketId bucket_id; + TestUtility::loadFromYaml(SingleBukcetId, bucket_id); + EXPECT_OK(test_client.client_->startStream(&test_client.stream_info_)); + + bool end_stream = false; + // Send quota usage report and ensure that we get it. + EXPECT_CALL(test_client.stream_, sendMessageRaw_(_, end_stream)); + const size_t bucket_id_hash = MessageUtil::hash(bucket_id); + test_client.client_->sendUsageReport(bucket_id_hash); + EXPECT_CALL(test_client.stream_, closeStream()); + EXPECT_CALL(test_client.stream_, resetStream()); + test_client.client_->closeStream(); + + // Expect the stream to reopen while trying to send the next usage report. + EXPECT_CALL(test_client.stream_, sendMessageRaw_(_, end_stream)); + test_client.client_->sendUsageReport(bucket_id_hash); + EXPECT_CALL(test_client.stream_, closeStream()); + EXPECT_CALL(test_client.stream_, resetStream()); + test_client.client_->closeStream(); + + // Expect the client to handle a restart failure. 
+ EXPECT_CALL(*test_client.async_client_, startRaw(_, _, _, _)).WillOnce(testing::Return(nullptr)); + WAIT_FOR_LOG_CONTAINS("error", "Failed to start the stream to send reports.", + { test_client.client_->sendUsageReport(bucket_id_hash); }); +} + } // namespace } // namespace RateLimitQuota } // namespace HttpFilters diff --git a/test/extensions/filters/http/rate_limit_quota/client_test_utils.h b/test/extensions/filters/http/rate_limit_quota/client_test_utils.h index 2f19f9b48b22..e6624af9b406 100644 --- a/test/extensions/filters/http/rate_limit_quota/client_test_utils.h +++ b/test/extensions/filters/http/rate_limit_quota/client_test_utils.h @@ -70,12 +70,12 @@ class RateLimitTestClient { } Grpc::RawAsyncClientSharedPtr mockCreateAsyncClient(Unused, Unused, Unused) { - auto async_client = std::make_shared(); - EXPECT_CALL(*async_client, startRaw("envoy.service.rate_limit_quota.v3.RateLimitQuotaService", - "StreamRateLimitQuotas", _, _)) - .WillOnce(Invoke(this, &RateLimitTestClient::mockStartRaw)); + async_client_ = std::make_shared(); + EXPECT_CALL(*async_client_, startRaw("envoy.service.rate_limit_quota.v3.RateLimitQuotaService", + "StreamRateLimitQuotas", _, _)) + .WillRepeatedly(Invoke(this, &RateLimitTestClient::mockStartRaw)); - return async_client; + return async_client_; } Grpc::RawAsyncStream* mockStartRaw(Unused, Unused, Grpc::RawAsyncStreamCallbacks& callbacks, @@ -97,7 +97,7 @@ class RateLimitTestClient { Grpc::RawAsyncStreamCallbacks* stream_callbacks_; Grpc::Status::GrpcStatus grpc_status_ = Grpc::Status::WellKnownGrpcStatus::Ok; RateLimitClientPtr client_; - // std::unique_ptr client_; + std::shared_ptr async_client_ = nullptr; MockRateLimitQuotaCallbacks callbacks_; bool external_inited_ = false; bool start_failed_ = false; diff --git a/test/extensions/filters/http/rate_limit_quota/integration_test.cc b/test/extensions/filters/http/rate_limit_quota/integration_test.cc index dc37a9720bc4..37d4a97df6a2 100644 --- 
a/test/extensions/filters/http/rate_limit_quota/integration_test.cc +++ b/test/extensions/filters/http/rate_limit_quota/integration_test.cc @@ -787,42 +787,51 @@ TEST_P(RateLimitQuotaIntegrationTest, BasicFlowPeriodicalReportWithStreamClosed) EXPECT_TRUE(response_->complete()); EXPECT_EQ(response_->headers().getStatusValue(), "200"); + // ValidMatcherConfig. + int report_interval_sec = 60; // Trigger the report periodically. for (int i = 0; i < 6; ++i) { if (i == 2) { // Close the stream. - rlqs_stream_->finishGrpcStream(Grpc::Status::Ok); + WAIT_FOR_LOG_CONTAINS("debug", "gRPC stream closed remotely with status", + { rlqs_stream_->finishGrpcStream(Grpc::Status::Canceled); }); + ASSERT_TRUE(rlqs_stream_->waitForReset()); } // Advance the time by report_interval. simTime().advanceTimeWait(std::chrono::milliseconds(report_interval_sec * 1000)); - // Only perform rlqs server check and response before stream is remotely closed. - if (i < 2) { - // Checks that the rate limit server has received the periodical reports. - ASSERT_TRUE(rlqs_stream_->waitForGrpcMessage(*dispatcher_, reports)); + if (i == 2) { + // Stream should be restarted when next required for usage reporting. + ASSERT_TRUE(rlqs_connection_->waitForNewStream(*dispatcher_, rlqs_stream_)); + rlqs_stream_->startGrpcStream(); + } - // Verify the usage report content. - ASSERT_THAT(reports.bucket_quota_usages_size(), 1); - const auto& usage = reports.bucket_quota_usages(0); - // Report only represents the usage since last report. - // In the periodical report case here, the number of request allowed and denied is 0 since no - // new requests comes in. - EXPECT_EQ(usage.num_requests_allowed(), 0); - EXPECT_EQ(usage.num_requests_denied(), 0); - // time_elapsed equals to periodical reporting interval. - EXPECT_EQ(Protobuf::util::TimeUtil::DurationToSeconds(usage.time_elapsed()), - report_interval_sec); + // Perform the rlqs server check and response on every iteration, + // including after the stream restart. 
Checks that the rate limit server + // has received the periodical reports. + ASSERT_TRUE(rlqs_stream_->waitForGrpcMessage(*dispatcher_, reports)); + + // Verify the usage report content. + ASSERT_THAT(reports.bucket_quota_usages_size(), 1); + const auto& usage = reports.bucket_quota_usages(0); + // Report only represents the usage since last report. + // In the periodical report case here, the number of request allowed and + // denied is 0 since no new requests comes in. + EXPECT_EQ(usage.num_requests_allowed(), 0); + EXPECT_EQ(usage.num_requests_denied(), 0); + // time_elapsed equals to periodical reporting interval. + EXPECT_EQ(Protobuf::util::TimeUtil::DurationToSeconds(usage.time_elapsed()), + report_interval_sec); - // Build the rlqs server response. - envoy::service::rate_limit_quota::v3::RateLimitQuotaResponse rlqs_response2; - auto* bucket_action2 = rlqs_response2.add_bucket_action(); + // Build the rlqs server response. + envoy::service::rate_limit_quota::v3::RateLimitQuotaResponse rlqs_response2; + auto* bucket_action2 = rlqs_response2.add_bucket_action(); - for (const auto& [key, value] : custom_headers_cpy) { - (*bucket_action2->mutable_bucket_id()->mutable_bucket()).insert({key, value}); - } - rlqs_stream_->sendGrpcMessage(rlqs_response2); + for (const auto& [key, value] : custom_headers_cpy) { + (*bucket_action2->mutable_bucket_id()->mutable_bucket()).insert({key, value}); } + rlqs_stream_->sendGrpcMessage(rlqs_response2); } } diff --git a/test/extensions/filters/http/rate_limit_quota/mocks.h b/test/extensions/filters/http/rate_limit_quota/mocks.h index 01d374ec6ceb..aedd75e3e971 100644 --- a/test/extensions/filters/http/rate_limit_quota/mocks.h +++ b/test/extensions/filters/http/rate_limit_quota/mocks.h @@ -28,7 +28,7 @@ class MockRateLimitClient : public RateLimitClient { MockRateLimitClient() = default; ~MockRateLimitClient() override = default; - MOCK_METHOD(absl::Status, startStream, (const StreamInfo::StreamInfo&)); + MOCK_METHOD(absl::Status, 
startStream, (const StreamInfo::StreamInfo*)); MOCK_METHOD(void, closeStream, ()); MOCK_METHOD(void, sendUsageReport, (absl::optional));