diff --git a/scripts/test_json.js b/scripts/test_json.js index 57d376a..3ceae52 100644 --- a/scripts/test_json.js +++ b/scripts/test_json.js @@ -42,27 +42,24 @@ const schemaRegistry = new SchemaRegistry(); if (__VU == 0) { connection.createTopic({ topic: topic, - configEntries: [ - { - configName: "compression.type", - configValue: CODEC_SNAPPY, - }, - ], + configEntries: [{ + configName: "compression.type", + configValue: CODEC_SNAPPY, + }, ], }); } export const options = { thresholds: { // Base thresholds to see if the writer or reader is working - "kafka.writer.error.count": ["count == 0"], - "kafka.reader.error.count": ["count == 0"], + "kafka_writer_error_count": ["count == 0"], + "kafka_reader_error_count": ["count == 0"], }, }; -export default function () { +export default function() { for (let index = 0; index < 100; index++) { - let messages = [ - { + let messages = [{ // The data type of the key is JSON key: schemaRegistry.serialize({ data: { @@ -76,8 +73,7 @@ export default function () { name: "xk6-kafka", version: "0.9.0", author: "Mostafa Moradian", - description: - "k6 extension to load test Apache Kafka with support for Avro messages", + description: "k6 extension to load test Apache Kafka with support for Avro messages", index: index, }, schemaType: SCHEMA_TYPE_JSON, @@ -101,8 +97,7 @@ export default function () { name: "xk6-kafka", version: "0.9.0", author: "Mostafa Moradian", - description: - "k6 extension to load test Apache Kafka with support for Avro messages", + description: "k6 extension to load test Apache Kafka with support for Avro messages", index: index, }, schemaType: SCHEMA_TYPE_JSON, @@ -127,15 +122,15 @@ export default function () { "Topic equals to xk6_kafka_json_topic": (msg) => msg["topic"] == topic, "Key contains key/value and is JSON": (msg) => schemaRegistry - .deserialize({ data: msg.key, schemaType: SCHEMA_TYPE_JSON }) - .correlationId.startsWith("test-id-"), + .deserialize({ data: msg.key, schemaType: SCHEMA_TYPE_JSON }) + .correlationId.startsWith("test-id-"), "Value contains key/value and is JSON": (msg) => typeof schemaRegistry.deserialize({ data: msg.value, schemaType: SCHEMA_TYPE_JSON, }) == "object" && schemaRegistry.deserialize({ data: msg.value, schemaType: SCHEMA_TYPE_JSON }).name == - "xk6-kafka", + "xk6-kafka", "Header equals {'mykey': 'myvalue'}": (msg) => "mykey" in msg.headers && String.fromCharCode(...msg.headers["mykey"]) == "myvalue", "Time is past": (msg) => new Date(msg["time"]) < new Date(), @@ -153,4 +148,4 @@ export function teardown(data) { writer.close(); reader.close(); connection.close(); -} +} \ No newline at end of file diff --git a/scripts/test_string.js b/scripts/test_string.js index 46e82d8..ef8553e 100644 --- a/scripts/test_string.js +++ b/scripts/test_string.js @@ -36,15 +36,14 @@ if (__VU == 0) { export const options = { thresholds: { // Base thresholds to see if the writer or reader is working - "kafka.writer.error.count": ["count == 0"], - "kafka.reader.error.count": ["count == 0"], + "kafka_writer_error_count": ["count == 0"], + "kafka_reader_error_count": ["count == 0"], }, }; -export default function () { +export default function() { for (let index = 0; index < 100; index++) { - let messages = [ - { + let messages = [{ key: schemaRegistry.serialize({ data: "test-key-string", schemaType: SCHEMA_TYPE_STRING, @@ -96,7 +95,7 @@ export default function () { schemaType: SCHEMA_TYPE_STRING, }) == "string" && schemaRegistry.deserialize({ data: msg.value, schemaType: SCHEMA_TYPE_STRING }) == - "test-value-string", + "test-value-string", "Header equals {'mykey': 'myvalue'}": (msg) => "mykey" in msg.headers && String.fromCharCode(...msg.headers["mykey"]) == "myvalue", "Time is past": (msg) => new Date(msg["time"]) < new Date(), @@ -114,4 +113,4 @@ export function teardown(data) { writer.close(); reader.close(); connection.close(); -} +} \ No newline at end of file diff --git a/stats.go b/stats.go index d914eac..2be0a88 100644 --- a/stats.go +++ b/stats.go @@ -58,177 +58,177 @@ func registerMetrics(vu modules.VU) (kafkaMetrics, error) { kafkaMetrics := kafkaMetrics{} if kafkaMetrics.ReaderDials, err = registry.NewMetric( - "kafka.reader.dial.count", metrics.Counter); err != nil { + "kafka_reader_dial_count", metrics.Counter); err != nil { return kafkaMetrics, errors.Unwrap(err) } if kafkaMetrics.ReaderFetches, err = registry.NewMetric( - "kafka.reader.fetches.count", metrics.Counter); err != nil { + "kafka_reader_fetches_count", metrics.Counter); err != nil { return kafkaMetrics, errors.Unwrap(err) } if kafkaMetrics.ReaderMessages, err = registry.NewMetric( - "kafka.reader.message.count", metrics.Counter); err != nil { + "kafka_reader_message_count", metrics.Counter); err != nil { return kafkaMetrics, errors.Unwrap(err) } if kafkaMetrics.ReaderBytes, err = registry.NewMetric( - "kafka.reader.message.bytes", metrics.Counter, metrics.Data); err != nil { + "kafka_reader_message_bytes", metrics.Counter, metrics.Data); err != nil { return kafkaMetrics, errors.Unwrap(err) } if kafkaMetrics.ReaderRebalances, err = registry.NewMetric( - "kafka.reader.rebalance.count", metrics.Counter); err != nil { + "kafka_reader_rebalance_count", metrics.Counter); err != nil { return kafkaMetrics, errors.Unwrap(err) } if kafkaMetrics.ReaderTimeouts, err = registry.NewMetric( - "kafka.reader.timeouts.count", metrics.Counter); err != nil { + "kafka_reader_timeouts_count", metrics.Counter); err != nil { return kafkaMetrics, errors.Unwrap(err) } if kafkaMetrics.ReaderErrors, err = registry.NewMetric( - "kafka.reader.error.count", metrics.Counter); err != nil { + "kafka_reader_error_count", metrics.Counter); err != nil { return kafkaMetrics, errors.Unwrap(err) } if kafkaMetrics.ReaderDialTime, err = registry.NewMetric( - "kafka.reader.dial.seconds", metrics.Trend, metrics.Time); err != nil { + "kafka_reader_dial_seconds", metrics.Trend, metrics.Time); err != nil { return kafkaMetrics, errors.Unwrap(err) } if kafkaMetrics.ReaderReadTime, err = registry.NewMetric( - "kafka.reader.read.seconds", metrics.Trend, metrics.Time); err != nil { + "kafka_reader_read_seconds", metrics.Trend, metrics.Time); err != nil { return kafkaMetrics, errors.Unwrap(err) } if kafkaMetrics.ReaderWaitTime, err = registry.NewMetric( - "kafka.reader.wait.seconds", metrics.Trend, metrics.Time); err != nil { + "kafka_reader_wait_seconds", metrics.Trend, metrics.Time); err != nil { return kafkaMetrics, errors.Unwrap(err) } if kafkaMetrics.ReaderFetchSize, err = registry.NewMetric( - "kafka.reader.fetch.size", metrics.Counter); err != nil { + "kafka_reader_fetch_size", metrics.Counter); err != nil { return kafkaMetrics, errors.Unwrap(err) } if kafkaMetrics.ReaderFetchBytes, err = registry.NewMetric( - "kafka.reader.fetch.bytes", metrics.Counter, metrics.Data); err != nil { + "kafka_reader_fetch_bytes", metrics.Counter, metrics.Data); err != nil { return kafkaMetrics, errors.Unwrap(err) } if kafkaMetrics.ReaderOffset, err = registry.NewMetric( - "kafka.reader.offset", metrics.Gauge); err != nil { + "kafka_reader_offset", metrics.Gauge); err != nil { return kafkaMetrics, errors.Unwrap(err) } if kafkaMetrics.ReaderLag, err = registry.NewMetric( - "kafka.reader.lag", metrics.Gauge); err != nil { + "kafka_reader_lag", metrics.Gauge); err != nil { return kafkaMetrics, errors.Unwrap(err) } if kafkaMetrics.ReaderMinBytes, err = registry.NewMetric( - "kafka.reader.fetch_bytes.min", metrics.Gauge); err != nil { + "kafka_reader_fetch_bytes_min", metrics.Gauge); err != nil { return kafkaMetrics, errors.Unwrap(err) } if kafkaMetrics.ReaderMaxBytes, err = registry.NewMetric( - "kafka.reader.fetch_bytes.max", metrics.Gauge); err != nil { + "kafka_reader_fetch_bytes_max", metrics.Gauge); err != nil { return kafkaMetrics, errors.Unwrap(err) } if kafkaMetrics.ReaderMaxWait, err = registry.NewMetric( - "kafka.reader.fetch_wait.max", metrics.Gauge, metrics.Time); err != nil { + "kafka_reader_fetch_wait_max", metrics.Gauge, metrics.Time); err != nil { return kafkaMetrics, errors.Unwrap(err) } if kafkaMetrics.ReaderQueueLength, err = registry.NewMetric( - "kafka.reader.queue.length", metrics.Gauge); err != nil { + "kafka_reader_queue_length", metrics.Gauge); err != nil { return kafkaMetrics, errors.Unwrap(err) } if kafkaMetrics.ReaderQueueCapacity, err = registry.NewMetric( - "kafka.reader.queue.capacity", metrics.Gauge); err != nil { + "kafka_reader_queue_capacity", metrics.Gauge); err != nil { return kafkaMetrics, errors.Unwrap(err) } if kafkaMetrics.WriterWrites, err = registry.NewMetric( - "kafka.writer.write.count", metrics.Counter); err != nil { + "kafka_writer_write_count", metrics.Counter); err != nil { return kafkaMetrics, errors.Unwrap(err) } if kafkaMetrics.WriterMessages, err = registry.NewMetric( - "kafka.writer.message.count", metrics.Counter); err != nil { + "kafka_writer_message_count", metrics.Counter); err != nil { return kafkaMetrics, errors.Unwrap(err) } if kafkaMetrics.WriterBytes, err = registry.NewMetric( - "kafka.writer.message.bytes", metrics.Counter, metrics.Data); err != nil { + "kafka_writer_message_bytes", metrics.Counter, metrics.Data); err != nil { return kafkaMetrics, errors.Unwrap(err) } if kafkaMetrics.WriterErrors, err = registry.NewMetric( - "kafka.writer.error.count", metrics.Counter); err != nil { + "kafka_writer_error_count", metrics.Counter); err != nil { return kafkaMetrics, errors.Unwrap(err) } if kafkaMetrics.WriterWriteTime, err = registry.NewMetric( - "kafka.writer.write.seconds", metrics.Trend, metrics.Time); err != nil { + "kafka_writer_write_seconds", metrics.Trend, metrics.Time); err != nil { return kafkaMetrics, errors.Unwrap(err) } if kafkaMetrics.WriterWaitTime, err = registry.NewMetric( - "kafka.writer.wait.seconds", metrics.Trend, metrics.Time); err != nil { + "kafka_writer_wait_seconds", metrics.Trend, metrics.Time); err != nil { return kafkaMetrics, errors.Unwrap(err) } if kafkaMetrics.WriterRetries, err = registry.NewMetric( - "kafka.writer.retries.count", metrics.Counter); err != nil { + "kafka_writer_retries_count", metrics.Counter); err != nil { return kafkaMetrics, errors.Unwrap(err) } if kafkaMetrics.WriterBatchSize, err = registry.NewMetric( - "kafka.writer.batch.size", metrics.Counter); err != nil { + "kafka_writer_batch_size", metrics.Counter); err != nil { return kafkaMetrics, errors.Unwrap(err) } if kafkaMetrics.WriterBatchBytes, err = registry.NewMetric( - "kafka.writer.batch.bytes", metrics.Counter, metrics.Data); err != nil { + "kafka_writer_batch_bytes", metrics.Counter, metrics.Data); err != nil { return kafkaMetrics, errors.Unwrap(err) } if kafkaMetrics.WriterMaxAttempts, err = registry.NewMetric( - "kafka.writer.attempts.max", metrics.Gauge); err != nil { + "kafka_writer_attempts_max", metrics.Gauge); err != nil { return kafkaMetrics, errors.Unwrap(err) } if kafkaMetrics.WriterMaxBatchSize, err = registry.NewMetric( - "kafka.writer.batch.max", metrics.Gauge); err != nil { + "kafka_writer_batch_max", metrics.Gauge); err != nil { return kafkaMetrics, errors.Unwrap(err) } if kafkaMetrics.WriterBatchTimeout, err = registry.NewMetric( - "kafka.writer.batch.timeout", metrics.Gauge, metrics.Time); err != nil { + "kafka_writer_batch_timeout", metrics.Gauge, metrics.Time); err != nil { return kafkaMetrics, errors.Unwrap(err) } if kafkaMetrics.WriterReadTimeout, err = registry.NewMetric( - "kafka.writer.read.timeout", metrics.Gauge, metrics.Time); err != nil { + "kafka_writer_read_timeout", metrics.Gauge, metrics.Time); err != nil { return kafkaMetrics, errors.Unwrap(err) } if kafkaMetrics.WriterWriteTimeout, err = registry.NewMetric( - "kafka.writer.write.timeout", metrics.Gauge, metrics.Time); err != nil { + "kafka_writer_write_timeout", metrics.Gauge, metrics.Time); err != nil { return kafkaMetrics, errors.Unwrap(err) } if kafkaMetrics.WriterRequiredAcks, err = registry.NewMetric( - "kafka.writer.acks.required", metrics.Gauge); err != nil { + "kafka_writer_acks_required", metrics.Gauge); err != nil { return kafkaMetrics, errors.Unwrap(err) } if kafkaMetrics.WriterAsync, err = registry.NewMetric( - "kafka.writer.async", metrics.Rate); err != nil { + "kafka_writer_async", metrics.Rate); err != nil { return kafkaMetrics, errors.Unwrap(err) }