Skip to content

Commit

Permalink
RLQS: Refactor traffic processing in the RLQS filter & fix a broken e…
Browse files Browse the repository at this point in the history
…xpiration check v2 (#35973)

Commit Message: Refactor the RLQS filter's traffic processing & fix the
broken action-assignment expiration check.
Additional Description: Addresses flakiness from [past
PR](#35723) by adding a missing
`sendDenyResponse` & improving test consistency in asynchronous steps.
Risk Level: Minor
Testing: Integration and unit testing run 1k times to check flakiness

---------

Signed-off-by: Brian Surber <[email protected]>
  • Loading branch information
bsurber authored Sep 5, 2024
1 parent b1aec95 commit 9b3699b
Show file tree
Hide file tree
Showing 5 changed files with 564 additions and 158 deletions.
11 changes: 7 additions & 4 deletions source/extensions/filters/http/rate_limit_quota/client_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ void RateLimitClientImpl::onReceiveMessage(RateLimitQuotaResponsePtr&& response)

// Get the hash id value from BucketId in the response.
const size_t bucket_id = MessageUtil::hash(action.bucket_id());
ENVOY_LOG(trace,
ENVOY_LOG(debug,
"Received a response for bucket id proto :\n {}, and generated "
"the associated hashed bucket id: {}",
action.bucket_id().DebugString(), bucket_id);
Expand All @@ -97,10 +97,11 @@ void RateLimitClientImpl::onReceiveMessage(RateLimitQuotaResponsePtr&& response)
switch (action.bucket_action_case()) {
case envoy::service::rate_limit_quota::v3::RateLimitQuotaResponse_BucketAction::
kQuotaAssignmentAction: {
quota_buckets_[bucket_id]->bucket_action = action;
if (quota_buckets_[bucket_id]->bucket_action.has_quota_assignment_action()) {
quota_buckets_[bucket_id]->cached_action = action;
quota_buckets_[bucket_id]->current_assignment_time = time_source_.monotonicTime();
if (quota_buckets_[bucket_id]->cached_action->has_quota_assignment_action()) {
auto rate_limit_strategy = quota_buckets_[bucket_id]
->bucket_action.quota_assignment_action()
->cached_action->quota_assignment_action()
.rate_limit_strategy();

if (rate_limit_strategy.has_token_bucket()) {
Expand All @@ -127,6 +128,7 @@ void RateLimitClientImpl::onReceiveMessage(RateLimitQuotaResponsePtr&& response)
case envoy::service::rate_limit_quota::v3::RateLimitQuotaResponse_BucketAction::
kAbandonAction: {
quota_buckets_.erase(bucket_id);
ENVOY_LOG(debug, "Bucket id {} removed from the cache by abandon action.", bucket_id);
break;
}
default: {
Expand All @@ -136,6 +138,7 @@ void RateLimitClientImpl::onReceiveMessage(RateLimitQuotaResponsePtr&& response)
}
}
}
ENVOY_LOG(debug, "Assignment cached for bucket id {}.", bucket_id);
}

// `rlqs_callback_` has been reset to nullptr for periodical report case.
Expand Down
274 changes: 174 additions & 100 deletions source/extensions/filters/http/rate_limit_quota/filter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ namespace Extensions {
namespace HttpFilters {
namespace RateLimitQuota {

using envoy::type::v3::RateLimitStrategy;

const char kBucketMetadataNamespace[] = "envoy.extensions.http_filters.rate_limit_quota.bucket";

Http::FilterHeadersStatus RateLimitQuotaFilter::decodeHeaders(Http::RequestHeaderMap& headers,
Expand All @@ -23,7 +25,7 @@ Http::FilterHeadersStatus RateLimitQuotaFilter::decodeHeaders(Http::RequestHeade
// allowed/denied, matching succeed/fail and so on.
ENVOY_LOG(debug,
"The request is not matched by any matchers: ", match_result.status().message());
return Envoy::Http::FilterHeadersStatus::Continue;
return sendAllowResponse();
}

// Second, generate the bucket id for this request based on match action when the request matching
Expand All @@ -36,7 +38,7 @@ Http::FilterHeadersStatus RateLimitQuotaFilter::decodeHeaders(Http::RequestHeade
// When it failed to generate the bucket id for this specific request, the request is ALLOWED by
// default (i.e., fail-open).
ENVOY_LOG(debug, "Unable to generate the bucket id: {}", ret.status().message());
return Envoy::Http::FilterHeadersStatus::Continue;
return sendAllowResponse();
}

const BucketId& bucket_id_proto = *ret;
Expand Down Expand Up @@ -123,34 +125,36 @@ void RateLimitQuotaFilter::createNewBucket(const BucketId& bucket_id,
// Create new bucket and store it into quota cache.
std::unique_ptr<Bucket> new_bucket = std::make_unique<Bucket>();

// The first matched request doesn't have quota assignment from the RLQS server yet, so the
// action is performed based on pre-configured strategy from no assignment behavior config.
// The first matched request doesn't have quota assignment from the RLQS
// server yet, so the action is performed based on pre-configured strategy
// from no assignment behavior config.
auto mutable_rate_limit_strategy =
new_bucket->bucket_action.mutable_quota_assignment_action()->mutable_rate_limit_strategy();
new_bucket->default_action.mutable_quota_assignment_action()->mutable_rate_limit_strategy();
if (match_action.bucketSettings().has_no_assignment_behavior()) {
*mutable_rate_limit_strategy =
match_action.bucketSettings().no_assignment_behavior().fallback_rate_limit();
} else {
// When `no_assignment_behavior` is not configured, default blanket rule is set to ALLOW_ALL.
// (i.e., fail-open).
mutable_rate_limit_strategy->set_blanket_rule(envoy::type::v3::RateLimitStrategy::ALLOW_ALL);
// When `no_assignment_behavior` is not configured, default blanket rule is
// set to ALLOW_ALL. (i.e., fail-open).
mutable_rate_limit_strategy->set_blanket_rule(RateLimitStrategy::ALLOW_ALL);
}

// Set up the bucket id.
new_bucket->bucket_id = bucket_id;
// Set up the first time assignment time.
new_bucket->first_assignment_time = time_source_.monotonicTime();
// Mark the assignment time.
auto now = time_source_.monotonicTime();
new_bucket->current_assignment_time = now;
// Set up the quota usage.
QuotaUsage quota_usage;
quota_usage.last_report = std::chrono::duration_cast<std::chrono::nanoseconds>(
time_source_.monotonicTime().time_since_epoch());
quota_usage.last_report =
std::chrono::duration_cast<std::chrono::nanoseconds>(now.time_since_epoch());

switch (mutable_rate_limit_strategy->blanket_rule()) {
PANIC_ON_PROTO_ENUM_SENTINEL_VALUES;
case envoy::type::v3::RateLimitStrategy::ALLOW_ALL:
case RateLimitStrategy::ALLOW_ALL:
quota_usage.num_requests_allowed++;
break;
case envoy::type::v3::RateLimitStrategy::DENY_ALL:
case RateLimitStrategy::DENY_ALL:
quota_usage.num_requests_denied++;
break;
}
Expand All @@ -159,6 +163,8 @@ void RateLimitQuotaFilter::createNewBucket(const BucketId& bucket_id,
quota_buckets_[id] = std::move(new_bucket);
}

// This function should not update QuotaUsage as that will have been handled
// when constructing the Report before this function is called.
Http::FilterHeadersStatus
RateLimitQuotaFilter::sendImmediateReport(const size_t bucket_id,
const RateLimitOnMatchAction& match_action) {
Expand All @@ -183,10 +189,9 @@ RateLimitQuotaFilter::sendImmediateReport(const size_t bucket_id,
if (!status.ok()) {
ENVOY_LOG(error, "Failed to start the gRPC stream: ", status.message());
// TODO(tyxia) Check `NoAssignmentBehavior` behavior instead of fail-open here.
return Envoy::Http::FilterHeadersStatus::Continue;
} else {
ENVOY_LOG(debug, "The gRPC stream is established and active");
return sendAllowResponse();
}
ENVOY_LOG(debug, "The gRPC stream is established and active");

// Send the usage report to RLQS server immediately on the first time when the request is
// matched.
Expand All @@ -202,103 +207,172 @@ RateLimitQuotaFilter::sendImmediateReport(const size_t bucket_id,
// bucket_matchers for the first time) should be already set based on no assignment behavior in
// `createNewBucket` when the bucket is initially created.
ASSERT(quota_buckets_.find(bucket_id) != quota_buckets_.end());
if (quota_buckets_[bucket_id]
->bucket_action.quota_assignment_action()
.rate_limit_strategy()
.blanket_rule() == envoy::type::v3::RateLimitStrategy::ALLOW_ALL) {
ENVOY_LOG(
trace,
"For first matched request with hashed bucket_id {}, it is allowed by ALLOW_ALL strategy.",
bucket_id);
return Http::FilterHeadersStatus::Continue;
} else {
// For the request that is rejected due to DENY_ALL no_assignment_behavior, immediate report is
// still sent to RLQS server above, and here the local reply with deny response is sent.
ENVOY_LOG(
trace,
"For first matched request with hashed bucket_id {}, it is throttled by DENY_ALL strategy.",
bucket_id);
sendDenyResponse();
return Envoy::Http::FilterHeadersStatus::StopIteration;
// If not given a default blanket rule, the first matched request is allowed.
if (!quota_buckets_[bucket_id]->default_action.has_quota_assignment_action() ||
!quota_buckets_[bucket_id]
->default_action.quota_assignment_action()
.has_rate_limit_strategy() ||
!quota_buckets_[bucket_id]
->default_action.quota_assignment_action()
.rate_limit_strategy()
.has_blanket_rule()) {
ENVOY_LOG(trace, "Without a default blanket rule configured, the first matched "
"request with hashed bucket_id {} is allowed through.");
ENVOY_LOG(debug, "Default action for bucket_id {} does not contain a blanket action: {}",
bucket_id, quota_buckets_[bucket_id]->default_action.DebugString());
return sendAllowResponse();
}
auto blanket_rule = quota_buckets_[bucket_id]
->default_action.quota_assignment_action()
.rate_limit_strategy()
.blanket_rule();
if (blanket_rule == RateLimitStrategy::DENY_ALL) {
// For the request that is rejected due to DENY_ALL
// no_assignment_behavior, immediate report is still sent to RLQS server
// above, and here the local reply with deny response is sent.
ENVOY_LOG(trace,
"For first matched request with hashed bucket_id {}, it is "
"throttled by DENY_ALL strategy.",
bucket_id);
ENVOY_LOG(debug, "Hit configured default DENY_ALL for bucket_id {}", bucket_id);
return sendDenyResponse();
}

ENVOY_LOG(trace,
"For first matched request with hashed bucket_id {}, it is "
"allowed by the configured default ALLOW_ALL strategy.",
bucket_id);
ENVOY_LOG(debug, "Hit configured default ALLOW_ALL for bucket_id {}", bucket_id);
return sendAllowResponse();
}

Http::FilterHeadersStatus
RateLimitQuotaFilter::processCachedBucket(size_t bucket_id,
const RateLimitOnMatchAction& match_action) {
RateLimitQuotaFilter::setUsageAndResponseFromAction(const BucketAction& action,
const size_t bucket_id) {
if (!action.has_quota_assignment_action() ||
!action.quota_assignment_action().has_rate_limit_strategy()) {
ENVOY_LOG(debug,
"Selected bucket action defaulting to ALLOW_ALL as it does not "
"have an assignment for bucket_id {}",
bucket_id);
return sendAllowResponse(&quota_buckets_[bucket_id]->quota_usage);
}

// TODO(tyxia) Currently, blanket rule and token bucket strategies are
// implemented. Change to switch case when `RequestsPerTimeUnit` strategy is
// implemented.
auto rate_limit_strategy = action.quota_assignment_action().rate_limit_strategy();
if (rate_limit_strategy.has_blanket_rule()) {
bool allow = (rate_limit_strategy.blanket_rule() != RateLimitStrategy::DENY_ALL);
ENVOY_LOG(trace, "Request with hashed bucket_id {} is {} by the selected blanket rule.",
bucket_id, allow ? "allowed" : "denied");
if (allow) {
return sendAllowResponse(&quota_buckets_[bucket_id]->quota_usage);
}
return sendDenyResponse(&quota_buckets_[bucket_id]->quota_usage);
}

if (rate_limit_strategy.has_token_bucket()) {
auto token_bucket = quota_buckets_[bucket_id]->token_bucket_limiter.get();
ASSERT(token_bucket);

// Try to consume 1 token from the bucket.
if (token_bucket->consume(1, /*allow_partial=*/false)) {
// Request is allowed.
ENVOY_LOG(trace,
"Request with hashed bucket_id {} is allowed by token bucket "
"limiter.",
bucket_id);
ENVOY_LOG(debug,
"Allowing request as token bucket is not empty for bucket_id "
"{}. Initial assignment: {}.",
bucket_id, rate_limit_strategy.token_bucket().ShortDebugString());
return sendAllowResponse(&quota_buckets_[bucket_id]->quota_usage);
}
// Request is throttled.
ENVOY_LOG(trace,
"Request with hashed bucket_id {} is throttled by token "
"bucket limiter",
bucket_id);
ENVOY_LOG(debug,
"Denying request as token bucket is exhausted for bucket_id {}. "
"Initial assignment: {}.",
bucket_id, rate_limit_strategy.token_bucket().ShortDebugString());
return sendDenyResponse(&quota_buckets_[bucket_id]->quota_usage);
}

ENVOY_LOG(error,
"Failing open as selected bucket action for bucket_id {} contains "
"an unsupported rate limit strategy: {}",
bucket_id, rate_limit_strategy.DebugString());
return sendAllowResponse(&quota_buckets_[bucket_id]->quota_usage);
}

bool isCachedActionExpired(TimeSource& time_source, const Bucket& bucket) {
// First, check whether the assignment has expired or not.
auto now = std::chrono::duration_cast<std::chrono::nanoseconds>(
time_source_.monotonicTime().time_since_epoch());
time_source.monotonicTime().time_since_epoch());
auto assignment_time_elapsed = Protobuf::util::TimeUtil::NanosecondsToDuration(
(now - std::chrono::duration_cast<std::chrono::nanoseconds>(
quota_buckets_[bucket_id]->first_assignment_time.time_since_epoch()))
bucket.current_assignment_time.time_since_epoch()))
.count());
if (assignment_time_elapsed > quota_buckets_[bucket_id]
->bucket_action.quota_assignment_action()
.assignment_time_to_live()) {
// If expired, remove the cache entry.
quota_buckets_.erase(bucket_id);

// Default strategy is fail-open (i.e., allow_all).
auto ret_status = Envoy::Http::FilterHeadersStatus::Continue;
// Check the expired assignment behavior if configured.
// Note, only fail-open and fail-close are supported, more advanced expired assignment can be
// supported as needed.
if (match_action.bucketSettings().has_expired_assignment_behavior()) {
if (match_action.bucketSettings()
.expired_assignment_behavior()
.fallback_rate_limit()
.blanket_rule() == envoy::type::v3::RateLimitStrategy::DENY_ALL) {
sendDenyResponse();
ret_status = Envoy::Http::FilterHeadersStatus::StopIteration;
}
}

return (assignment_time_elapsed >
bucket.cached_action->quota_assignment_action().assignment_time_to_live());
}

Http::FilterHeadersStatus
RateLimitQuotaFilter::processCachedBucket(size_t bucket_id,
const RateLimitOnMatchAction& match_action) {
auto* cached_bucket = quota_buckets_[bucket_id].get();

// If no cached action, use the default action.
if (!cached_bucket->cached_action.has_value()) {
return setUsageAndResponseFromAction(cached_bucket->default_action, bucket_id);
}

// If expired, remove the expired action & fallback.
if (isCachedActionExpired(time_source_, *cached_bucket)) {
Http::FilterHeadersStatus ret_status = processExpiredBucket(bucket_id, match_action);
cached_bucket->cached_action = std::nullopt;
return ret_status;
}

// Second, get the quota assignment (if exists) from the cached bucket action.
if (quota_buckets_[bucket_id]->bucket_action.has_quota_assignment_action()) {
auto rate_limit_strategy =
quota_buckets_[bucket_id]->bucket_action.quota_assignment_action().rate_limit_strategy();

// TODO(tyxia) Currently, blanket rule and token bucket strategies are implemented.
// Change to switch case when `RequestsPerTimeUnit` strategy is implemented.
if (rate_limit_strategy.has_blanket_rule()) {
if (rate_limit_strategy.blanket_rule() == envoy::type::v3::RateLimitStrategy::ALLOW_ALL) {
quota_buckets_[bucket_id]->quota_usage.num_requests_allowed += 1;
ENVOY_LOG(trace,
"Request with hashed bucket_id {} is allowed by cached ALLOW_ALL strategy.",
bucket_id);
} else if (rate_limit_strategy.blanket_rule() ==
envoy::type::v3::RateLimitStrategy::DENY_ALL) {
quota_buckets_[bucket_id]->quota_usage.num_requests_denied += 1;
ENVOY_LOG(trace,
"Request with hashed bucket_id {} is throttled by cached DENY_ALL strategy.",
bucket_id);
sendDenyResponse();
return Envoy::Http::FilterHeadersStatus::StopIteration;
}
} else if (rate_limit_strategy.has_token_bucket()) {
ASSERT(quota_buckets_[bucket_id]->token_bucket_limiter != nullptr);
TokenBucket* limiter = quota_buckets_[bucket_id]->token_bucket_limiter.get();
// Try to consume 1 token from the bucket.
if (limiter->consume(1, /*allow_partial=*/false)) {
// Request is allowed.
quota_buckets_[bucket_id]->quota_usage.num_requests_allowed += 1;
ENVOY_LOG(trace, "Request with hashed bucket_id {} is allowed by token bucket limiter.",
bucket_id);
} else {
// Request is throttled.
quota_buckets_[bucket_id]->quota_usage.num_requests_denied += 1;
ENVOY_LOG(trace, "Request with hashed bucket_id {} is throttled by token bucket limiter",
bucket_id);
sendDenyResponse();
return Envoy::Http::FilterHeadersStatus::StopIteration;
}
}
// If not expired, use the cached action.
return setUsageAndResponseFromAction(*cached_bucket->cached_action, bucket_id);
}

// Note: does not remove the expired entity from the cache.
Http::FilterHeadersStatus
RateLimitQuotaFilter::processExpiredBucket(size_t bucket_id,
const RateLimitOnMatchAction& match_action) {
auto* cached_bucket = quota_buckets_[bucket_id].get();

if (!match_action.bucketSettings().has_expired_assignment_behavior() ||
!match_action.bucketSettings().expired_assignment_behavior().has_fallback_rate_limit()) {
ENVOY_LOG(debug,
"Selecting default action for bucket_id as expiration "
"fallback assignment doesn't have a configured override {}",
match_action.bucketSettings().expired_assignment_behavior().DebugString());
return setUsageAndResponseFromAction(cached_bucket->default_action, bucket_id);
}
return Envoy::Http::FilterHeadersStatus::Continue;

const RateLimitStrategy& fallback_rate_limit =
match_action.bucketSettings().expired_assignment_behavior().fallback_rate_limit();
if (fallback_rate_limit.has_blanket_rule() &&
fallback_rate_limit.blanket_rule() == RateLimitStrategy::DENY_ALL) {
ENVOY_LOG(debug,
"Exipred action falling back to configured DENY_ALL for "
"bucket_id {}",
bucket_id);
return sendDenyResponse(&cached_bucket->quota_usage);
}

ENVOY_LOG(debug,
"Exipred action falling back to ALLOW_ALL for bucket_id {} with "
"fallback action {}",
bucket_id, fallback_rate_limit.DebugString());
return sendAllowResponse(&cached_bucket->quota_usage);
}

} // namespace RateLimitQuota
Expand Down
Loading

0 comments on commit 9b3699b

Please sign in to comment.