Skip to content

Commit

Permalink
Using drop_overload category in EDS to report drop_overload stats in …
Browse files Browse the repository at this point in the history
…LRS (#36047)

Using drop_overload category in EDS to report drop_overload stats in
LRS.

This is a follow up PR to support drop_overload load report service:
#31384

Currently it is reporting with a fixed category "drop_overload". This PR
changes it into the category passed in by cluster or EDS policy
configuration.

---------

Signed-off-by: Yanjun Xiang <[email protected]>
  • Loading branch information
yanjunxiang-google authored Sep 19, 2024
1 parent 596beac commit 6069f86
Show file tree
Hide file tree
Showing 16 changed files with 82 additions and 18 deletions.
5 changes: 5 additions & 0 deletions changelogs/current.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,11 @@ bug_fixes:
the number of requests per I/O cycle is configured and an HTTP decoder filter that pauses filter chain is present. This behavior
can be reverted by setting the runtime guard ``envoy.reloadable_features.use_filter_manager_state_for_downstream_end_stream``
to false.
- area: upstream
change: |
Fixed a bug using hard coded drop category when reporting drop_overload stats to the load report service.
It is changed to use drop category that is set in
:ref:`category <envoy_v3_api_field_config.endpoint.v3.clusterloadassignment.policy.DropOverload.category>`.
- area: proxy_filter
change: |
Fixed a bug in the ``CONNECT`` implementation that would cause the ``CONNECT`` request created to be invalid when the
Expand Down
10 changes: 10 additions & 0 deletions envoy/upstream/thread_local_cluster.h
Original file line number Diff line number Diff line change
Expand Up @@ -157,10 +157,20 @@ class ThreadLocalCluster {
*/
virtual UnitFloat dropOverload() const PURE;

/**
* @return the thread local cluster drop_category configuration.
*/
virtual const std::string& dropCategory() const PURE;

/**
* Set up the drop_overload value for the thread local cluster.
*/
virtual void setDropOverload(UnitFloat drop_overload) PURE;

/**
* Set up the drop_category value for the thread local cluster.
*/
virtual void setDropCategory(absl::string_view drop_category) PURE;
};

using ThreadLocalClusterOptRef = absl::optional<std::reference_wrapper<ThreadLocalCluster>>;
Expand Down
10 changes: 10 additions & 0 deletions envoy/upstream/upstream.h
Original file line number Diff line number Diff line change
Expand Up @@ -1315,10 +1315,20 @@ class Cluster {
*/
virtual UnitFloat dropOverload() const PURE;

/**
* @return the cluster drop_category_ configuration.
*/
virtual const std::string& dropCategory() const PURE;

/**
* Set up the drop_overload value for the cluster.
*/
virtual void setDropOverload(UnitFloat drop_overload) PURE;

/**
* Set up the drop_category value for the thread local cluster.
*/
virtual void setDropCategory(absl::string_view drop_category) PURE;
};

using ClusterSharedPtr = std::shared_ptr<Cluster>;
Expand Down
25 changes: 15 additions & 10 deletions source/common/upstream/cluster_manager_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1236,15 +1236,18 @@ void ClusterManagerImpl::postThreadLocalClusterUpdate(ClusterManagerCluster& cm_
pending_cluster_creations_.erase(cm_cluster.cluster().info()->name());

const UnitFloat drop_overload = cm_cluster.cluster().dropOverload();
const std::string drop_category = cm_cluster.cluster().dropCategory();
// Populate the cluster initialization object based on this update.
ClusterInitializationObjectConstSharedPtr cluster_initialization_object =
addOrUpdateClusterInitializationObjectIfSupported(
params, cm_cluster.cluster().info(), load_balancer_factory, host_map, drop_overload);
addOrUpdateClusterInitializationObjectIfSupported(params, cm_cluster.cluster().info(),
load_balancer_factory, host_map,
drop_overload, drop_category);

tls_.runOnAllThreads([info = cm_cluster.cluster().info(), params = std::move(params),
add_or_update_cluster, load_balancer_factory, map = std::move(host_map),
cluster_initialization_object = std::move(cluster_initialization_object),
drop_overload](OptRef<ThreadLocalClusterManagerImpl> cluster_manager) {
drop_overload, drop_category = std::move(drop_category)](
OptRef<ThreadLocalClusterManagerImpl> cluster_manager) {
ASSERT(cluster_manager.has_value(),
"Expected the ThreadLocalClusterManager to be set during ClusterManagerImpl creation.");

Expand Down Expand Up @@ -1302,6 +1305,7 @@ void ClusterManagerImpl::postThreadLocalClusterUpdate(ClusterManagerCluster& cm_

if (cluster_manager->thread_local_clusters_[info->name()]) {
cluster_manager->thread_local_clusters_[info->name()]->setDropOverload(drop_overload);
cluster_manager->thread_local_clusters_[info->name()]->setDropCategory(drop_category);
}
for (const auto& per_priority : params.per_priority_update_params_) {
cluster_manager->updateClusterMembership(
Expand Down Expand Up @@ -1338,7 +1342,7 @@ ClusterManagerImpl::ClusterInitializationObjectConstSharedPtr
ClusterManagerImpl::addOrUpdateClusterInitializationObjectIfSupported(
const ThreadLocalClusterUpdateParams& params, ClusterInfoConstSharedPtr cluster_info,
LoadBalancerFactorySharedPtr load_balancer_factory, HostMapConstSharedPtr map,
UnitFloat drop_overload) {
UnitFloat drop_overload, absl::string_view drop_category) {
if (!deferralIsSupportedForCluster(cluster_info)) {
return nullptr;
}
Expand Down Expand Up @@ -1369,13 +1373,13 @@ ClusterManagerImpl::addOrUpdateClusterInitializationObjectIfSupported(
entry->second->per_priority_state_, params, std::move(cluster_info),
load_balancer_factory == nullptr ? entry->second->load_balancer_factory_
: load_balancer_factory,
map, drop_overload);
map, drop_overload, drop_category);
cluster_initialization_map_[cluster_name] = new_initialization_object;
return new_initialization_object;
} else {
// We need to create a fresh Cluster Initialization Object.
auto new_initialization_object = std::make_shared<ClusterInitializationObject>(
params, std::move(cluster_info), load_balancer_factory, map, drop_overload);
params, std::move(cluster_info), load_balancer_factory, map, drop_overload, drop_category);
cluster_initialization_map_[cluster_name] = new_initialization_object;
return new_initialization_object;
}
Expand Down Expand Up @@ -1409,6 +1413,7 @@ ClusterManagerImpl::ThreadLocalClusterManagerImpl::initializeClusterInlineIfExis
initialization_object->cross_priority_host_map_);
}
thread_local_clusters_[cluster]->setDropOverload(initialization_object->drop_overload_);
thread_local_clusters_[cluster]->setDropCategory(initialization_object->drop_category_);

// Remove the CIO as we've initialized the cluster.
thread_local_deferred_clusters_.erase(entry);
Expand All @@ -1419,9 +1424,9 @@ ClusterManagerImpl::ThreadLocalClusterManagerImpl::initializeClusterInlineIfExis
ClusterManagerImpl::ClusterInitializationObject::ClusterInitializationObject(
const ThreadLocalClusterUpdateParams& params, ClusterInfoConstSharedPtr cluster_info,
LoadBalancerFactorySharedPtr load_balancer_factory, HostMapConstSharedPtr map,
UnitFloat drop_overload)
UnitFloat drop_overload, absl::string_view drop_category)
: cluster_info_(std::move(cluster_info)), load_balancer_factory_(load_balancer_factory),
cross_priority_host_map_(map), drop_overload_(drop_overload) {
cross_priority_host_map_(map), drop_overload_(drop_overload), drop_category_(drop_category) {
// Copy the update since the map is empty.
for (const auto& update : params.per_priority_update_params_) {
per_priority_state_.emplace(update.priority_, update);
Expand All @@ -1432,10 +1437,10 @@ ClusterManagerImpl::ClusterInitializationObject::ClusterInitializationObject(
const absl::flat_hash_map<int, ThreadLocalClusterUpdateParams::PerPriority>& per_priority_state,
const ThreadLocalClusterUpdateParams& update_params, ClusterInfoConstSharedPtr cluster_info,
LoadBalancerFactorySharedPtr load_balancer_factory, HostMapConstSharedPtr map,
UnitFloat drop_overload)
UnitFloat drop_overload, absl::string_view drop_category)
: per_priority_state_(per_priority_state), cluster_info_(std::move(cluster_info)),
load_balancer_factory_(load_balancer_factory), cross_priority_host_map_(map),
drop_overload_(drop_overload) {
drop_overload_(drop_overload), drop_category_(drop_category) {

// Because EDS Clusters receive the entire ClusterLoadAssignment but only
// provides the delta we must process the hosts_added and hosts_removed and
Expand Down
13 changes: 10 additions & 3 deletions source/common/upstream/cluster_manager_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -427,20 +427,22 @@ class ClusterManagerImpl : public ClusterManager,
ClusterInitializationObject(const ThreadLocalClusterUpdateParams& params,
ClusterInfoConstSharedPtr cluster_info,
LoadBalancerFactorySharedPtr load_balancer_factory,
HostMapConstSharedPtr map, UnitFloat drop_overload);
HostMapConstSharedPtr map, UnitFloat drop_overload,
absl::string_view drop_category);

ClusterInitializationObject(
const absl::flat_hash_map<int, ThreadLocalClusterUpdateParams::PerPriority>&
per_priority_state,
const ThreadLocalClusterUpdateParams& update_params, ClusterInfoConstSharedPtr cluster_info,
LoadBalancerFactorySharedPtr load_balancer_factory, HostMapConstSharedPtr map,
UnitFloat drop_overload);
UnitFloat drop_overload, absl::string_view drop_category);

absl::flat_hash_map<int, ThreadLocalClusterUpdateParams::PerPriority> per_priority_state_;
const ClusterInfoConstSharedPtr cluster_info_;
const LoadBalancerFactorySharedPtr load_balancer_factory_;
const HostMapConstSharedPtr cross_priority_host_map_;
UnitFloat drop_overload_{0};
const std::string drop_category_;
};

using ClusterInitializationObjectConstSharedPtr =
Expand Down Expand Up @@ -610,7 +612,11 @@ class ClusterManagerImpl : public ClusterManager,
void drainConnPools(DrainConnectionsHostPredicate predicate,
ConnectionPool::DrainBehavior behavior);
UnitFloat dropOverload() const override { return drop_overload_; }
const std::string& dropCategory() const override { return drop_category_; }
void setDropOverload(UnitFloat drop_overload) override { drop_overload_ = drop_overload; }
void setDropCategory(absl::string_view drop_category) override {
drop_category_ = drop_category;
}

private:
Http::ConnectionPool::Instance*
Expand All @@ -627,6 +633,7 @@ class ClusterManagerImpl : public ClusterManager,
ThreadLocalClusterManagerImpl& parent_;
PrioritySetImpl priority_set_;
UnitFloat drop_overload_{0};
std::string drop_category_;

// Don't change the order of cluster_info_ and lb_factory_/lb_ as the the lb_factory_/lb_
// may keep a reference to the cluster_info_.
Expand Down Expand Up @@ -889,7 +896,7 @@ class ClusterManagerImpl : public ClusterManager,
ClusterInitializationObjectConstSharedPtr addOrUpdateClusterInitializationObjectIfSupported(
const ThreadLocalClusterUpdateParams& params, ClusterInfoConstSharedPtr cluster_info,
LoadBalancerFactorySharedPtr load_balancer_factory, HostMapConstSharedPtr map,
UnitFloat drop_overload);
UnitFloat drop_overload, absl::string_view drop_category);

bool deferralIsSupportedForCluster(const ClusterInfoConstSharedPtr& info) const;

Expand Down
6 changes: 5 additions & 1 deletion source/common/upstream/health_discovery_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,10 @@ class HdsCluster : public Cluster, Logger::Loggable<Logger::Id::upstream> {

std::vector<Upstream::HealthCheckerSharedPtr> healthCheckers() { return health_checkers_; };
std::vector<HostSharedPtr> hosts() { return *hosts_; };
UnitFloat dropOverload() const override { return UnitFloat(0); }
UnitFloat dropOverload() const override { return drop_overload_; }
const std::string& dropCategory() const override { return drop_category_; }
void setDropOverload(UnitFloat) override {}
void setDropCategory(absl::string_view) override {}

protected:
PrioritySetImpl priority_set_;
Expand All @@ -99,6 +101,8 @@ class HdsCluster : public Cluster, Logger::Loggable<Logger::Id::upstream> {
std::vector<Upstream::HealthCheckerSharedPtr> health_checkers_;
HealthCheckerMap health_checkers_map_;
TimeSource& time_source_;
UnitFloat drop_overload_{0};
const std::string drop_category_;

absl::Status updateHealthchecks(
const Protobuf::RepeatedPtrField<envoy::config::core::v3::HealthCheck>& health_checks);
Expand Down
2 changes: 1 addition & 1 deletion source/common/upstream/load_stats_reporter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ void LoadStatsReporter::sendLoadStatsRequest() {
cluster.info()->loadReportStats().upstream_rq_drop_overload_.latch();
if (drop_overload_count > 0) {
auto* dropped_request = cluster_stats->add_dropped_requests();
dropped_request->set_category("drop_overload");
dropped_request->set_category(cluster.dropCategory());
dropped_request->set_dropped_count(drop_overload_count);
}

Expand Down
1 change: 1 addition & 0 deletions source/common/upstream/upstream_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1832,6 +1832,7 @@ absl::Status ClusterImplBase::parseDropOverloadConfig(

drop_ratio = std::min(drop_ratio, float(drop_ratio_runtime) / float(MAX_DROP_OVERLOAD_RUNTIME));
drop_overload_ = UnitFloat(drop_ratio);
drop_category_ = policy.drop_overloads(0).category();
return absl::OkStatus();
}

Expand Down
3 changes: 3 additions & 0 deletions source/common/upstream/upstream_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -1200,7 +1200,9 @@ class ClusterImplBase : public Cluster, protected Logger::Loggable<Logger::Id::u
const Outlier::Detector* outlierDetector() const override { return outlier_detector_.get(); }
void initialize(std::function<void()> callback) override;
UnitFloat dropOverload() const override { return drop_overload_; }
const std::string& dropCategory() const override { return drop_category_; }
void setDropOverload(UnitFloat drop_overload) override { drop_overload_ = drop_overload; }
void setDropCategory(absl::string_view drop_category) override { drop_category_ = drop_category; }

protected:
ClusterImplBase(const envoy::config::cluster::v3::Cluster& cluster,
Expand Down Expand Up @@ -1271,6 +1273,7 @@ class ClusterImplBase : public Cluster, protected Logger::Loggable<Logger::Id::u
Config::ConstMetadataSharedPoolSharedPtr const_metadata_shared_pool_;
Common::CallbackHandlePtr priority_update_cb_;
UnitFloat drop_overload_{0};
std::string drop_category_;
static constexpr int kDropOverloadSize = 1;
};

Expand Down
2 changes: 1 addition & 1 deletion source/server/admin/config_dump_handler.cc
Original file line number Diff line number Diff line change
Expand Up @@ -309,7 +309,7 @@ ConfigDumpHandler::dumpEndpointConfigs(const Matchers::StringMatcher& name_match
float value = cluster.dropOverload().value() * 1000000;
if (value > 0) {
auto* drop_overload = policy.add_drop_overloads();
drop_overload->set_category("drop_overload");
drop_overload->set_category(cluster.dropCategory());
auto* percent = drop_overload->mutable_drop_percentage();
percent->set_denominator(envoy::type::v3::FractionalPercent::MILLION);
percent->set_numerator(uint32_t(value));
Expand Down
4 changes: 3 additions & 1 deletion test/common/upstream/upstream_impl_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,7 @@ TEST_P(StrictDnsParamTest, DropOverLoadConfigTestBasicMillion) {
false);
auto cluster = *StrictDnsClusterImpl::create(cluster_config, factory_context, dns_resolver);
EXPECT_EQ(0.000035f, cluster->dropOverload().value());
EXPECT_EQ("test", cluster->dropCategory());
}

TEST_P(StrictDnsParamTest, DropOverLoadConfigTestBasicTenThousand) {
Expand All @@ -290,7 +291,7 @@ TEST_P(StrictDnsParamTest, DropOverLoadConfigTestBasicTenThousand) {
load_assignment:
policy:
drop_overloads:
category: test
category: foo
drop_percentage:
numerator: 1000
denominator: TEN_THOUSAND
Expand All @@ -301,6 +302,7 @@ TEST_P(StrictDnsParamTest, DropOverLoadConfigTestBasicTenThousand) {
false);
auto cluster = *StrictDnsClusterImpl::create(cluster_config, factory_context, dns_resolver);
EXPECT_EQ(0.1f, cluster->dropOverload().value());
EXPECT_EQ("foo", cluster->dropCategory());
}

TEST_P(StrictDnsParamTest, DropOverLoadConfigTestBadDenominator) {
Expand Down
6 changes: 6 additions & 0 deletions test/mocks/upstream/cluster.cc
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ namespace Upstream {
using ::testing::_;
using ::testing::Invoke;
using ::testing::Return;
using ::testing::ReturnRef;

MockCluster::MockCluster() {
ON_CALL(*this, info()).WillByDefault(Return(info_));
ON_CALL(*this, initialize(_))
Expand All @@ -16,9 +18,13 @@ MockCluster::MockCluster() {
initialize_callback_ = callback;
}));
ON_CALL(*this, dropOverload()).WillByDefault(Return(drop_overload_));
ON_CALL(*this, dropCategory()).WillByDefault(ReturnRef(drop_category_));
ON_CALL(*this, setDropOverload(_)).WillByDefault(Invoke([this](UnitFloat drop_overload) -> void {
drop_overload_ = drop_overload;
}));
ON_CALL(*this, setDropCategory(_))
.WillByDefault(Invoke(
[this](absl::string_view drop_category) -> void { drop_category_ = drop_category; }));
}

MockCluster::~MockCluster() = default;
Expand Down
3 changes: 3 additions & 0 deletions test/mocks/upstream/cluster.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,15 @@ class MockCluster : public Cluster {
MOCK_METHOD(PrioritySet&, prioritySet, ());
MOCK_METHOD(const PrioritySet&, prioritySet, (), (const));
MOCK_METHOD(UnitFloat, dropOverload, (), (const));
MOCK_METHOD(const std::string&, dropCategory, (), (const));
MOCK_METHOD(void, setDropOverload, (UnitFloat));
MOCK_METHOD(void, setDropCategory, (absl::string_view));

std::shared_ptr<MockClusterInfo> info_{new ::testing::NiceMock<MockClusterInfo>()};
std::function<void()> initialize_callback_;
Network::Address::InstanceConstSharedPtr source_address_;
UnitFloat drop_overload_{0};
std::string drop_category_{"drop_overload"};
};
} // namespace Upstream
} // namespace Envoy
5 changes: 5 additions & 0 deletions test/mocks/upstream/thread_local_cluster.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,14 @@ MockThreadLocalCluster::MockThreadLocalCluster() {
.WillByDefault(Return(Upstream::TcpPoolData([]() {}, &tcp_conn_pool_)));
ON_CALL(*this, httpAsyncClient()).WillByDefault(ReturnRef(async_client_));
ON_CALL(*this, dropOverload()).WillByDefault(Return(cluster_.drop_overload_));
ON_CALL(*this, dropCategory()).WillByDefault(ReturnRef(cluster_.drop_category_));
ON_CALL(*this, setDropOverload(_)).WillByDefault(Invoke([this](UnitFloat drop_overload) -> void {
cluster_.drop_overload_ = drop_overload;
}));
ON_CALL(*this, setDropCategory(_))
.WillByDefault(Invoke([this](absl::string_view drop_category) -> void {
cluster_.drop_category_ = drop_category;
}));
}

MockThreadLocalCluster::~MockThreadLocalCluster() = default;
Expand Down
2 changes: 2 additions & 0 deletions test/mocks/upstream/thread_local_cluster.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,9 @@ class MockThreadLocalCluster : public ThreadLocalCluster {
MOCK_METHOD(Tcp::AsyncTcpClientPtr, tcpAsyncClient,
(LoadBalancerContext * context, Tcp::AsyncTcpClientOptionsConstSharedPtr options));
MOCK_METHOD(UnitFloat, dropOverload, (), (const));
MOCK_METHOD(const std::string&, dropCategory, (), (const));
MOCK_METHOD(void, setDropOverload, (UnitFloat));
MOCK_METHOD(void, setDropCategory, (absl::string_view));

NiceMock<MockClusterMockPrioritySet> cluster_;
NiceMock<MockLoadBalancer> lb_;
Expand Down
3 changes: 2 additions & 1 deletion test/server/admin/config_dump_handler_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,8 @@ TEST_P(AdminInstanceTest, ConfigDumpWithEndpoint) {
hostname_for_healthcheck, "tcp://1.2.3.5:90", 5, 6);
// Adding drop_overload config.
ON_CALL(cluster, dropOverload()).WillByDefault(Return(UnitFloat(0.00035)));

const std::string drop_overload = "drop_overload";
ON_CALL(cluster, dropCategory()).WillByDefault(ReturnRef(drop_overload));
Buffer::OwnedImpl response;
Http::TestResponseHeaderMapImpl header_map;
EXPECT_EQ(Http::Code::OK, getCallback("/config_dump?include_eds", header_map, response));
Expand Down

0 comments on commit 6069f86

Please sign in to comment.