diff --git a/source/extensions/filters/http/rate_limit_quota/client_impl.cc b/source/extensions/filters/http/rate_limit_quota/client_impl.cc index 85fcab8725de..92390ea8a302 100644 --- a/source/extensions/filters/http/rate_limit_quota/client_impl.cc +++ b/source/extensions/filters/http/rate_limit_quota/client_impl.cc @@ -85,7 +85,7 @@ void RateLimitClientImpl::onReceiveMessage(RateLimitQuotaResponsePtr&& response) // Get the hash id value from BucketId in the response. const size_t bucket_id = MessageUtil::hash(action.bucket_id()); - ENVOY_LOG(trace, + ENVOY_LOG(debug, "Received a response for bucket id proto :\n {}, and generated " "the associated hashed bucket id: {}", action.bucket_id().DebugString(), bucket_id); @@ -97,10 +97,11 @@ void RateLimitClientImpl::onReceiveMessage(RateLimitQuotaResponsePtr&& response) switch (action.bucket_action_case()) { case envoy::service::rate_limit_quota::v3::RateLimitQuotaResponse_BucketAction:: kQuotaAssignmentAction: { - quota_buckets_[bucket_id]->bucket_action = action; - if (quota_buckets_[bucket_id]->bucket_action.has_quota_assignment_action()) { + quota_buckets_[bucket_id]->cached_action = action; + quota_buckets_[bucket_id]->current_assignment_time = time_source_.monotonicTime(); + if (quota_buckets_[bucket_id]->cached_action->has_quota_assignment_action()) { auto rate_limit_strategy = quota_buckets_[bucket_id] - ->bucket_action.quota_assignment_action() + ->cached_action->quota_assignment_action() .rate_limit_strategy(); if (rate_limit_strategy.has_token_bucket()) { @@ -127,6 +128,7 @@ void RateLimitClientImpl::onReceiveMessage(RateLimitQuotaResponsePtr&& response) case envoy::service::rate_limit_quota::v3::RateLimitQuotaResponse_BucketAction:: kAbandonAction: { quota_buckets_.erase(bucket_id); + ENVOY_LOG(debug, "Bucket id {} removed from the cache by abandon action.", bucket_id); break; } default: { @@ -136,6 +138,7 @@ void RateLimitClientImpl::onReceiveMessage(RateLimitQuotaResponsePtr&& response) } } } + ENVOY_LOG(debug, "Assignment cached for bucket id {}.", bucket_id); } // `rlqs_callback_` has been reset to nullptr for periodical report case. diff --git a/source/extensions/filters/http/rate_limit_quota/filter.cc b/source/extensions/filters/http/rate_limit_quota/filter.cc index 7cbb29fcdb4f..65535753c3e3 100644 --- a/source/extensions/filters/http/rate_limit_quota/filter.cc +++ b/source/extensions/filters/http/rate_limit_quota/filter.cc @@ -9,6 +9,8 @@ namespace Extensions { namespace HttpFilters { namespace RateLimitQuota { +using envoy::type::v3::RateLimitStrategy; + const char kBucketMetadataNamespace[] = "envoy.extensions.http_filters.rate_limit_quota.bucket"; Http::FilterHeadersStatus RateLimitQuotaFilter::decodeHeaders(Http::RequestHeaderMap& headers, @@ -23,7 +25,7 @@ Http::FilterHeadersStatus RateLimitQuotaFilter::decodeHeaders(Http::RequestHeade // allowed/denied, matching succeed/fail and so on. ENVOY_LOG(debug, "The request is not matched by any matchers: ", match_result.status().message()); - return Envoy::Http::FilterHeadersStatus::Continue; + return sendAllowResponse(); } // Second, generate the bucket id for this request based on match action when the request matching @@ -36,7 +38,7 @@ Http::FilterHeadersStatus RateLimitQuotaFilter::decodeHeaders(Http::RequestHeade // When it failed to generate the bucket id for this specific request, the request is ALLOWED by // default (i.e., fail-open). ENVOY_LOG(debug, "Unable to generate the bucket id: {}", ret.status().message()); - return Envoy::Http::FilterHeadersStatus::Continue; + return sendAllowResponse(); } const BucketId& bucket_id_proto = *ret; @@ -123,34 +125,36 @@ void RateLimitQuotaFilter::createNewBucket(const BucketId& bucket_id, // Create new bucket and store it into quota cache. std::unique_ptr new_bucket = std::make_unique(); - // The first matched request doesn't have quota assignment from the RLQS server yet, so the - // action is performed based on pre-configured strategy from no assignment behavior config. + // The first matched request doesn't have quota assignment from the RLQS + // server yet, so the action is performed based on pre-configured strategy + // from no assignment behavior config. auto mutable_rate_limit_strategy = - new_bucket->bucket_action.mutable_quota_assignment_action()->mutable_rate_limit_strategy(); + new_bucket->default_action.mutable_quota_assignment_action()->mutable_rate_limit_strategy(); if (match_action.bucketSettings().has_no_assignment_behavior()) { *mutable_rate_limit_strategy = match_action.bucketSettings().no_assignment_behavior().fallback_rate_limit(); } else { - // When `no_assignment_behavior` is not configured, default blanket rule is set to ALLOW_ALL. - // (i.e., fail-open). - mutable_rate_limit_strategy->set_blanket_rule(envoy::type::v3::RateLimitStrategy::ALLOW_ALL); + // When `no_assignment_behavior` is not configured, default blanket rule is + // set to ALLOW_ALL. (i.e., fail-open). + mutable_rate_limit_strategy->set_blanket_rule(RateLimitStrategy::ALLOW_ALL); } // Set up the bucket id. new_bucket->bucket_id = bucket_id; - // Set up the first time assignment time. - new_bucket->first_assignment_time = time_source_.monotonicTime(); + // Mark the assignment time. + auto now = time_source_.monotonicTime(); + new_bucket->current_assignment_time = now; // Set up the quota usage. QuotaUsage quota_usage; - quota_usage.last_report = std::chrono::duration_cast( - time_source_.monotonicTime().time_since_epoch()); + quota_usage.last_report = + std::chrono::duration_cast(now.time_since_epoch()); switch (mutable_rate_limit_strategy->blanket_rule()) { PANIC_ON_PROTO_ENUM_SENTINEL_VALUES; - case envoy::type::v3::RateLimitStrategy::ALLOW_ALL: + case RateLimitStrategy::ALLOW_ALL: quota_usage.num_requests_allowed++; break; - case envoy::type::v3::RateLimitStrategy::DENY_ALL: + case RateLimitStrategy::DENY_ALL: quota_usage.num_requests_denied++; break; } @@ -159,6 +163,8 @@ void RateLimitQuotaFilter::createNewBucket(const BucketId& bucket_id, quota_buckets_[id] = std::move(new_bucket); } +// This function should not update QuotaUsage as that will have been handled +// when constructing the Report before this function is called. Http::FilterHeadersStatus RateLimitQuotaFilter::sendImmediateReport(const size_t bucket_id, const RateLimitOnMatchAction& match_action) { @@ -183,10 +189,9 @@ RateLimitQuotaFilter::sendImmediateReport(const size_t bucket_id, if (!status.ok()) { ENVOY_LOG(error, "Failed to start the gRPC stream: ", status.message()); // TODO(tyxia) Check `NoAssignmentBehavior` behavior instead of fail-open here. - return Envoy::Http::FilterHeadersStatus::Continue; - } else { - ENVOY_LOG(debug, "The gRPC stream is established and active"); + return sendAllowResponse(); } + ENVOY_LOG(debug, "The gRPC stream is established and active"); // Send the usage report to RLQS server immediately on the first time when the request is // matched. @@ -202,103 +207,172 @@ RateLimitQuotaFilter::sendImmediateReport(const size_t bucket_id, // bucket_matchers for the first time) should be already set based on no assignment behavior in // `createNewBucket` when the bucket is initially created. ASSERT(quota_buckets_.find(bucket_id) != quota_buckets_.end()); - if (quota_buckets_[bucket_id] - ->bucket_action.quota_assignment_action() - .rate_limit_strategy() - .blanket_rule() == envoy::type::v3::RateLimitStrategy::ALLOW_ALL) { - ENVOY_LOG( - trace, - "For first matched request with hashed bucket_id {}, it is allowed by ALLOW_ALL strategy.", - bucket_id); - return Http::FilterHeadersStatus::Continue; - } else { - // For the request that is rejected due to DENY_ALL no_assignment_behavior, immediate report is - // still sent to RLQS server above, and here the local reply with deny response is sent. - ENVOY_LOG( - trace, - "For first matched request with hashed bucket_id {}, it is throttled by DENY_ALL strategy.", - bucket_id); - sendDenyResponse(); - return Envoy::Http::FilterHeadersStatus::StopIteration; + // If not given a default blanket rule, the first matched request is allowed. + if (!quota_buckets_[bucket_id]->default_action.has_quota_assignment_action() || + !quota_buckets_[bucket_id] + ->default_action.quota_assignment_action() + .has_rate_limit_strategy() || + !quota_buckets_[bucket_id] + ->default_action.quota_assignment_action() + .rate_limit_strategy() + .has_blanket_rule()) { + ENVOY_LOG(trace, "Without a default blanket rule configured, the first matched " + "request with hashed bucket_id {} is allowed through."); + ENVOY_LOG(debug, "Default action for bucket_id {} does not contain a blanket action: {}", + bucket_id, quota_buckets_[bucket_id]->default_action.DebugString()); + return sendAllowResponse(); } + auto blanket_rule = quota_buckets_[bucket_id] + ->default_action.quota_assignment_action() + .rate_limit_strategy() + .blanket_rule(); + if (blanket_rule == RateLimitStrategy::DENY_ALL) { + // For the request that is rejected due to DENY_ALL + // no_assignment_behavior, immediate report is still sent to RLQS server + // above, and here the local reply with deny response is sent. + ENVOY_LOG(trace, + "For first matched request with hashed bucket_id {}, it is " + "throttled by DENY_ALL strategy.", + bucket_id); + ENVOY_LOG(debug, "Hit configured default DENY_ALL for bucket_id {}", bucket_id); + return sendDenyResponse(); + } + + ENVOY_LOG(trace, + "For first matched request with hashed bucket_id {}, it is " + "allowed by the configured default ALLOW_ALL strategy.", + bucket_id); + ENVOY_LOG(debug, "Hit configured default ALLOW_ALL for bucket_id {}", bucket_id); + return sendAllowResponse(); } Http::FilterHeadersStatus -RateLimitQuotaFilter::processCachedBucket(size_t bucket_id, - const RateLimitOnMatchAction& match_action) { +RateLimitQuotaFilter::setUsageAndResponseFromAction(const BucketAction& action, + const size_t bucket_id) { + if (!action.has_quota_assignment_action() || + !action.quota_assignment_action().has_rate_limit_strategy()) { + ENVOY_LOG(debug, + "Selected bucket action defaulting to ALLOW_ALL as it does not " + "have an assignment for bucket_id {}", + bucket_id); + return sendAllowResponse("a_buckets_[bucket_id]->quota_usage); + } + + // TODO(tyxia) Currently, blanket rule and token bucket strategies are + // implemented. Change to switch case when `RequestsPerTimeUnit` strategy is + // implemented. + auto rate_limit_strategy = action.quota_assignment_action().rate_limit_strategy(); + if (rate_limit_strategy.has_blanket_rule()) { + bool allow = (rate_limit_strategy.blanket_rule() != RateLimitStrategy::DENY_ALL); + ENVOY_LOG(trace, "Request with hashed bucket_id {} is {} by the selected blanket rule.", + bucket_id, allow ? "allowed" : "denied"); + if (allow) { + return sendAllowResponse("a_buckets_[bucket_id]->quota_usage); + } + return sendDenyResponse("a_buckets_[bucket_id]->quota_usage); + } + + if (rate_limit_strategy.has_token_bucket()) { + auto token_bucket = quota_buckets_[bucket_id]->token_bucket_limiter.get(); + ASSERT(token_bucket); + + // Try to consume 1 token from the bucket. + if (token_bucket->consume(1, /*allow_partial=*/false)) { + // Request is allowed. + ENVOY_LOG(trace, + "Request with hashed bucket_id {} is allowed by token bucket " + "limiter.", + bucket_id); + ENVOY_LOG(debug, + "Allowing request as token bucket is not empty for bucket_id " + "{}. Initial assignment: {}.", + bucket_id, rate_limit_strategy.token_bucket().ShortDebugString()); + return sendAllowResponse("a_buckets_[bucket_id]->quota_usage); + } + // Request is throttled. + ENVOY_LOG(trace, + "Request with hashed bucket_id {} is throttled by token " + "bucket limiter", + bucket_id); + ENVOY_LOG(debug, + "Denying request as token bucket is exhausted for bucket_id {}. " + "Initial assignment: {}.", + bucket_id, rate_limit_strategy.token_bucket().ShortDebugString()); + return sendDenyResponse("a_buckets_[bucket_id]->quota_usage); + } + + ENVOY_LOG(error, + "Failing open as selected bucket action for bucket_id {} contains " + "an unsupported rate limit strategy: {}", + bucket_id, rate_limit_strategy.DebugString()); + return sendAllowResponse("a_buckets_[bucket_id]->quota_usage); +} + +bool isCachedActionExpired(TimeSource& time_source, const Bucket& bucket) { // First, check if assignment has expired nor not. auto now = std::chrono::duration_cast( - time_source_.monotonicTime().time_since_epoch()); + time_source.monotonicTime().time_since_epoch()); auto assignment_time_elapsed = Protobuf::util::TimeUtil::NanosecondsToDuration( (now - std::chrono::duration_cast( - quota_buckets_[bucket_id]->first_assignment_time.time_since_epoch())) + bucket.current_assignment_time.time_since_epoch())) .count()); - if (assignment_time_elapsed > quota_buckets_[bucket_id] - ->bucket_action.quota_assignment_action() - .assignment_time_to_live()) { - // If expired, remove the cache entry. - quota_buckets_.erase(bucket_id); - - // Default strategy is fail-Open (i.e., allow_all). - auto ret_status = Envoy::Http::FilterHeadersStatus::Continue; - // Check the expired assignment behavior if configured. - // Note, only fail-open and fail-close are supported, more advanced expired assignment can be - // supported as needed. - if (match_action.bucketSettings().has_expired_assignment_behavior()) { - if (match_action.bucketSettings() - .expired_assignment_behavior() - .fallback_rate_limit() - .blanket_rule() == envoy::type::v3::RateLimitStrategy::DENY_ALL) { - sendDenyResponse(); - ret_status = Envoy::Http::FilterHeadersStatus::StopIteration; - } - } + return (assignment_time_elapsed > + bucket.cached_action->quota_assignment_action().assignment_time_to_live()); +} + +Http::FilterHeadersStatus +RateLimitQuotaFilter::processCachedBucket(size_t bucket_id, + const RateLimitOnMatchAction& match_action) { + auto* cached_bucket = quota_buckets_[bucket_id].get(); + + // If no cached action, use the default action. + if (!cached_bucket->cached_action.has_value()) { + return setUsageAndResponseFromAction(cached_bucket->default_action, bucket_id); + } + + // If expired, remove the expired action & fallback. + if (isCachedActionExpired(time_source_, *cached_bucket)) { + Http::FilterHeadersStatus ret_status = processExpiredBucket(bucket_id, match_action); + cached_bucket->cached_action = std::nullopt; return ret_status; } - // Second, get the quota assignment (if exists) from the cached bucket action. - if (quota_buckets_[bucket_id]->bucket_action.has_quota_assignment_action()) { - auto rate_limit_strategy = - quota_buckets_[bucket_id]->bucket_action.quota_assignment_action().rate_limit_strategy(); - - // TODO(tyxia) Currently, blanket rule and token bucket strategies are implemented. - // Change to switch case when `RequestsPerTimeUnit` strategy is implemented. - if (rate_limit_strategy.has_blanket_rule()) { - if (rate_limit_strategy.blanket_rule() == envoy::type::v3::RateLimitStrategy::ALLOW_ALL) { - quota_buckets_[bucket_id]->quota_usage.num_requests_allowed += 1; - ENVOY_LOG(trace, - "Request with hashed bucket_id {} is allowed by cached ALLOW_ALL strategy.", - bucket_id); - } else if (rate_limit_strategy.blanket_rule() == - envoy::type::v3::RateLimitStrategy::DENY_ALL) { - quota_buckets_[bucket_id]->quota_usage.num_requests_denied += 1; - ENVOY_LOG(trace, - "Request with hashed bucket_id {} is throttled by cached DENY_ALL strategy.", - bucket_id); - sendDenyResponse(); - return Envoy::Http::FilterHeadersStatus::StopIteration; - } - } else if (rate_limit_strategy.has_token_bucket()) { - ASSERT(quota_buckets_[bucket_id]->token_bucket_limiter != nullptr); - TokenBucket* limiter = quota_buckets_[bucket_id]->token_bucket_limiter.get(); - // Try to consume 1 token from the bucket. - if (limiter->consume(1, /*allow_partial=*/false)) { - // Request is allowed. - quota_buckets_[bucket_id]->quota_usage.num_requests_allowed += 1; - ENVOY_LOG(trace, "Request with hashed bucket_id {} is allowed by token bucket limiter.", - bucket_id); - } else { - // Request is throttled. - quota_buckets_[bucket_id]->quota_usage.num_requests_denied += 1; - ENVOY_LOG(trace, "Request with hashed bucket_id {} is throttled by token bucket limiter", - bucket_id); - sendDenyResponse(); - return Envoy::Http::FilterHeadersStatus::StopIteration; - } - } + // If not expired, use the cached action. + return setUsageAndResponseFromAction(*cached_bucket->cached_action, bucket_id); +} + +// Note: does not remove the expired entity from the cache. +Http::FilterHeadersStatus +RateLimitQuotaFilter::processExpiredBucket(size_t bucket_id, + const RateLimitOnMatchAction& match_action) { + auto* cached_bucket = quota_buckets_[bucket_id].get(); + + if (!match_action.bucketSettings().has_expired_assignment_behavior() || + !match_action.bucketSettings().expired_assignment_behavior().has_fallback_rate_limit()) { + ENVOY_LOG(debug, + "Selecting default action for bucket_id as expiration " + "fallback assignment doesn't have a configured override {}", + match_action.bucketSettings().expired_assignment_behavior().DebugString()); + return setUsageAndResponseFromAction(cached_bucket->default_action, bucket_id); } - return Envoy::Http::FilterHeadersStatus::Continue; + + const RateLimitStrategy& fallback_rate_limit = + match_action.bucketSettings().expired_assignment_behavior().fallback_rate_limit(); + if (fallback_rate_limit.has_blanket_rule() && + fallback_rate_limit.blanket_rule() == RateLimitStrategy::DENY_ALL) { + ENVOY_LOG(debug, + "Exipred action falling back to configured DENY_ALL for " + "bucket_id {}", + bucket_id); + return sendDenyResponse(&cached_bucket->quota_usage); + } + + ENVOY_LOG(debug, + "Exipred action falling back to ALLOW_ALL for bucket_id {} with " + "fallback action {}", + bucket_id, fallback_rate_limit.DebugString()); + return sendAllowResponse(&cached_bucket->quota_usage); } } // namespace RateLimitQuota diff --git a/source/extensions/filters/http/rate_limit_quota/filter.h b/source/extensions/filters/http/rate_limit_quota/filter.h index e1302214affb..d6259999bf5b 100644 --- a/source/extensions/filters/http/rate_limit_quota/filter.h +++ b/source/extensions/filters/http/rate_limit_quota/filter.h @@ -93,14 +93,34 @@ class RateLimitQuotaFilter : public Http::PassThroughFilter, Http::FilterHeadersStatus sendImmediateReport(const size_t bucket_id, const RateLimitOnMatchAction& match_action); + Http::FilterHeadersStatus setUsageAndResponseFromAction(const BucketAction& action, + size_t bucket_id); + Http::FilterHeadersStatus processCachedBucket(size_t bucket_id, const RateLimitOnMatchAction& match_action); // TODO(tyxia) Build the customized response based on `DenyResponseSettings`. - void sendDenyResponse() { + // Send a deny response and update quota usage if provided. + Http::FilterHeadersStatus sendDenyResponse(QuotaUsage* quota_usage = nullptr) { callbacks_->sendLocalReply(Envoy::Http::Code::TooManyRequests, "", nullptr, absl::nullopt, ""); callbacks_->streamInfo().setResponseFlag(StreamInfo::CoreResponseFlag::RateLimited); + if (quota_usage) + quota_usage->num_requests_denied++; + return Http::FilterHeadersStatus::StopIteration; + } + + // Send an allow response and update quota usage if provided. + Http::FilterHeadersStatus sendAllowResponse(QuotaUsage* quota_usage = nullptr) { + if (quota_usage) + quota_usage->num_requests_allowed++; + return Http::FilterHeadersStatus::Continue; } + // Get the FilterHeadersStatus to return when a selected bucket has an expired + // assignment. Note: this does not actually remove the expired entity from the + // cache. + Http::FilterHeadersStatus processExpiredBucket(size_t bucket_id, + const RateLimitOnMatchAction& match_action); + FilterConfigConstSharedPtr config_; Grpc::GrpcServiceConfigWithHashKey config_with_hash_key_; Server::Configuration::FactoryContext& factory_context_; diff --git a/source/extensions/filters/http/rate_limit_quota/quota_bucket_cache.h b/source/extensions/filters/http/rate_limit_quota/quota_bucket_cache.h index e2923b9156d2..1896a1e24983 100644 --- a/source/extensions/filters/http/rate_limit_quota/quota_bucket_cache.h +++ b/source/extensions/filters/http/rate_limit_quota/quota_bucket_cache.h @@ -41,13 +41,17 @@ struct Bucket { // RLQS server. BucketId bucket_id; // Cached action from the response that was received from the RLQS server. - BucketAction bucket_action; + absl::optional cached_action = absl::nullopt; + // Default action defined by the bucket's no_assignment_behavior setting. Used + // when the bucket is waiting for an assigned action from the RLQS server + // (e.g. during initial bucket hits & after stale assignments expire). + BucketAction default_action; // Cache quota usage. QuotaUsage quota_usage; // Rate limiter based on token bucket algorithm. TokenBucketPtr token_bucket_limiter; - // First assignment time. - Envoy::MonotonicTime first_assignment_time = {}; + // Most recent assignment time. + Envoy::MonotonicTime current_assignment_time; }; using BucketsCache = absl::flat_hash_map>; diff --git a/test/extensions/filters/http/rate_limit_quota/integration_test.cc b/test/extensions/filters/http/rate_limit_quota/integration_test.cc index ec4e45a89113..dc37a9720bc4 100644 --- a/test/extensions/filters/http/rate_limit_quota/integration_test.cc +++ b/test/extensions/filters/http/rate_limit_quota/integration_test.cc @@ -28,6 +28,7 @@ enum class BlanketRule { struct ConfigOption { bool valid_rlqs_server = true; BlanketRule no_assignment_blanket_rule = BlanketRule::NOT_SPECIFIED; + bool unsupported_no_assignment_strategy = false; BlanketRule expired_assignment_blanket_rule = BlanketRule::NOT_SPECIFIED; }; @@ -110,6 +111,12 @@ class RateLimitQuotaIntegrationTest ->mutable_fallback_rate_limit() ->set_blanket_rule(envoy::type::v3::RateLimitStrategy::DENY_ALL); } + } else if (config_option.unsupported_no_assignment_strategy) { + auto* requests_per_time_unit = mutable_bucket_settings.mutable_no_assignment_behavior() + ->mutable_fallback_rate_limit() + ->mutable_requests_per_time_unit(); + requests_per_time_unit->set_requests_per_time_unit(100); + requests_per_time_unit->set_time_unit(envoy::type::v3::RateLimitUnit::SECOND); } if (config_option.expired_assignment_blanket_rule != BlanketRule::NOT_SPECIFIED) { @@ -122,6 +129,9 @@ class RateLimitQuotaIntegrationTest ->mutable_fallback_rate_limit() ->set_blanket_rule(envoy::type::v3::RateLimitStrategy::DENY_ALL); } + mutable_bucket_settings.mutable_expired_assignment_behavior() + ->mutable_expired_assignment_behavior_timeout() + ->set_seconds(15); } mutable_config->PackFrom(mutable_bucket_settings); @@ -169,12 +179,35 @@ class RateLimitQuotaIntegrationTest void TearDown() override { cleanUp(); } + bool expectAllowedRequest() { + if (!fake_upstreams_[0]->waitForHttpConnection(*dispatcher_, fake_upstream_connection_)) + return false; + if (!fake_upstream_connection_->waitForNewStream(*dispatcher_, upstream_request_)) + return false; + if (!upstream_request_->waitForEndStream(*dispatcher_)) + return false; + upstream_request_->encodeHeaders(Http::TestResponseHeaderMapImpl{{":status", "200"}}, false); + upstream_request_->encodeData(100, true); + + // Verify the response to downstream. + if (!response_->waitForEndStream()) + return false; + EXPECT_TRUE(response_->complete()); + EXPECT_EQ(response_->headers().getStatusValue(), "200"); + + // Clean up the upstream and downstream resource but keep the gRPC + // connection to RLQS server open. + cleanupUpstreamAndDownstream(); + return true; + } + envoy::extensions::filters::http::rate_limit_quota::v3::RateLimitQuotaFilterConfig proto_config_{}; std::vector grpc_upstreams_; FakeHttpConnectionPtr rlqs_connection_; FakeStreamPtr rlqs_stream_; IntegrationStreamDecoderPtr response_; + int report_interval_sec = 60; }; INSTANTIATE_TEST_SUITE_P( @@ -676,9 +709,6 @@ TEST_P(RateLimitQuotaIntegrationTest, BasicFlowPeriodicalReport) { EXPECT_TRUE(response_->complete()); EXPECT_EQ(response_->headers().getStatusValue(), "200"); - // TODO(tyxia) Make interval configurable in the test. It is currently 60s in - // ValidMatcherConfig. - int report_interval_sec = 60; // Trigger the report periodically, 10 times. for (int i = 0; i < 10; ++i) { // Advance the time by report_interval. @@ -757,8 +787,6 @@ TEST_P(RateLimitQuotaIntegrationTest, BasicFlowPeriodicalReportWithStreamClosed) EXPECT_TRUE(response_->complete()); EXPECT_EQ(response_->headers().getStatusValue(), "200"); - // ValidMatcherConfig. - int report_interval_sec = 60; // Trigger the report periodically. for (int i = 0; i < 6; ++i) { if (i == 2) { @@ -934,6 +962,153 @@ TEST_P(RateLimitQuotaIntegrationTest, MultiRequestWithTokenBucket) { } } +TEST_P(RateLimitQuotaIntegrationTest, MultiRequestWithUnsupportedStrategy) { + initializeConfig(); + HttpIntegrationTest::initialize(); + absl::flat_hash_map custom_headers = {{"environment", "staging"}, + {"group", "envoy"}}; + + for (int i = 0; i < 2; ++i) { + // Send downstream client request to upstream. + sendClientRequest(&custom_headers); + + // Handle the request received by upstream. All requests will be allowed + // since the strategy is not supported. + ASSERT_TRUE(fake_upstreams_[0]->waitForHttpConnection(*dispatcher_, fake_upstream_connection_)); + ASSERT_TRUE(fake_upstream_connection_->waitForNewStream(*dispatcher_, upstream_request_)); + ASSERT_TRUE(upstream_request_->waitForEndStream(*dispatcher_)); + upstream_request_->encodeHeaders(Http::TestResponseHeaderMapImpl{{":status", "200"}}, false); + upstream_request_->encodeData(100, true); + + // Verify the response to downstream. + ASSERT_TRUE(response_->waitForEndStream()); + EXPECT_TRUE(response_->complete()); + EXPECT_EQ(response_->headers().getStatusValue(), "200"); + + // Only first downstream client request will trigger the reports to RLQS + // server as the subsequent requests will find the entry in the cache. + if (i == 0) { + // Start the gRPC stream to RLQS server. + ASSERT_TRUE(grpc_upstreams_[0]->waitForHttpConnection(*dispatcher_, rlqs_connection_)); + ASSERT_TRUE(rlqs_connection_->waitForNewStream(*dispatcher_, rlqs_stream_)); + + envoy::service::rate_limit_quota::v3::RateLimitQuotaUsageReports reports; + ASSERT_TRUE(rlqs_stream_->waitForGrpcMessage(*dispatcher_, reports)); + rlqs_stream_->startGrpcStream(); + + // Build the response. + envoy::service::rate_limit_quota::v3::RateLimitQuotaResponse rlqs_response; + absl::flat_hash_map custom_headers_cpy = custom_headers; + custom_headers_cpy.insert({"name", "prod"}); + auto* bucket_action = rlqs_response.add_bucket_action(); + for (const auto& [key, value] : custom_headers_cpy) { + (*bucket_action->mutable_bucket_id()->mutable_bucket()).insert({key, value}); + auto* quota_assignment = bucket_action->mutable_quota_assignment_action(); + quota_assignment->mutable_assignment_time_to_live()->set_seconds(120); + auto* strategy = quota_assignment->mutable_rate_limit_strategy(); + auto* unsupported_strategy = strategy->mutable_requests_per_time_unit(); + unsupported_strategy->set_requests_per_time_unit(10); + unsupported_strategy->set_time_unit(envoy::type::v3::RateLimitUnit::SECOND); + } + + // Send the response from RLQS server. + rlqs_stream_->sendGrpcMessage(rlqs_response); + absl::SleepFor(absl::Seconds(1)); + } + + cleanUp(); + } +} + +TEST_P(RateLimitQuotaIntegrationTest, MultiRequestWithUnsetStrategy) { + initializeConfig(); + HttpIntegrationTest::initialize(); + absl::flat_hash_map custom_headers = {{"environment", "staging"}, + {"group", "envoy"}}; + + for (int i = 0; i < 2; ++i) { + // Send downstream client request to upstream. + sendClientRequest(&custom_headers); + + // Handle the request received by upstream. All requests will be allowed + // since the strategy is not set. + ASSERT_TRUE(fake_upstreams_[0]->waitForHttpConnection(*dispatcher_, fake_upstream_connection_)); + ASSERT_TRUE(fake_upstream_connection_->waitForNewStream(*dispatcher_, upstream_request_)); + ASSERT_TRUE(upstream_request_->waitForEndStream(*dispatcher_)); + upstream_request_->encodeHeaders(Http::TestResponseHeaderMapImpl{{":status", "200"}}, false); + upstream_request_->encodeData(100, true); + + // Verify the response to downstream. + ASSERT_TRUE(response_->waitForEndStream()); + EXPECT_TRUE(response_->complete()); + EXPECT_EQ(response_->headers().getStatusValue(), "200"); + + // Only first downstream client request will trigger the reports to RLQS + // server as the subsequent requests will find the entry in the cache. + if (i == 0) { + // Start the gRPC stream to RLQS server. + ASSERT_TRUE(grpc_upstreams_[0]->waitForHttpConnection(*dispatcher_, rlqs_connection_)); + ASSERT_TRUE(rlqs_connection_->waitForNewStream(*dispatcher_, rlqs_stream_)); + + envoy::service::rate_limit_quota::v3::RateLimitQuotaUsageReports reports; + ASSERT_TRUE(rlqs_stream_->waitForGrpcMessage(*dispatcher_, reports)); + rlqs_stream_->startGrpcStream(); + + // Build the response. + envoy::service::rate_limit_quota::v3::RateLimitQuotaResponse rlqs_response; + absl::flat_hash_map custom_headers_cpy = custom_headers; + custom_headers_cpy.insert({"name", "prod"}); + auto* bucket_action = rlqs_response.add_bucket_action(); + for (const auto& [key, value] : custom_headers_cpy) { + (*bucket_action->mutable_bucket_id()->mutable_bucket()).insert({key, value}); + auto* quota_assignment = bucket_action->mutable_quota_assignment_action(); + quota_assignment->mutable_assignment_time_to_live()->set_seconds(120); + } + + // Send the response from RLQS server. + rlqs_stream_->sendGrpcMessage(rlqs_response); + absl::SleepFor(absl::Seconds(1)); + } + + cleanUp(); + } +} + +TEST_P(RateLimitQuotaIntegrationTest, MultiRequestWithUnsupportedDefaultAction) { + ConfigOption option; + option.unsupported_no_assignment_strategy = true; + initializeConfig(option); + HttpIntegrationTest::initialize(); + absl::flat_hash_map custom_headers = {{"environment", "staging"}, + {"group", "envoy"}}; + + // Send downstream client request to upstream. + sendClientRequest(&custom_headers); + + // Handle the request received by upstream. All requests will be allowed + // since the strategy is not set. + ASSERT_TRUE(fake_upstreams_[0]->waitForHttpConnection(*dispatcher_, fake_upstream_connection_)); + ASSERT_TRUE(fake_upstream_connection_->waitForNewStream(*dispatcher_, upstream_request_)); + ASSERT_TRUE(upstream_request_->waitForEndStream(*dispatcher_)); + upstream_request_->encodeHeaders(Http::TestResponseHeaderMapImpl{{":status", "200"}}, false); + upstream_request_->encodeData(100, true); + + // Verify the response to downstream. + ASSERT_TRUE(response_->waitForEndStream()); + EXPECT_TRUE(response_->complete()); + EXPECT_EQ(response_->headers().getStatusValue(), "200"); + + // Start the gRPC stream to RLQS server. + ASSERT_TRUE(grpc_upstreams_[0]->waitForHttpConnection(*dispatcher_, rlqs_connection_)); + ASSERT_TRUE(rlqs_connection_->waitForNewStream(*dispatcher_, rlqs_stream_)); + + envoy::service::rate_limit_quota::v3::RateLimitQuotaUsageReports reports; + ASSERT_TRUE(rlqs_stream_->waitForGrpcMessage(*dispatcher_, reports)); + rlqs_stream_->startGrpcStream(); + + cleanUp(); +} + TEST_P(RateLimitQuotaIntegrationTest, MultiSameRequestWithExpiredAssignmentDeny) { ConfigOption option; option.expired_assignment_blanket_rule = BlanketRule::DENY_ALL; @@ -974,8 +1149,10 @@ TEST_P(RateLimitQuotaIntegrationTest, MultiSameRequestWithExpiredAssignmentDeny) strategy->set_blanket_rule(envoy::type::v3::RateLimitStrategy::ALLOW_ALL); } - // Send the response from RLQS server. - rlqs_stream_->sendGrpcMessage(rlqs_response); + // Send the response from RLQS server and wait for response processing to + // finish for test consistency. + WAIT_FOR_LOG_CONTAINS("debug", "Assignment cached for bucket id", + { rlqs_stream_->sendGrpcMessage(rlqs_response); }); } // 2nd request is throttled because the assignment has expired and @@ -1018,8 +1195,9 @@ TEST_P(RateLimitQuotaIntegrationTest, MultiSameRequestWithExpiredAssignmentAllow // Send downstream client request to upstream. sendClientRequest(&custom_headers); - // 2nd downstream client request will not trigger the reports to RLQS server since it is - // same as first request, which will find the entry in the cache. + // 2nd downstream client request will not trigger the reports to RLQS server + // since it is same as first request, which will find the entry in the + // cache. if (i != 1) { // 1st request will start the gRPC stream. if (i == 0) { @@ -1031,9 +1209,10 @@ TEST_P(RateLimitQuotaIntegrationTest, MultiSameRequestWithExpiredAssignmentAllow ASSERT_TRUE(rlqs_stream_->waitForGrpcMessage(*dispatcher_, reports)); rlqs_stream_->startGrpcStream(); } else { - // 3rd request won't start gRPC stream again since it is kept open. + // 3rd request won't start gRPC stream again since it is kept open and + // the usage will be aggregated instead of spawning an immediate report. envoy::service::rate_limit_quota::v3::RateLimitQuotaUsageReports reports; - ASSERT_TRUE(rlqs_stream_->waitForGrpcMessage(*dispatcher_, reports)); + ASSERT_FALSE(rlqs_stream_->waitForGrpcMessage(*dispatcher_, reports)); } // Build the response. @@ -1050,12 +1229,14 @@ TEST_P(RateLimitQuotaIntegrationTest, MultiSameRequestWithExpiredAssignmentAllow strategy->set_blanket_rule(envoy::type::v3::RateLimitStrategy::ALLOW_ALL); } - // Send the response from RLQS server. - rlqs_stream_->sendGrpcMessage(rlqs_response); + // Send the response from RLQS server and wait for response processing to + // finish for test consistency. + WAIT_FOR_LOG_CONTAINS("debug", "Assignment cached for bucket id", + { rlqs_stream_->sendGrpcMessage(rlqs_response); }); } - // Even though assignment was expired on 2nd request, the request is still allowed because the - // expired assignment behavior is ALLOW_ALL. + // Even though assignment was expired on 2nd request, the request is still + // allowed because the expired assignment behavior is ALLOW_ALL. ASSERT_TRUE(fake_upstreams_[0]->waitForHttpConnection(*dispatcher_, fake_upstream_connection_)); ASSERT_TRUE(fake_upstream_connection_->waitForNewStream(*dispatcher_, upstream_request_)); ASSERT_TRUE(upstream_request_->waitForEndStream(*dispatcher_)); @@ -1067,32 +1248,47 @@ TEST_P(RateLimitQuotaIntegrationTest, MultiSameRequestWithExpiredAssignmentAllow EXPECT_TRUE(response_->complete()); EXPECT_EQ(response_->headers().getStatusValue(), "200"); - // Clean up the upstream and downstream resource but keep the gRPC connection to RLQS server - // open. + // Clean up the upstream and downstream resource but keep the gRPC + // connection to RLQS server open. cleanupUpstreamAndDownstream(); } } -TEST_P(RateLimitQuotaIntegrationTest, MultiSameRequestWithAbandonAction) { - initializeConfig(); +TEST_P(RateLimitQuotaIntegrationTest, MultiSameRequestWithExpirationToDefaultDeny) { + ConfigOption option; + option.expired_assignment_blanket_rule = BlanketRule::DENY_ALL; + option.no_assignment_blanket_rule = BlanketRule::ALLOW_ALL; + initializeConfig(option); HttpIntegrationTest::initialize(); absl::flat_hash_map custom_headers = {{"environment", "staging"}, {"group", "envoy"}}; - absl::flat_hash_map custom_headers_cpy = custom_headers; - custom_headers_cpy.insert({"name", "prod"}); - for (int i = 0; i < 3; ++i) { + for (int i = 0; i < 4; ++i) { + // Advance the time to make cached assignment expired. + if (i > 0) { + simTime().advanceTimeWait(std::chrono::seconds(15)); + } // Send downstream client request to upstream. sendClientRequest(&custom_headers); - // 3rd downstream request will not trigger the reports to RLQS server since it is - // same as 2nd request, which will find the entry in the cache. - if (i != 2) { - envoy::service::rate_limit_quota::v3::RateLimitQuotaResponse rlqs_response; + // Query 1: ALLOW_ALL by default. Query 2: DENY_ALL by assignment. + // Query 3: DENY_ALL by assignment expiration. Query 4: ALLOW_ALL by default. + if (i == 0 || i == 3) { + // Handle the request received by upstream. + ASSERT_TRUE( + fake_upstreams_[0]->waitForHttpConnection(*dispatcher_, fake_upstream_connection_)); + ASSERT_TRUE(fake_upstream_connection_->waitForNewStream(*dispatcher_, upstream_request_)); + ASSERT_TRUE(upstream_request_->waitForEndStream(*dispatcher_)); + upstream_request_->encodeHeaders(Http::TestResponseHeaderMapImpl{{":status", "200"}}, false); + upstream_request_->encodeData(100, true); + + // Verify the response to downstream. + ASSERT_TRUE(response_->waitForEndStream()); + EXPECT_TRUE(response_->complete()); + EXPECT_EQ(response_->headers().getStatusValue(), "200"); - // 1st request will start the gRPC stream. if (i == 0) { - // Start the gRPC stream to RLQS server on the first request. + // Start the gRPC stream to RLQS server & send the initial report. ASSERT_TRUE(grpc_upstreams_[0]->waitForHttpConnection(*dispatcher_, rlqs_connection_)); ASSERT_TRUE(rlqs_connection_->waitForNewStream(*dispatcher_, rlqs_stream_)); @@ -1101,48 +1297,157 @@ TEST_P(RateLimitQuotaIntegrationTest, MultiSameRequestWithAbandonAction) { rlqs_stream_->startGrpcStream(); // Build the response. + envoy::service::rate_limit_quota::v3::RateLimitQuotaResponse rlqs_response; + absl::flat_hash_map custom_headers_cpy = custom_headers; + custom_headers_cpy.insert({"name", "prod"}); auto* bucket_action = rlqs_response.add_bucket_action(); for (const auto& [key, value] : custom_headers_cpy) { (*bucket_action->mutable_bucket_id()->mutable_bucket()).insert({key, value}); + auto* quota_assignment = bucket_action->mutable_quota_assignment_action(); + quota_assignment->mutable_assignment_time_to_live()->set_seconds(15); + auto* strategy = quota_assignment->mutable_rate_limit_strategy(); + strategy->set_blanket_rule(envoy::type::v3::RateLimitStrategy::DENY_ALL); } - // Set up the abandon action. - bucket_action->mutable_abandon_action(); - } else { - // 2nd request will still send report to RLQS server as the previous abandon - // action has removed the cache entry. but it won't start gRPC stream - // again since it is kept open. + // Send the response from RLQS server. + rlqs_stream_->sendGrpcMessage(rlqs_response); + absl::SleepFor(absl::Seconds(1)); + } + } else { + ASSERT_TRUE(response_->waitForEndStream(std::chrono::seconds(5))); + EXPECT_TRUE(response_->complete()); + EXPECT_EQ(response_->headers().getStatusValue(), "429"); + } + + cleanUp(); + } +} + +TEST_P(RateLimitQuotaIntegrationTest, MultiSameRequestWithExpirationWithoutFallback) { + ConfigOption option; + option.no_assignment_blanket_rule = BlanketRule::ALLOW_ALL; + initializeConfig(option); + HttpIntegrationTest::initialize(); + absl::flat_hash_map custom_headers = {{"environment", "staging"}, + {"group", "envoy"}}; + + for (int i = 0; i < 3; ++i) { + // Advance the time to make cached assignment expired. + if (i > 0) { + simTime().advanceTimeWait(std::chrono::seconds(15)); + } + // Send downstream client request to upstream. + sendClientRequest(&custom_headers); + + // Query 1: ALLOW_ALL by default. Query 2: DENY_ALL by assignment. + // Query 3: DENY_ALL by assignment expiration. Query 4: ALLOW_ALL by default. + if (i == 0 || i == 2) { + // Handle the request received by upstream. + ASSERT_TRUE( + fake_upstreams_[0]->waitForHttpConnection(*dispatcher_, fake_upstream_connection_)); + ASSERT_TRUE(fake_upstream_connection_->waitForNewStream(*dispatcher_, upstream_request_)); + ASSERT_TRUE(upstream_request_->waitForEndStream(*dispatcher_)); + upstream_request_->encodeHeaders(Http::TestResponseHeaderMapImpl{{":status", "200"}}, false); + upstream_request_->encodeData(100, true); + + // Verify the response to downstream. + ASSERT_TRUE(response_->waitForEndStream()); + EXPECT_TRUE(response_->complete()); + EXPECT_EQ(response_->headers().getStatusValue(), "200"); + + if (i == 0) { + // Start the gRPC stream to RLQS server & send the initial report. + ASSERT_TRUE(grpc_upstreams_[0]->waitForHttpConnection(*dispatcher_, rlqs_connection_)); + ASSERT_TRUE(rlqs_connection_->waitForNewStream(*dispatcher_, rlqs_stream_)); + envoy::service::rate_limit_quota::v3::RateLimitQuotaUsageReports reports; ASSERT_TRUE(rlqs_stream_->waitForGrpcMessage(*dispatcher_, reports)); + rlqs_stream_->startGrpcStream(); - // Build the rlqs server response. + // Build the response. + envoy::service::rate_limit_quota::v3::RateLimitQuotaResponse rlqs_response; + absl::flat_hash_map custom_headers_cpy = custom_headers; + custom_headers_cpy.insert({"name", "prod"}); auto* bucket_action = rlqs_response.add_bucket_action(); for (const auto& [key, value] : custom_headers_cpy) { (*bucket_action->mutable_bucket_id()->mutable_bucket()).insert({key, value}); + auto* quota_assignment = bucket_action->mutable_quota_assignment_action(); + quota_assignment->mutable_assignment_time_to_live()->set_seconds(15); + auto* strategy = quota_assignment->mutable_rate_limit_strategy(); + strategy->set_blanket_rule(envoy::type::v3::RateLimitStrategy::DENY_ALL); } - } - // Send the response from RLQS server. - rlqs_stream_->sendGrpcMessage(rlqs_response); + // Send the response from RLQS server. + rlqs_stream_->sendGrpcMessage(rlqs_response); + absl::SleepFor(absl::Seconds(1)); + } + } else { + ASSERT_TRUE(response_->waitForEndStream(std::chrono::seconds(5))); + EXPECT_TRUE(response_->complete()); + EXPECT_EQ(response_->headers().getStatusValue(), "429"); } - ASSERT_TRUE(fake_upstreams_[0]->waitForHttpConnection(*dispatcher_, fake_upstream_connection_)); - ASSERT_TRUE(fake_upstream_connection_->waitForNewStream(*dispatcher_, upstream_request_)); - ASSERT_TRUE(upstream_request_->waitForEndStream(*dispatcher_)); - upstream_request_->encodeHeaders(Http::TestResponseHeaderMapImpl{{":status", "200"}}, false); - upstream_request_->encodeData(100, true); + cleanUp(); + } +} - // Verify the response to downstream. - ASSERT_TRUE(response_->waitForEndStream()); - EXPECT_TRUE(response_->complete()); - EXPECT_EQ(response_->headers().getStatusValue(), "200"); +TEST_P(RateLimitQuotaIntegrationTest, MultiSameRequestWithAbandonAction) { + initializeConfig(); + HttpIntegrationTest::initialize(); + absl::flat_hash_map custom_headers = {{"environment", "staging"}, + {"group", "envoy"}}; - // Clean up the upstream and downstream resource but keep the gRPC connection to RLQS server - // open. - cleanupUpstreamAndDownstream(); + absl::flat_hash_map custom_headers_cpy = custom_headers; + custom_headers_cpy.insert({"name", "prod"}); + + // Send first request & expect a new RLQS stream. + sendClientRequest(&custom_headers); + ASSERT_TRUE(grpc_upstreams_[0]->waitForHttpConnection(*dispatcher_, rlqs_connection_)); + ASSERT_TRUE(rlqs_connection_->waitForNewStream(*dispatcher_, rlqs_stream_)); + + // Expect an initial report. + envoy::service::rate_limit_quota::v3::RateLimitQuotaUsageReports reports; + ASSERT_TRUE(rlqs_stream_->waitForGrpcMessage(*dispatcher_, reports)); + rlqs_stream_->startGrpcStream(); + EXPECT_EQ(reports.bucket_quota_usages_size(), 1); + EXPECT_EQ(reports.bucket_quota_usages(0).num_requests_allowed(), 1); + EXPECT_EQ(reports.bucket_quota_usages(0).num_requests_denied(), 0); + + // Expect the first request to be allowed. + ASSERT_TRUE(expectAllowedRequest()); + + // Build an abandon-action response. + envoy::service::rate_limit_quota::v3::RateLimitQuotaResponse rlqs_response; + auto* bucket_action = rlqs_response.add_bucket_action(); + for (const auto& [key, value] : custom_headers_cpy) { + (*bucket_action->mutable_bucket_id()->mutable_bucket()).insert({key, value}); } + bucket_action->mutable_abandon_action(); + // Send the response from RLQS server. + rlqs_stream_->sendGrpcMessage(rlqs_response); + + // Expect the next report to be empty, since the cache entry was removed, but + // allow for retries in case the response hasn't processed yet. + bool empty_report = false; + for (int i = 0; i < 5 && !empty_report; ++i) { + simTime().advanceTimeWait(std::chrono::seconds(report_interval_sec)); + ASSERT_TRUE(rlqs_stream_->waitForGrpcMessage(*dispatcher_, reports)); + empty_report = reports.bucket_quota_usages().empty(); + } + ASSERT_TRUE(empty_report); + + // Send a second request & expect an immediate report for the new bucket, but + // no new stream. + sendClientRequest(&custom_headers); + ASSERT_TRUE(rlqs_stream_->waitForGrpcMessage(*dispatcher_, reports)); + EXPECT_EQ(reports.bucket_quota_usages_size(), 1); + EXPECT_EQ(reports.bucket_quota_usages(0).num_requests_allowed(), 1); + EXPECT_EQ(reports.bucket_quota_usages(0).num_requests_denied(), 0); + + // Expect the second request to be allowed. + ASSERT_TRUE(expectAllowedRequest()); } } // namespace