From a2d9eba9188e14d6238e779755b1f2bec73463d6 Mon Sep 17 00:00:00 2001
From: Adam Kotwasinski
Date: Wed, 18 Sep 2024 16:03:15 -0700
Subject: [PATCH] kafka: upgrade to 3.8, add support for more requests (#36166)

Commit Message: kafka: upgrade to 3.8, add support for more requests
Additional Description: upgrade kafka dependency to 3.8, add the necessary
parsing code and deserializer to process the new constructs present in 3.8
(nullable struct)
Risk Level: low
Testing: automated suite + manual with
[envoy-kafka-tests](https://github.com/adamkotwasinski/envoy-kafka-tests/pull/13)
Docs Changes: readme updates due to version bump
Release Notes: n/a
Platform Specific Features: n/a

---------

Signed-off-by: Adam Kotwasinski
---
 bazel/repository_locations.bzl               |  12 +-
 .../network/source/protocol/generator.py     |  69 +++++++-----
 .../filters/network/source/serialization.h   | 104 ++++++++++++++++++
 .../network/test/serialization_test.cc       |  16 +++
 .../network_filters/kafka_broker_filter.rst  |   5 +-
 .../network_filters/kafka_mesh_filter.rst    |   2 +-
 6 files changed, 172 insertions(+), 36 deletions(-)

diff --git a/bazel/repository_locations.bzl b/bazel/repository_locations.bzl
index 3ed519854cf8..6752ec066cd0 100644
--- a/bazel/repository_locations.bzl
+++ b/bazel/repository_locations.bzl
@@ -1333,13 +1333,13 @@ REPOSITORY_LOCATIONS_SPEC = dict(
         project_name = "Kafka (source)",
         project_desc = "Open-source distributed event streaming platform",
         project_url = "https://kafka.apache.org",
-        version = "3.5.1",
-        sha256 = "9715589a02148fb21bc80d79f29763dbd371457bedcbbeab3db4f5c7fdd2d29c",
+        version = "3.8.0",
+        sha256 = "8761a0c22738201d3049f11f78c8e6c0f201203ba799157e498ef7eb04c259f3",
         strip_prefix = "kafka-{version}/clients/src/main/resources/common/message",
         urls = ["https://github.com/apache/kafka/archive/{version}.zip"],
         use_category = ["dataplane_ext"],
         extensions = ["envoy.filters.network.kafka_broker", "envoy.filters.network.kafka_mesh"],
-        release_date = "2023-07-14",
+        release_date = "2024-07-23",
         cpe = "cpe:2.3:a:apache:kafka:*",
         license = "Apache-2.0",
         license_url = "https://github.com/apache/kafka/blob/{version}/LICENSE",
@@ -1363,11 +1363,11 @@ REPOSITORY_LOCATIONS_SPEC = dict(
         project_name = "Kafka (server binary)",
         project_desc = "Open-source distributed event streaming platform",
         project_url = "https://kafka.apache.org",
-        version = "3.5.1",
-        sha256 = "f7b74d544023f2c0ec52a179de59975cb64e34ea03650d829328b407b560e4da",
+        version = "3.8.0",
+        sha256 = "e0297cc6fdb09ef9d9905751b25d2b629c17528f8629b60561eeff87ce29099c",
         strip_prefix = "kafka_2.13-{version}",
         urls = ["https://archive.apache.org/dist/kafka/{version}/kafka_2.13-{version}.tgz"],
-        release_date = "2023-07-21",
+        release_date = "2024-07-23",
         use_category = ["test_only"],
     ),
     proxy_wasm_cpp_sdk = dict(
diff --git a/contrib/kafka/filters/network/source/protocol/generator.py b/contrib/kafka/filters/network/source/protocol/generator.py
index 31762b17574b..656856219d65 100755
--- a/contrib/kafka/filters/network/source/protocol/generator.py
+++ b/contrib/kafka/filters/network/source/protocol/generator.py
@@ -153,9 +153,8 @@ def parse_messages(self, input_files):
                     amended = re.sub(r'-2147483648', 'INT32_MIN', without_empty_newlines)
                     message_spec = json.loads(amended)
                     api_key = message_spec['apiKey']
-                    # (adam.kotwasinski) Higher API keys in the future versions of Kafka need
-                    # some more changes to parse.
-                    if api_key < 68 or api_key == 69:
+                    # (adam.kotwasinski) Telemetry is not supported for now.
+                    if api_key not in [71, 72]:
                         message = self.parse_top_level_element(message_spec)
                         messages.append(message)
             except Exception as e:
@@ -224,8 +223,9 @@ def parse_complex_type(self, type_name, field_spec, versions):
             fields.append(child)
 
         # Some structures share the same name, use request/response as prefix.
-        if cpp_name in ['EntityData', 'EntryData', 'PartitionData', 'PartitionSnapshot',
-                        'SnapshotId', 'TopicData', 'TopicPartitions', 'TopicSnapshot']:
+        if cpp_name in ['Cursor', 'DirectoryData', 'EntityData', 'EntryData', 'PartitionData',
+                        'PartitionSnapshot', 'SnapshotId', 'TopicData', 'TopicPartitions',
+                        'TopicSnapshot']:
             cpp_name = self.type.capitalize() + type_name
 
         # Some of the types repeat multiple times (e.g. AlterableConfig).
@@ -370,9 +370,9 @@ def example_value(self):
 
 class FieldSpec:
     """
-  Represents a field present in a structure (request, or child structure thereof).
-  Contains name, type, and versions when it is used (nullable or not).
-  """
+    Represents a field present in a structure (request, or child structure thereof).
+    Contains name, type, and versions when it is used (nullable or not).
+    """
 
     def __init__(self, name, type, version_usage, version_usage_as_nullable):
         import re
@@ -387,10 +387,10 @@ def is_nullable(self):
 
     def is_nullable_in_version(self, version):
         """
-    Whether the field is nullable in given version.
-    Fields can be non-nullable in earlier versions.
-    See https://github.com/apache/kafka/tree/2.2.0-rc0/clients/src/main/resources/common/message#nullable-fields
-    """
+        Whether the field is nullable in given version.
+        Fields can be non-nullable in earlier versions.
+        See https://github.com/apache/kafka/tree/3.8.0/clients/src/main/resources/common/message#nullable-fields
+        """
         return version in self.version_usage_as_nullable
 
     def used_in_version(self, version):
@@ -428,13 +428,21 @@ def example_value_for_test(self, version):
 
     def deserializer_name_in_version(self, version, compact):
         if self.is_nullable_in_version(version):
-            return 'Nullable%s' % self.type.deserializer_name_in_version(version, compact)
+            underlying_deserializer = self.type.deserializer_name_in_version(version, compact)
+            # Handles KAFKA-14425 - structs (complex types) can now be nullable.
+            if isinstance(self.type, Complex):
+                return 'NullableStructDeserializer<%s>' % underlying_deserializer
+            else:
+                return 'Nullable%s' % underlying_deserializer
         else:
             return self.type.deserializer_name_in_version(version, compact)
 
     def is_printable(self):
         return self.type.is_printable()
 
+    def __str__(self):
+        return '%s(%s)' % (self.name, self.type)
+
 
 class TypeSpecification:
 
@@ -471,10 +479,10 @@ def is_printable(self):
 
 class Array(TypeSpecification):
     """
-  Represents array complex type.
-  To use instance of this type, it is necessary to declare structures required by self.underlying
-  (e.g. to use Array, we need to have `struct Foo {...}`).
-  """
+    Represents array complex type.
+    To use instance of this type, it is necessary to declare structures required by self.underlying
+    (e.g. to use Array, we need to have `struct Foo {...}`).
+    """
 
     def __init__(self, underlying):
         self.underlying = underlying
@@ -505,6 +513,9 @@ def example_value_for_test(self, version):
     def is_printable(self):
         return self.underlying.is_printable()
 
+    def __str__(self):
+        return self.name
+
 
 class Primitive(TypeSpecification):
     """
@@ -643,6 +654,9 @@ def example_value_for_test(self, version):
     def is_printable(self):
         return self.name not in ['Bytes']
 
+    def __str__(self):
+        return self.name
+
 
 class FieldSerializationSpec():
 
@@ -679,9 +693,9 @@ def register_flexible_versions(self, flexible_versions):
 
     def compute_declaration_chain(self):
         """
-    Computes all dependencies, what means all non-primitive types used by this type.
-    They need to be declared before this struct is declared.
-    """
+        Computes all dependencies, that is, all non-primitive types used by this type.
+        They need to be declared before this struct is declared.
+        """
         result = []
         for field in self.fields:
             field_dependencies = field.type.compute_declaration_chain()
@@ -700,10 +714,10 @@ def get_extra(self, key):
 
     def compute_constructors(self):
         """
-    Field lists for different versions may not differ (as Kafka can bump version without any
-    changes). But constructors need to be unique, so we need to remove duplicates if the signatures
-    match.
-    """
+        Field lists for different versions may not differ (as Kafka can bump version without any
+        changes). But constructors need to be unique, so we need to remove duplicates
+        if the signatures match.
+        """
         signature_to_constructor = {}
         for field_list in self.compute_field_lists():
             signature = field_list.constructor_signature()
@@ -724,8 +738,8 @@ def compute_constructors(self):
 
     def compute_field_lists(self):
         """
-    Return field lists representing each of structure versions.
-    """
+        Return field lists representing each of the structure versions.
+        """
         field_lists = []
         for version in self.versions:
             field_list = FieldList(version, version in self.flexible_versions, self.fields)
@@ -772,6 +786,9 @@ def example_value_for_test(self, version):
     def is_printable(self):
         return True
 
+    def __str__(self):
+        return self.name
+
 
 class RenderingHelper:
     """
diff --git a/contrib/kafka/filters/network/source/serialization.h b/contrib/kafka/filters/network/source/serialization.h
index 902fbd9c00f4..f2ea6a7545bf 100644
--- a/contrib/kafka/filters/network/source/serialization.h
+++ b/contrib/kafka/filters/network/source/serialization.h
@@ -918,6 +918,72 @@ class NullableCompactArrayDeserializer
   bool ready_{false};
 };
 
+/**
+ * Nullable objects are sent as a single marker byte followed by the object's data.
+ * Reference: https://issues.apache.org/jira/browse/KAFKA-14425
+ */
+template <typename DeserializerType>
+class NullableStructDeserializer
+    : public Deserializer> {
+public:
+  using ResponseType = absl::optional;
+
+  uint32_t feed(absl::string_view& data) override {
+
+    if (data.empty()) {
+      return 0;
+    }
+
+    uint32_t bytes_read = 0;
+
+    if (!marker_consumed_) {
+      // Read marker byte from input.
+      int8_t marker;
+      safeMemcpy(&marker, data.data());
+      data = {data.data() + 1, data.size() - 1};
+      bytes_read += 1;
+      marker_consumed_ = true;
+
+      if (marker >= 0) {
+        data_buf_ = absl::make_optional(DeserializerType());
+      } else {
+        return bytes_read;
+      }
+    }
+
+    if (data_buf_) {
+      bytes_read += data_buf_->feed(data);
+    }
+
+    return bytes_read;
+  }
+
+  bool ready() const override {
+    if (marker_consumed_) {
+      if (data_buf_) {
+        return data_buf_->ready();
+      } else {
+        return true; // It's an empty optional.
+      }
+    } else {
+      return false;
+    }
+  }
+
+  ResponseType get() const override {
+    if (data_buf_) {
+      const typename ResponseType::value_type deserialized_form = data_buf_->get();
+      return absl::make_optional(deserialized_form);
+    } else {
+      return absl::nullopt;
+    }
+  }
+
+private:
+  bool marker_consumed_{false};
+  absl::optional<DeserializerType> data_buf_; // Present if marker was consumed and was 0 or more.
+};
+
 /**
  * Kafka UUID is basically two longs, so we are going to keep model them the same way.
  * Reference:
@@ -996,6 +1062,12 @@ class EncodingContext {
    */
   template <typename T> uint32_t computeSize(const NullableArray<T>& arg) const;
 
+  /**
+   * Compute size of given nullable object, if it were to be encoded.
+   * @return serialized size of argument.
+   */
+  template <typename T> uint32_t computeSize(const absl::optional<T>& arg) const;
+
   /**
    * Compute size of given reference, if it were to be compactly encoded.
    * @return serialized size of argument.
@@ -1032,6 +1104,12 @@ class EncodingContext {
    */
   template <typename T> uint32_t encode(const NullableArray<T>& arg, Buffer::Instance& dst);
 
+  /**
+   * Encode given nullable object in a buffer.
+   * @return bytes written
+   */
+  template <typename T> uint32_t encode(const absl::optional<T>& arg, Buffer::Instance& dst);
+
   /**
    * Compactly encode given reference in a buffer.
    * @return bytes written.
@@ -1135,6 +1213,15 @@ inline uint32_t EncodingContext::computeSize(const NullableArray& arg) const
   return arg ? computeSize(*arg) : sizeof(int32_t);
 }
 
+/**
+ * Template overload for nullable T.
+ * The size of a nullable object is 1 (for the marker byte) plus the size of the real object (if any).
+ */
+template <typename T>
+inline uint32_t EncodingContext::computeSize(const absl::optional<T>& arg) const {
+  return 1 + (arg ? computeSize(*arg) : 0);
+}
+
 /**
  * Template overload for Uuid.
  */
@@ -1388,6 +1475,23 @@ uint32_t EncodingContext::encode(const NullableArray& arg, Buffer::Instance&
   }
 }
 
+/**
+ * Encode nullable object as marker byte (1 if present, -1 otherwise), then if the object is
+ * present, have it serialise itself.
+ */
+template <typename T>
+uint32_t EncodingContext::encode(const absl::optional<T>& arg, Buffer::Instance& dst) {
+  if (arg) {
+    const int8_t marker = 1;
+    encode(marker, dst);
+    const uint32_t written = encode(*arg, dst);
+    return 1 + written;
+  } else {
+    const int8_t marker = -1;
+    return encode(marker, dst);
+  }
+}
+
 /**
  * Template overload for Uuid.
  */
diff --git a/contrib/kafka/filters/network/test/serialization_test.cc b/contrib/kafka/filters/network/test/serialization_test.cc
index 59aa0e567b96..bbb80c3bd8f7 100644
--- a/contrib/kafka/filters/network/test/serialization_test.cc
+++ b/contrib/kafka/filters/network/test/serialization_test.cc
@@ -41,6 +41,8 @@ TEST_EmptyDeserializerShouldNotBeReady(BytesDeserializer);
 TEST_EmptyDeserializerShouldNotBeReady(CompactBytesDeserializer);
 TEST_EmptyDeserializerShouldNotBeReady(NullableBytesDeserializer);
 TEST_EmptyDeserializerShouldNotBeReady(NullableCompactBytesDeserializer);
+using ExampleNullableStructDeserializer = NullableStructDeserializer;
+TEST_EmptyDeserializerShouldNotBeReady(ExampleNullableStructDeserializer);
 TEST_EmptyDeserializerShouldNotBeReady(UuidDeserializer);
 
 TEST(ArrayDeserializer, EmptyBufferShouldNotBeReady) {
@@ -544,6 +546,20 @@ TEST(NullableCompactArrayDeserializer, ShouldConsumeCorrectAmountOfDataForLargeI
       NullableCompactArrayDeserializer>(value);
 }
 
+// Nullable struct.
+
+using ExampleNSD = NullableStructDeserializer;
+
+TEST(NullableStructDeserializer, ShouldConsumeCorrectAmountOfData) {
+  const ExampleNSD::ResponseType value = {42};
+  serializeThenDeserializeAndCheckEquality(value);
+}
+
+TEST(NullableStructDeserializer, ShouldConsumeNullStruct) {
+  const ExampleNSD::ResponseType value = absl::nullopt;
+  serializeThenDeserializeAndCheckEquality(value);
+}
+
 // UUID.
 
 TEST(UuidDeserializer, ShouldDeserialize) {
diff --git a/docs/root/configuration/listeners/network_filters/kafka_broker_filter.rst b/docs/root/configuration/listeners/network_filters/kafka_broker_filter.rst
index b8d0302796d9..2025137c170e 100644
--- a/docs/root/configuration/listeners/network_filters/kafka_broker_filter.rst
+++ b/docs/root/configuration/listeners/network_filters/kafka_broker_filter.rst
@@ -5,9 +5,8 @@ Kafka Broker filter
 
 The Apache Kafka broker filter decodes the client protocol for `Apache Kafka `_,
 both the requests and responses in the payload.
-The message versions in `Kafka 3.5.1 `_
-are supported (apart from ConsumerGroupHeartbeat - what means consumers configured with
-``group.protocol=consumer``).
+The message versions in `Kafka 3.8.0 `_
+are supported.
 
 By default the filter attempts not to influence the communication between client and brokers, so
 the messages that could not be decoded (due to Kafka client or broker running a newer version than
diff --git a/docs/root/configuration/listeners/network_filters/kafka_mesh_filter.rst b/docs/root/configuration/listeners/network_filters/kafka_mesh_filter.rst
index 472947348971..6da435ad8379 100644
--- a/docs/root/configuration/listeners/network_filters/kafka_mesh_filter.rst
+++ b/docs/root/configuration/listeners/network_filters/kafka_mesh_filter.rst
@@ -12,7 +12,7 @@ clients.
 The requests received by this filter instance can be forwarded to one of multiple clusters,
 depending on the configured forwarding rules.
 
-Corresponding message versions from Kafka 3.5.1 are supported.
+Corresponding message versions from Kafka 3.8.0 are supported.
 
 * This filter should be configured with the type URL
   ``type.googleapis.com/envoy.extensions.filters.network.kafka_mesh.v3alpha.KafkaMesh``.
 * :ref:`v3 API reference `
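
For readers who want to see the nullable-struct wire format (KAFKA-14425) that the new NullableStructDeserializer and the absl::optional computeSize/encode overloads above operate on, here is a small standalone sketch. It is illustrative only: ExampleStruct, its single big-endian int32 field, and the encodeNullable/decodeNullable helpers are assumptions made for this example and are not part of the Envoy sources touched by this patch. A present value is written as a marker byte of 1 followed by the struct's own serialized fields; a null value is written as a single marker byte of -1.

#include <cstdint>
#include <iostream>
#include <optional>
#include <vector>

// Hypothetical stand-in for a Kafka struct with a single int32 field.
struct ExampleStruct {
  int32_t value;
};

// Encode a nullable struct: one marker byte (-1 = null, 1 = present),
// followed by the struct's fields (a big-endian int32 here) when present.
std::vector<uint8_t> encodeNullable(const std::optional<ExampleStruct>& arg) {
  std::vector<uint8_t> out;
  if (!arg) {
    out.push_back(static_cast<uint8_t>(-1)); // null: marker only, 1 byte total
    return out;
  }
  out.push_back(1); // present: marker ...
  const uint32_t v = static_cast<uint32_t>(arg->value);
  out.push_back(static_cast<uint8_t>(v >> 24)); // ... then the payload
  out.push_back(static_cast<uint8_t>(v >> 16));
  out.push_back(static_cast<uint8_t>(v >> 8));
  out.push_back(static_cast<uint8_t>(v));
  return out;
}

// Decode the same format: consume the marker byte first, then the fields if present.
std::optional<ExampleStruct> decodeNullable(const std::vector<uint8_t>& in) {
  const int8_t marker = static_cast<int8_t>(in.at(0));
  if (marker < 0) {
    return std::nullopt;
  }
  const uint32_t v = (static_cast<uint32_t>(in.at(1)) << 24) |
                     (static_cast<uint32_t>(in.at(2)) << 16) |
                     (static_cast<uint32_t>(in.at(3)) << 8) | static_cast<uint32_t>(in.at(4));
  return ExampleStruct{static_cast<int32_t>(v)};
}

int main() {
  const auto present = encodeNullable(ExampleStruct{42}); // 5 bytes: marker + int32
  const auto null_form = encodeNullable(std::nullopt);    // 1 byte: marker only
  std::cout << decodeNullable(present)->value << " " << null_form.size() << "\n"; // prints "42 1"
  return 0;
}

A null struct therefore costs exactly one byte on the wire, which is what the `return 1 + (arg ? computeSize(*arg) : 0);` overload in serialization.h computes.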