Allow producing messages without specifying a Key (#30)
* Allow producing messages without specifying a Key

According to [1], keys are optional. With this change, a message whose key is
not set is produced with a null key.

Example:
~~~
        let messages = [
            {
                value: JSON.stringify({
                    firstname: "firstname-" + index,
                    lastname: "lastname-" + index,
                }),
                // Only value is set. The key field is not set.
            },
        ];
        let error = produceWithConfiguration(
            producer,
            messages,
            configuration,
            null, //!< Null for key schema, not used
            valueSchema
        );
~~~

[1]
https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-Messagesets

* Adding test scripts

* Explicitly set 'nil' for .Key in message if key is not set

* Revert "Explicitly set 'nil' for .Key in message if key is not set"

This reverts commit fc959d4.

* scripts: show consumer messages as debugs

If --verbose is provided, messages received by the consumer are displayed,
to make clear that only 'value' is present.

Example from test_avro_with_schema_registry_no_key.js:
DEBU[0000] Message:
{"value":{"firstname":"firstname-2","lastname":"lastname-2"}}
source=console
eduardowitter authored Dec 9, 2021
1 parent f2f51c7 commit c7bd315
Showing 3 changed files with 213 additions and 13 deletions.
34 changes: 21 additions & 13 deletions producer.go
@@ -90,30 +90,38 @@ func ProduceInternal(

     kafkaMessages := make([]kafkago.Message, len(messages))
     for i, message := range messages {
-        key := []byte(message["key"])
-        if keySchema != "" {
-            key = ToAvro(message["key"], keySchema)
+
+        kafkaMessages[i] = kafkago.Message{}
+
+        // If a key was provided, add it to the message. Keys are optional.
+        if _, has_key := message["key"]; has_key {
+            key := []byte(message["key"])
+            if keySchema != "" {
+                key = ToAvro(message["key"], keySchema)
+            }
+            keyData, err := addMagicByteAndSchemaIdPrefix(configuration, key, writer.Stats().Topic, "key", keySchema)
+            if err != nil {
+                ReportError(err, "Creation of key bytes failed.")
+                return err
+            }
+
+            kafkaMessages[i].Key = keyData
         }

+        // Then add the message
         value := []byte(message["value"])
         if valueSchema != "" {
             value = ToAvro(message["value"], valueSchema)
         }

-        keyData, err := addMagicByteAndSchemaIdPrefix(configuration, key, writer.Stats().Topic, "key", keySchema)
-        if err != nil {
-            ReportError(err, "Creation of key bytes failed.")
-            return err
-        }
         valueData, err := addMagicByteAndSchemaIdPrefix(configuration, value, writer.Stats().Topic, "value", valueSchema)
         if err != nil {
-            ReportError(err, "Creation of key bytes failed.")
+            ReportError(err, "Creation of message bytes failed.")
             return err
         }
-        kafkaMessages[i] = kafkago.Message{
-            Key:   keyData,
-            Value: valueData,
-        }
+
+        kafkaMessages[i].Value = valueData
+
     }

     err = writer.WriteMessages(ctx, kafkaMessages...)
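To make the new contract concrete, here is a minimal k6 sketch (illustration only, not part of this commit) of a batch that mixes a keyed and an unkeyed message. It assumes the `produce` signature used in the test scripts below also accepts `null` for the value schema, so keys and values are sent as raw bytes; the topic name is made up.

~~~
import { check } from "k6";
import { writer, produce, createTopic } from "k6/x/kafka"; // import kafka extension

const bootstrapServers = ["localhost:9092"];
const kafkaTopic = "xk6_kafka_mixed_keys_topic"; // hypothetical topic name

const producer = writer(bootstrapServers, kafkaTopic);
createTopic(bootstrapServers[0], kafkaTopic);

export default function () {
    let messages = [
        // Keyed message: produced exactly as before.
        { key: "key-0", value: "value-0" },
        // Key omitted: with this change, .Key stays unset and the
        // message is produced with a null key.
        { value: "value-1" },
    ];
    // null for both key and value schema: raw bytes, no Avro (assumed).
    let error = produce(producer, messages, null, null);
    check(error, {
        "is sent": (err) => err == undefined,
    });
}

export function teardown(data) {
    producer.close();
}
~~~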
97 changes: 97 additions & 0 deletions scripts/test_avro_no_key.js
@@ -0,0 +1,97 @@
/*
This is a k6 test script that imports the xk6-kafka extension and
tests Kafka by sending 200 Avro messages per iteration
without any associated key.
*/

import { check } from "k6";
import { writer, produce, reader, consume, createTopic } from "k6/x/kafka"; // import kafka extension

const bootstrapServers = ["localhost:9092"];
const kafkaTopic = "xk6_kafka_avro_topic";

const producer = writer(bootstrapServers, kafkaTopic);
const consumer = reader(bootstrapServers, kafkaTopic);

const valueSchema = JSON.stringify({
    type: "record",
    name: "Value",
    namespace: "dev.mostafa.xk6.kafka",
    fields: [
        {
            name: "name",
            type: "string",
        },
        {
            name: "version",
            type: "string",
        },
        {
            name: "author",
            type: "string",
        },
        {
            name: "description",
            type: "string",
        },
        {
            name: "url",
            type: "string",
        },
        {
            name: "index",
            type: "int",
        },
    ],
});

createTopic(bootstrapServers[0], kafkaTopic);

export default function () {
    for (let index = 0; index < 100; index++) {
        let messages = [
            {
                value: JSON.stringify({
                    name: "xk6-kafka",
                    version: "0.2.1",
                    author: "Mostafa Moradian",
                    description:
                        "k6 extension to load test Apache Kafka with support for Avro messages",
                    url: "https://mostafa.dev",
                    index: index,
                }),
            },
            {
                value: JSON.stringify({
                    name: "xk6-kafka",
                    version: "0.2.1",
                    author: "Mostafa Moradian",
                    description:
                        "k6 extension to load test Apache Kafka with support for Avro messages",
                    url: "https://mostafa.dev",
                    index: index,
                }),
            },
        ];
        let error = produce(producer, messages, null, valueSchema);
        check(error, {
            "is sent": (err) => err == undefined,
        });
    }

    // Read 10 messages only
    let rx_messages = consume(consumer, 10, null, valueSchema);
    check(rx_messages, {
        "10 messages returned": (msgs) => msgs.length == 10,
    });

    for (let index = 0; index < rx_messages.length; index++) {
        console.debug("Received Message: " + JSON.stringify(rx_messages[index]));
    }
}

export function teardown(data) {
    producer.close();
    consumer.close();
}
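As described in the commit message, the console.debug lines above only appear when k6 is run with the --verbose flag (for example, `k6 run --verbose scripts/test_avro_no_key.js`), which makes it easy to confirm that consumed messages contain only a 'value' field.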
95 changes: 95 additions & 0 deletions scripts/test_avro_with_schema_registry_no_key.js
@@ -0,0 +1,95 @@
/*
This is a k6 test script that imports the xk6-kafka extension and
tests Kafka with 100 Avro messages per iteration.
*/

import { check } from "k6";
import {
    writer,
    reader,
    consumeWithConfiguration,
    produceWithConfiguration,
    createTopic,
} from "k6/x/kafka"; // import kafka extension

const bootstrapServers = ["localhost:9092"];
const topic = "com.example.person";

const producer = writer(bootstrapServers, topic, null);
const consumer = reader(bootstrapServers, topic, null, "", null, null);

const valueSchema = `{
    "name": "ValueSchema",
    "type": "record",
    "namespace": "com.example",
    "fields": [
        {
            "name": "firstname",
            "type": "string"
        },
        {
            "name": "lastname",
            "type": "string"
        }
    ]
}`;

var configuration = JSON.stringify({
    consumer: {
        keyDeserializer: "",
        valueDeserializer:
            "io.confluent.kafka.serializers.KafkaAvroDeserializer",
    },
    producer: {
        keySerializer: "",
        valueSerializer: "io.confluent.kafka.serializers.KafkaAvroSerializer",
    },
    schemaRegistry: {
        url: "http://localhost:8081",
    },
});

createTopic(bootstrapServers[0], topic);

export default function () {
    for (let index = 0; index < 100; index++) {
        let messages = [
            {
                value: JSON.stringify({
                    firstname: "firstname-" + index,
                    lastname: "lastname-" + index,
                }),
            },
        ];
        let error = produceWithConfiguration(
            producer,
            messages,
            configuration,
            null, // no key schema; messages are produced without a key
            valueSchema
        );
        check(error, {
            "is sent": (err) => err == undefined,
        });
    }

    let rx_messages = consumeWithConfiguration(
        consumer,
        20,
        configuration,
        null, // no key schema
        valueSchema
    );
    check(rx_messages, {
        "20 messages returned": (msgs) => msgs.length == 20,
    });

    for (let index = 0; index < rx_messages.length; index++) {
        console.debug("Received Message: " + JSON.stringify(rx_messages[index]));
    }
}

export function teardown(data) {
    producer.close();
    consumer.close();
}
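Note that the configuration above leaves keySerializer and keyDeserializer empty, so the Schema Registry is consulted only for values; the omitted keys are produced and consumed as null.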
