kafka: upgrade to 3.8, add support for more requests (#36166)
Commit Message: kafka: upgrade to 3.8, add support for more requests
Additional Description: upgrade the Kafka dependency to 3.8 and add the
parsing code and deserializer needed to handle the new construct introduced
in 3.8 (nullable structs)
Risk Level: low
Testing: automated suite + manual with
[envoy-kafka-tests](adamkotwasinski/envoy-kafka-tests#13)
Docs Changes: readme updates due to version bump
Release Notes: n/a
Platform Specific Features: n/a

---------

Signed-off-by: Adam Kotwasinski <[email protected]>
adamkotwasinski authored Sep 18, 2024
1 parent c6aa25c commit a2d9eba
Showing 6 changed files with 172 additions and 36 deletions.
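
For readers unfamiliar with the "nullable struct" mentioned in the commit message: per KAFKA-14425 (referenced in the diff below), a struct-typed field may now be null on the wire, encoded as a single signed marker byte; a negative marker means null, a non-negative marker means the struct's fields follow. The snippet below is a minimal, self-contained sketch of that layout; the payload shape and all names in it are illustrative, not taken from the Kafka message specs.

// Minimal sketch of the nullable-struct wire layout (KAFKA-14425):
//   [int8 marker][struct fields, present only when marker >= 0]
// The payload here is a single big-endian int32, purely for illustration.
#include <cstdint>
#include <iostream>
#include <optional>
#include <vector>

struct ExamplePayload { // Hypothetical stand-in for a generated Kafka struct.
  int32_t value;
};

std::optional<ExamplePayload> decodeNullableExample(const std::vector<uint8_t>& data) {
  const int8_t marker = static_cast<int8_t>(data.at(0));
  if (marker < 0) {
    return std::nullopt; // Negative marker: the struct is null, nothing follows.
  }
  int32_t value = 0;
  for (size_t i = 1; i <= 4; ++i) { // Big-endian int32 payload after the marker.
    value = (value << 8) | data.at(i);
  }
  return ExamplePayload{value};
}

int main() {
  const std::vector<uint8_t> null_struct = {0xFF};                      // marker = -1
  const std::vector<uint8_t> present = {0x01, 0x00, 0x00, 0x00, 0x2A};  // marker = 1, value = 42
  std::cout << decodeNullableExample(null_struct).has_value() << "\n";  // prints 0
  std::cout << decodeNullableExample(present)->value << "\n";           // prints 42
}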
12 changes: 6 additions & 6 deletions bazel/repository_locations.bzl
@@ -1333,13 +1333,13 @@ REPOSITORY_LOCATIONS_SPEC = dict(
project_name = "Kafka (source)",
project_desc = "Open-source distributed event streaming platform",
project_url = "https://kafka.apache.org",
version = "3.5.1",
sha256 = "9715589a02148fb21bc80d79f29763dbd371457bedcbbeab3db4f5c7fdd2d29c",
version = "3.8.0",
sha256 = "8761a0c22738201d3049f11f78c8e6c0f201203ba799157e498ef7eb04c259f3",
strip_prefix = "kafka-{version}/clients/src/main/resources/common/message",
urls = ["https://github.com/apache/kafka/archive/{version}.zip"],
use_category = ["dataplane_ext"],
extensions = ["envoy.filters.network.kafka_broker", "envoy.filters.network.kafka_mesh"],
release_date = "2023-07-14",
release_date = "2024-07-23",
cpe = "cpe:2.3:a:apache:kafka:*",
license = "Apache-2.0",
license_url = "https://github.com/apache/kafka/blob/{version}/LICENSE",
@@ -1363,11 +1363,11 @@ REPOSITORY_LOCATIONS_SPEC = dict(
project_name = "Kafka (server binary)",
project_desc = "Open-source distributed event streaming platform",
project_url = "https://kafka.apache.org",
version = "3.5.1",
sha256 = "f7b74d544023f2c0ec52a179de59975cb64e34ea03650d829328b407b560e4da",
version = "3.8.0",
sha256 = "e0297cc6fdb09ef9d9905751b25d2b629c17528f8629b60561eeff87ce29099c",
strip_prefix = "kafka_2.13-{version}",
urls = ["https://archive.apache.org/dist/kafka/{version}/kafka_2.13-{version}.tgz"],
release_date = "2023-07-21",
release_date = "2024-07-23",
use_category = ["test_only"],
),
proxy_wasm_cpp_sdk = dict(
69 changes: 43 additions & 26 deletions contrib/kafka/filters/network/source/protocol/generator.py
@@ -153,9 +153,8 @@ def parse_messages(self, input_files):
amended = re.sub(r'-2147483648', 'INT32_MIN', without_empty_newlines)
message_spec = json.loads(amended)
api_key = message_spec['apiKey']
# (adam.kotwasinski) Higher API keys in the future versions of Kafka need
# some more changes to parse.
if api_key < 68 or api_key == 69:
# (adam.kotwasinski) Telemetry is not supported for now.
if api_key not in [71, 72]:
message = self.parse_top_level_element(message_spec)
messages.append(message)
except Exception as e:
@@ -224,8 +223,9 @@ def parse_complex_type(self, type_name, field_spec, versions):
fields.append(child)

# Some structures share the same name, use request/response as prefix.
if cpp_name in ['EntityData', 'EntryData', 'PartitionData', 'PartitionSnapshot',
'SnapshotId', 'TopicData', 'TopicPartitions', 'TopicSnapshot']:
if cpp_name in ['Cursor', 'DirectoryData', 'EntityData', 'EntryData', 'PartitionData',
'PartitionSnapshot', 'SnapshotId', 'TopicData', 'TopicPartitions',
'TopicSnapshot']:
cpp_name = self.type.capitalize() + type_name

# Some of the types repeat multiple times (e.g. AlterableConfig).
@@ -370,9 +370,9 @@ def example_value(self):

class FieldSpec:
"""
Represents a field present in a structure (request, or child structure thereof).
Contains name, type, and versions when it is used (nullable or not).
"""
Represents a field present in a structure (request, or child structure thereof).
Contains name, type, and versions when it is used (nullable or not).
"""

def __init__(self, name, type, version_usage, version_usage_as_nullable):
import re
@@ -387,10 +387,10 @@ def is_nullable(self):

def is_nullable_in_version(self, version):
"""
Whether the field is nullable in given version.
Fields can be non-nullable in earlier versions.
See https://github.com/apache/kafka/tree/2.2.0-rc0/clients/src/main/resources/common/message#nullable-fields
"""
Whether the field is nullable in given version.
Fields can be non-nullable in earlier versions.
See https://github.com/apache/kafka/tree/3.8.0/clients/src/main/resources/common/message#nullable-fields
"""
return version in self.version_usage_as_nullable

def used_in_version(self, version):
@@ -428,13 +428,21 @@ def example_value_for_test(self, version):

def deserializer_name_in_version(self, version, compact):
if self.is_nullable_in_version(version):
return 'Nullable%s' % self.type.deserializer_name_in_version(version, compact)
underlying_deserializer = self.type.deserializer_name_in_version(version, compact)
# Handles KAFKA-14425 - structs (complex types) can now be nullable.
if isinstance(self.type, Complex):
return 'NullableStructDeserializer<%s>' % underlying_deserializer
else:
return 'Nullable%s' % underlying_deserializer
else:
return self.type.deserializer_name_in_version(version, compact)

def is_printable(self):
return self.type.is_printable()

def __str__(self):
return '%s(%s)' % (self.name, self.type)


class TypeSpecification:

@@ -471,10 +479,10 @@ def is_printable(self):

class Array(TypeSpecification):
"""
Represents array complex type.
To use instance of this type, it is necessary to declare structures required by self.underlying
(e.g. to use Array<Foo>, we need to have `struct Foo {...}`).
"""
Represents array complex type.
To use instance of this type, it is necessary to declare structures required by self.underlying
(e.g. to use Array<Foo>, we need to have `struct Foo {...}`).
"""

def __init__(self, underlying):
self.underlying = underlying
@@ -505,6 +513,9 @@ def example_value_for_test(self, version):
def is_printable(self):
return self.underlying.is_printable()

def __str__(self):
return self.name


class Primitive(TypeSpecification):
"""
@@ -643,6 +654,9 @@ def example_value_for_test(self, version):
def is_printable(self):
return self.name not in ['Bytes']

def __str__(self):
return self.name


class FieldSerializationSpec():

@@ -679,9 +693,9 @@ def register_flexible_versions(self, flexible_versions):

def compute_declaration_chain(self):
"""
Computes all dependencies, what means all non-primitive types used by this type.
They need to be declared before this struct is declared.
"""
Computes all dependencies, what means all non-primitive types used by this type.
They need to be declared before this struct is declared.
"""
result = []
for field in self.fields:
field_dependencies = field.type.compute_declaration_chain()
@@ -700,10 +714,10 @@ def get_extra(self, key):

def compute_constructors(self):
"""
Field lists for different versions may not differ (as Kafka can bump version without any
changes). But constructors need to be unique, so we need to remove duplicates if the signatures
match.
"""
Field lists for different versions may not differ (as Kafka can bump version without any
changes). But constructors need to be unique, so we need to remove duplicates
if the signatures match.
"""
signature_to_constructor = {}
for field_list in self.compute_field_lists():
signature = field_list.constructor_signature()
@@ -724,8 +738,8 @@ def compute_constructors(self):

def compute_field_lists(self):
"""
Return field lists representing each of structure versions.
"""
Return field lists representing each of structure versions.
"""
field_lists = []
for version in self.versions:
field_list = FieldList(version, version in self.flexible_versions, self.fields)
@@ -772,6 +786,9 @@ def example_value_for_test(self, version):
def is_printable(self):
return True

def __str__(self):
return self.name


class RenderingHelper:
"""
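
To make the naming change above concrete: for a nullable field the generator now wraps complex (struct) types in NullableStructDeserializer, while primitive types keep the plain "Nullable" prefix as before. The aliases below are a hedged illustration of the kind of C++ that would be emitted; FetchableTopicResponseV16Deserializer is a hypothetical generated name, not quoted from this commit.

// Illustration only of deserializer_name_in_version() output for nullable fields.
using NullablePrimitiveField = NullableBytesDeserializer; // primitive: "Nullable" prefix, as before
using NullableComplexField =                              // complex struct: new wrapper (KAFKA-14425)
    NullableStructDeserializer<FetchableTopicResponseV16Deserializer>; // hypothetical generated type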
104 changes: 104 additions & 0 deletions contrib/kafka/filters/network/source/serialization.h
@@ -918,6 +918,72 @@ class NullableCompactArrayDeserializer
bool ready_{false};
};

/**
* Nullable objects are sent as a single marker byte followed by the object's data (when present).
* Reference: https://issues.apache.org/jira/browse/KAFKA-14425
*/
template <typename DeserializerType>
class NullableStructDeserializer
: public Deserializer<absl::optional<typename DeserializerType::result_type>> {
public:
using ResponseType = absl::optional<typename DeserializerType::result_type>;

uint32_t feed(absl::string_view& data) override {

if (data.empty()) {
return 0;
}

uint32_t bytes_read = 0;

if (!marker_consumed_) {
// Read marker byte from input.
int8_t marker;
safeMemcpy(&marker, data.data());
data = {data.data() + 1, data.size() - 1};
bytes_read += 1;
marker_consumed_ = true;

if (marker >= 0) {
data_buf_ = absl::make_optional(DeserializerType());
} else {
return bytes_read;
}
}

if (data_buf_) {
bytes_read += data_buf_->feed(data);
}

return bytes_read;
}

bool ready() const override {
if (marker_consumed_) {
if (data_buf_) {
return data_buf_->ready();
} else {
return true; // It's an empty optional.
}
} else {
return false;
}
}

ResponseType get() const override {
if (data_buf_) {
const typename ResponseType::value_type deserialized_form = data_buf_->get();
return absl::make_optional(deserialized_form);
} else {
return absl::nullopt;
}
}

private:
bool marker_consumed_{false};
absl::optional<DeserializerType> data_buf_; // Present if marker was consumed and was 0 or more.
};

/**
* Kafka UUID is basically two longs, so we are going to model it the same way.
* Reference:
@@ -996,6 +1062,12 @@ class EncodingContext {
*/
template <typename T> uint32_t computeSize(const NullableArray<T>& arg) const;

/**
* Compute size of given nullable object, if it were to be encoded.
* @return serialized size of argument.
*/
template <typename T> uint32_t computeSize(const absl::optional<T>& arg) const;

/**
* Compute size of given reference, if it were to be compactly encoded.
* @return serialized size of argument.
@@ -1032,6 +1104,12 @@ class EncodingContext {
*/
template <typename T> uint32_t encode(const NullableArray<T>& arg, Buffer::Instance& dst);

/**
* Encode given nullable object in a buffer.
* @return bytes written
*/
template <typename T> uint32_t encode(const absl::optional<T>& arg, Buffer::Instance& dst);

/**
* Compactly encode given reference in a buffer.
* @return bytes written.
@@ -1135,6 +1213,15 @@ inline uint32_t EncodingContext::computeSize(const NullableArray<T>& arg) const
return arg ? computeSize(*arg) : sizeof(int32_t);
}

/**
* Template overload for nullable T.
* The size of a nullable object is 1 (for the marker byte) plus the size of the real object (if any).
*/
template <typename T>
inline uint32_t EncodingContext::computeSize(const absl::optional<T>& arg) const {
return 1 + (arg ? computeSize(*arg) : 0);
}

/**
* Template overload for Uuid.
*/
@@ -1388,6 +1475,23 @@ uint32_t EncodingContext::encode(const NullableArray<T>& arg, Buffer::Instance&
}
}

/**
* Encode a nullable object as a marker byte (1 if present, -1 otherwise), then, if the object
* is present, have it serialize itself.
*/
template <typename T>
uint32_t EncodingContext::encode(const absl::optional<T>& arg, Buffer::Instance& dst) {
if (arg) {
const int8_t marker = 1;
encode(marker, dst);
const uint32_t written = encode(*arg, dst);
return 1 + written;
} else {
const int8_t marker = -1;
return encode(marker, dst);
}
}

/**
* Template overload for Uuid.
*/
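
As a usage sketch of the new deserializer (assuming the serialization types declared above; the byte sequences are illustrative and mirror the unit tests in the next file), the marker byte alone decides whether a payload follows:

// Sketch only: feeding NullableStructDeserializer by hand; assumes the
// surrounding Envoy Kafka serialization header (Int32Deserializer etc.).
using ExampleDeserializer = NullableStructDeserializer<Int32Deserializer>;

void nullableStructSketch() {
  // Marker byte 1 ("present"), then a big-endian int32 payload of 42.
  const char present_bytes[] = {0x01, 0x00, 0x00, 0x00, 0x2A};
  absl::string_view present_data = {present_bytes, sizeof(present_bytes)};
  ExampleDeserializer present_deserializer;
  present_deserializer.feed(present_data); // Consumes all 5 bytes.
  // present_deserializer.ready() is now true; get() returns an optional holding 42.

  // Marker byte -1 ("null"): nothing else is sent on the wire.
  const char null_bytes[] = {static_cast<char>(0xFF)};
  absl::string_view null_data = {null_bytes, sizeof(null_bytes)};
  ExampleDeserializer null_deserializer;
  null_deserializer.feed(null_data); // Consumes the single marker byte.
  // null_deserializer.ready() is now true; get() returns absl::nullopt.
}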
16 changes: 16 additions & 0 deletions contrib/kafka/filters/network/test/serialization_test.cc
@@ -41,6 +41,8 @@ TEST_EmptyDeserializerShouldNotBeReady(BytesDeserializer);
TEST_EmptyDeserializerShouldNotBeReady(CompactBytesDeserializer);
TEST_EmptyDeserializerShouldNotBeReady(NullableBytesDeserializer);
TEST_EmptyDeserializerShouldNotBeReady(NullableCompactBytesDeserializer);
using ExampleNullableStructDeserializer = NullableStructDeserializer<Int8Deserializer>;
TEST_EmptyDeserializerShouldNotBeReady(ExampleNullableStructDeserializer);
TEST_EmptyDeserializerShouldNotBeReady(UuidDeserializer);

TEST(ArrayDeserializer, EmptyBufferShouldNotBeReady) {
@@ -544,6 +546,20 @@ TEST(NullableCompactArrayDeserializer, ShouldConsumeCorrectAmountOfDataForLargeI
NullableCompactArrayDeserializer<Int32Deserializer>>(value);
}

// Nullable struct.

using ExampleNSD = NullableStructDeserializer<Int32Deserializer>;

TEST(NullableStructDeserializer, ShouldConsumeCorrectAmountOfData) {
const ExampleNSD::ResponseType value = {42};
serializeThenDeserializeAndCheckEquality<ExampleNSD>(value);
}

TEST(NullableStructDeserializer, ShouldConsumeNullStruct) {
const ExampleNSD::ResponseType value = absl::nullopt;
serializeThenDeserializeAndCheckEquality<ExampleNSD>(value);
}

// UUID.

TEST(UuidDeserializer, ShouldDeserialize) {
@@ -5,9 +5,8 @@ Kafka Broker filter

The Apache Kafka broker filter decodes the client protocol for
`Apache Kafka <https://kafka.apache.org/>`_, both the requests and responses in the payload.
The message versions in `Kafka 3.5.1 <http://kafka.apache.org/35/protocol.html#protocol_api_keys>`_
are supported (apart from ConsumerGroupHeartbeat - what means consumers configured with
``group.protocol=consumer``).
The message versions in `Kafka 3.8.0 <http://kafka.apache.org/38/protocol.html#protocol_api_keys>`_
are supported.

By default the filter attempts not to influence the communication between client and brokers, so
the messages that could not be decoded (due to Kafka client or broker running a newer version than
@@ -12,7 +12,7 @@ clients.
The requests received by this filter instance can be forwarded to one of multiple clusters,
depending on the configured forwarding rules.

Corresponding message versions from Kafka 3.5.1 are supported.
Corresponding message versions from Kafka 3.8.0 are supported.

* This filter should be configured with the type URL ``type.googleapis.com/envoy.extensions.filters.network.kafka_mesh.v3alpha.KafkaMesh``.
* :ref:`v3 API reference <envoy_v3_api_msg_extensions.filters.network.kafka_mesh.v3alpha.KafkaMesh>`
