Skip to content

Commit

Permalink
[xDS] generalize CDS metadata handling
Browse files Browse the repository at this point in the history
  • Loading branch information
markdroth committed Aug 14, 2024
1 parent 4fd48b1 commit 8f581c8
Show file tree
Hide file tree
Showing 8 changed files with 107 additions and 106 deletions.
1 change: 1 addition & 0 deletions src/core/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -5415,6 +5415,7 @@ grpc_cc_library(
"xds/grpc/xds_cluster.h",
],
external_deps = [
"absl/container:flat_hash_map",
"absl/strings",
"absl/types:optional",
"absl/types:variant",
Expand Down
20 changes: 17 additions & 3 deletions src/core/load_balancing/xds/xds_cluster_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,8 @@ class XdsClusterImplLb final : public LoadBalancingPolicy {
// Current config from the resolver.
RefCountedPtr<XdsClusterImplLbConfig> config_;
std::shared_ptr<const XdsClusterResource> cluster_resource_;
RefCountedStringValue service_telemetry_label_;
RefCountedStringValue namespace_telemetry_label_;
RefCountedPtr<XdsEndpointResource::DropConfig> drop_config_;

// Current concurrent number of requests.
Expand Down Expand Up @@ -397,10 +399,9 @@ XdsClusterImplLb::Picker::Picker(XdsClusterImplLb* xds_cluster_impl_lb,
: call_counter_(xds_cluster_impl_lb->call_counter_),
max_concurrent_requests_(
xds_cluster_impl_lb->cluster_resource_->max_concurrent_requests),
service_telemetry_label_(
xds_cluster_impl_lb->cluster_resource_->service_telemetry_label),
service_telemetry_label_(xds_cluster_impl_lb->service_telemetry_label_),
namespace_telemetry_label_(
xds_cluster_impl_lb->cluster_resource_->namespace_telemetry_label),
xds_cluster_impl_lb->namespace_telemetry_label_),
drop_config_(xds_cluster_impl_lb->drop_config_),
drop_stats_(xds_cluster_impl_lb->drop_stats_),
picker_(std::move(picker)) {
Expand Down Expand Up @@ -648,6 +649,19 @@ absl::Status XdsClusterImplLb::UpdateLocked(UpdateArgs args) {
// Update config state, now that we're done comparing old and new fields.
config_ = std::move(new_config);
cluster_resource_ = new_cluster_config.cluster;
auto it2 =
cluster_resource_->metadata.find("com.google.csm.telemetry_labels");
if (it2 != cluster_resource_->metadata.end()) {
auto& json_object = it2->second.object();
auto it3 = json_object.find("service_name");
if (it3 != json_object.end() && it3->second.type() == Json::Type::kString) {
service_telemetry_label_ = RefCountedStringValue(it3->second.string());
}
it3 = json_object.find("service_namespace");
if (it3 != json_object.end() && it3->second.type() == Json::Type::kString) {
namespace_telemetry_label_ = RefCountedStringValue(it3->second.string());
}
}
drop_config_ = endpoint_config->endpoints != nullptr
? endpoint_config->endpoints->drop_config
: nullptr;
Expand Down
14 changes: 7 additions & 7 deletions src/core/xds/grpc/xds_cluster.cc
Original file line number Diff line number Diff line change
Expand Up @@ -65,14 +65,14 @@ std::string XdsClusterResource::ToString() const {
absl::StrCat("max_concurrent_requests=", max_concurrent_requests));
contents.push_back(absl::StrCat("override_host_statuses=",
override_host_statuses.ToString()));
if (!service_telemetry_label.as_string_view().empty()) {
contents.push_back(absl::StrCat("service_name_telemetry_label=",
service_telemetry_label.as_string_view()));
}
if (!namespace_telemetry_label.as_string_view().empty()) {
if (!metadata.empty()) {
std::vector<std::string> metadata_entries;
for (const auto& p : metadata) {
metadata_entries.push_back(
absl::StrCat(p.first, "=", JsonDump(p.second)));
}
contents.push_back(
absl::StrCat("service_namespace_telemetry_label=",
namespace_telemetry_label.as_string_view()));
absl::StrCat("metadata={", absl::StrJoin(metadata_entries, ", "), "}"));
}
return absl::StrCat("{", absl::StrJoin(contents, ", "), "}");
}
Expand Down
7 changes: 3 additions & 4 deletions src/core/xds/grpc/xds_cluster.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include <string>
#include <vector>

#include "absl/container/flat_hash_map.h"
#include "absl/types/optional.h"
#include "absl/types/variant.h"

Expand Down Expand Up @@ -86,8 +87,7 @@ struct XdsClusterResource : public XdsResourceType::ResourceData {

XdsHealthStatusSet override_host_statuses;

RefCountedStringValue service_telemetry_label;
RefCountedStringValue namespace_telemetry_label;
absl::flat_hash_map<std::string, Json> metadata;

bool operator==(const XdsClusterResource& other) const {
return type == other.type && lb_policy_config == other.lb_policy_config &&
Expand All @@ -97,8 +97,7 @@ struct XdsClusterResource : public XdsResourceType::ResourceData {
max_concurrent_requests == other.max_concurrent_requests &&
outlier_detection == other.outlier_detection &&
override_host_statuses == other.override_host_statuses &&
service_telemetry_label == other.service_telemetry_label &&
namespace_telemetry_label == other.namespace_telemetry_label;
metadata == other.metadata;
}

std::string ToString() const;
Expand Down
65 changes: 40 additions & 25 deletions src/core/xds/grpc/xds_cluster_parser.cc
Original file line number Diff line number Diff line change
Expand Up @@ -641,34 +641,49 @@ absl::StatusOr<std::shared_ptr<const XdsClusterResource>> CdsResourceParse(
cds_update->override_host_statuses.Add(
XdsHealthStatus(XdsHealthStatus::kHealthy));
}
// Record telemetry labels (if any).
// Parse metadata.
const envoy_config_core_v3_Metadata* metadata =
envoy_config_cluster_v3_Cluster_metadata(cluster);
if (metadata != nullptr) {
google_protobuf_Struct* telemetry_labels_struct;
if (envoy_config_core_v3_Metadata_filter_metadata_get(
metadata,
StdStringToUpbString(
absl::string_view("com.google.csm.telemetry_labels")),
&telemetry_labels_struct)) {
size_t iter = kUpb_Map_Begin;
const google_protobuf_Struct_FieldsEntry* fields_entry;
while ((fields_entry = google_protobuf_Struct_fields_next(
telemetry_labels_struct, &iter)) != nullptr) {
// Adds any entry whose value is a string to telemetry_labels.
const google_protobuf_Value* value =
google_protobuf_Struct_FieldsEntry_value(fields_entry);
if (google_protobuf_Value_has_string_value(value)) {
if (UpbStringToAbsl(google_protobuf_Struct_FieldsEntry_key(
fields_entry)) == "service_name") {
cds_update->service_telemetry_label = RefCountedStringValue(
UpbStringToAbsl(google_protobuf_Value_string_value(value)));
} else if (UpbStringToAbsl(google_protobuf_Struct_FieldsEntry_key(
fields_entry)) == "service_namespace") {
cds_update->namespace_telemetry_label = RefCountedStringValue(
UpbStringToAbsl(google_protobuf_Value_string_value(value)));
}
}
// First, try typed_filter_metadata.
size_t iter = kUpb_Map_Begin;
const envoy_config_core_v3_Metadata_TypedFilterMetadataEntry* typed_entry;
while (
(typed_entry = envoy_config_core_v3_Metadata_typed_filter_metadata_next(
metadata, &iter)) != nullptr) {
absl::string_view key = UpbStringToAbsl(
envoy_config_core_v3_Metadata_TypedFilterMetadataEntry_key(
typed_entry));
ValidationErrors::ScopedField field(
&errors, absl::StrCat(".metadata.typed_filter_metadata[", key, "]"));
auto extension = ExtractXdsExtension(
context,
envoy_config_core_v3_Metadata_TypedFilterMetadataEntry_value(
typed_entry),
&errors);
if (!extension.has_value()) continue;
// TODO(roth): If we ever need to support another type here, refactor
// this into a separate registry.
if (extension->type == "extensions.filters.http.gcp_authn.v3.Audience") {
// TODO(roth): In a subsequent PR, add parsing here.
}
}
// Then, try filter_metadata.
iter = kUpb_Map_Begin;
const envoy_config_core_v3_Metadata_FilterMetadataEntry* entry;
while ((entry = envoy_config_core_v3_Metadata_filter_metadata_next(
metadata, &iter)) != nullptr) {
absl::string_view key = UpbStringToAbsl(
envoy_config_core_v3_Metadata_FilterMetadataEntry_key(entry));
auto json = ParseProtobufStructToJson(
context,
envoy_config_core_v3_Metadata_FilterMetadataEntry_value(entry));
if (!json.ok()) {
ValidationErrors::ScopedField field(
&errors, absl::StrCat(".metadata.filter_metadata[", key, "]"));
errors.AddError(json.status().message());
} else if (!cds_update->metadata.contains(key)) {
cds_update->metadata[std::move(key)] = std::move(*json);
}
}
}
Expand Down
8 changes: 4 additions & 4 deletions src/core/xds/grpc/xds_common_types_parser.cc
Original file line number Diff line number Diff line change
Expand Up @@ -372,11 +372,9 @@ CommonTlsContext CommonTlsContextParse(
}

//
// ExtractXdsExtension
// ParseProtobufStructToJson()
//

namespace {

absl::StatusOr<Json> ParseProtobufStructToJson(
const XdsResourceType::DecodeContext& context,
const google_protobuf_Struct* resource) {
Expand Down Expand Up @@ -405,7 +403,9 @@ absl::StatusOr<Json> ParseProtobufStructToJson(
return std::move(*json);
}

} // namespace
//
// ExtractXdsExtension()
//

absl::optional<XdsExtension> ExtractXdsExtension(
const XdsResourceType::DecodeContext& context,
Expand Down
5 changes: 5 additions & 0 deletions src/core/xds/grpc/xds_common_types_parser.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include "envoy/extensions/transport_sockets/tls/v3/tls.upb.h"
#include "google/protobuf/any.upb.h"
#include "google/protobuf/duration.upb.h"
#include "google/protobuf/struct.upb.h"
#include "google/protobuf/wrappers.upb.h"

#include "src/core/lib/gprpp/time.h"
Expand All @@ -45,6 +46,10 @@ CommonTlsContext CommonTlsContextParse(
common_tls_context_proto,
ValidationErrors* errors);

absl::StatusOr<Json> ParseProtobufStructToJson(
const XdsResourceType::DecodeContext& context,
const google_protobuf_Struct* resource);

absl::optional<XdsExtension> ExtractXdsExtension(
const XdsResourceType::DecodeContext& context,
const google_protobuf_Any* any, ValidationErrors* errors);
Expand Down
93 changes: 30 additions & 63 deletions test/core/xds/xds_cluster_resource_type_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1645,52 +1645,30 @@ TEST_F(HostOverrideStatusTest, CanExplicitlySetToEmpty) {

using TelemetryLabelTest = XdsClusterTest;

TEST_F(TelemetryLabelTest, ValidServiceLabelsConfig) {
Cluster cluster;
cluster.set_type(cluster.EDS);
cluster.mutable_eds_cluster_config()->mutable_eds_config()->mutable_self();
auto& filter_map = *cluster.mutable_metadata()->mutable_filter_metadata();
auto& label_map =
*filter_map["com.google.csm.telemetry_labels"].mutable_fields();
*label_map["service_name"].mutable_string_value() = "abc";
*label_map["service_namespace"].mutable_string_value() = "xyz";
std::string serialized_resource;
ASSERT_TRUE(cluster.SerializeToString(&serialized_resource));
auto* resource_type = XdsClusterResourceType::Get();
auto decode_result =
resource_type->Decode(decode_context_, serialized_resource);
ASSERT_TRUE(decode_result.resource.ok()) << decode_result.resource.status();
auto& resource =
static_cast<const XdsClusterResource&>(**decode_result.resource);
EXPECT_EQ(resource.service_telemetry_label.as_string_view(), "abc");
EXPECT_EQ(resource.namespace_telemetry_label.as_string_view(), "xyz");
}

TEST_F(TelemetryLabelTest, MissingMetadataField) {
Cluster cluster;
cluster.set_type(cluster.EDS);
cluster.mutable_eds_cluster_config()->mutable_eds_config()->mutable_self();
std::string serialized_resource;
ASSERT_TRUE(cluster.SerializeToString(&serialized_resource));
auto* resource_type = XdsClusterResourceType::Get();
auto decode_result =
resource_type->Decode(decode_context_, serialized_resource);
ASSERT_TRUE(decode_result.resource.ok()) << decode_result.resource.status();
auto& resource =
static_cast<const XdsClusterResource&>(**decode_result.resource);
EXPECT_THAT(resource.service_telemetry_label.as_string_view(),
::testing::IsEmpty());
EXPECT_THAT(resource.namespace_telemetry_label.as_string_view(),
::testing::IsEmpty());
MATCHER_P(JsonEq, json_str, "") {
std::string actual = JsonDump(arg);
bool ok = ::testing::ExplainMatchResult(json_str, actual, result_listener);
if (!ok) *result_listener << "Actual: " << actual;
return ok;
}

TEST_F(TelemetryLabelTest, MissingCsmFilterMetadataField) {
TEST_F(TelemetryLabelTest, ValidServiceLabelsConfig) {
Cluster cluster;
cluster.set_type(cluster.EDS);
cluster.mutable_eds_cluster_config()->mutable_eds_config()->mutable_self();
auto& filter_map = *cluster.mutable_metadata()->mutable_filter_metadata();
auto& label_map = *filter_map["some_key"].mutable_fields();
*label_map["some_value"].mutable_string_value() = "abc";
auto& label_map = *filter_map["filter_key"].mutable_fields();
*label_map["string_value"].mutable_string_value() = "abc";
label_map["bool_value"].set_bool_value(true);
label_map["number_value"].set_number_value(3.14);
label_map["null_value"].set_null_value(::google::protobuf::NULL_VALUE);
auto& list_value_values =
*label_map["list_value"].mutable_list_value()->mutable_values();
*list_value_values.Add()->mutable_string_value() = "efg";
list_value_values.Add()->set_number_value(3.14);
auto& struct_value_fields =
*label_map["struct_value"].mutable_struct_value()->mutable_fields();
struct_value_fields["bool_value"].set_bool_value(false);
std::string serialized_resource;
ASSERT_TRUE(cluster.SerializeToString(&serialized_resource));
auto* resource_type = XdsClusterResourceType::Get();
Expand All @@ -1699,31 +1677,22 @@ TEST_F(TelemetryLabelTest, MissingCsmFilterMetadataField) {
ASSERT_TRUE(decode_result.resource.ok()) << decode_result.resource.status();
auto& resource =
static_cast<const XdsClusterResource&>(**decode_result.resource);
EXPECT_THAT(resource.service_telemetry_label.as_string_view(),
::testing::IsEmpty());
EXPECT_THAT(resource.namespace_telemetry_label.as_string_view(),
::testing::IsEmpty());
EXPECT_THAT(resource.metadata,
::testing::ElementsAre(::testing::Pair(
"filter_key", JsonEq("{"
"\"bool_value\":true,"
"\"list_value\":[\"efg\",3.14],"
"\"null_value\":null,"
"\"number_value\":3.14,"
"\"string_value\":\"abc\","
"\"struct_value\":{\"bool_value\":false}"
"}"))));
}

TEST_F(TelemetryLabelTest, IgnoreNonServiceLabelEntries) {
TEST_F(TelemetryLabelTest, MissingMetadataField) {
Cluster cluster;
cluster.set_type(cluster.EDS);
cluster.mutable_eds_cluster_config()->mutable_eds_config()->mutable_self();
auto& filter_map = *cluster.mutable_metadata()->mutable_filter_metadata();
auto& label_map =
*filter_map["com.google.csm.telemetry_labels"].mutable_fields();
label_map["bool_value"].set_bool_value(true);
label_map["number_value"].set_number_value(3.14);
*label_map["string_value"].mutable_string_value() = "abc";
*label_map["service_name"].mutable_string_value() = "service";
label_map["null_value"].set_null_value(::google::protobuf::NULL_VALUE);
auto& list_value_values =
*label_map["list_value"].mutable_list_value()->mutable_values();
*list_value_values.Add()->mutable_string_value() = "efg";
list_value_values.Add()->set_number_value(3.14);
auto& struct_value_fields =
*label_map["struct_value"].mutable_struct_value()->mutable_fields();
struct_value_fields["bool_value"].set_bool_value(false);
std::string serialized_resource;
ASSERT_TRUE(cluster.SerializeToString(&serialized_resource));
auto* resource_type = XdsClusterResourceType::Get();
Expand All @@ -1732,9 +1701,7 @@ TEST_F(TelemetryLabelTest, IgnoreNonServiceLabelEntries) {
ASSERT_TRUE(decode_result.resource.ok()) << decode_result.resource.status();
auto& resource =
static_cast<const XdsClusterResource&>(**decode_result.resource);
EXPECT_THAT(resource.service_telemetry_label.as_string_view(), "service");
EXPECT_THAT(resource.namespace_telemetry_label.as_string_view(),
::testing::IsEmpty());
EXPECT_THAT(resource.metadata, ::testing::ElementsAre());
}

} // namespace
Expand Down

0 comments on commit 8f581c8

Please sign in to comment.