Change dot to underscore in metrics separator to be compatible with prometheus metrics (#177)

rgordill authored Nov 21, 2022
1 parent 0811422 commit 120d70d
Showing 3 changed files with 55 additions and 61 deletions.
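Background for the rename: Prometheus metric names must match the pattern [a-zA-Z_:][a-zA-Z0-9_:]*, so the dotted names xk6-kafka used before this commit (e.g. kafka.reader.dial.count) are rejected when k6 results are exported to Prometheus, while underscore-separated names pass. Below is a minimal sketch of that constraint in Go; the regexp comes from the Prometheus data-model documentation, and the program itself is illustrative, not part of this commit.

package main

import (
    "fmt"
    "regexp"
)

// Valid Prometheus metric names per the Prometheus data model: a letter,
// underscore, or colon, followed by letters, digits, underscores, or
// colons. Dots are not allowed, which is what this commit fixes.
var validMetricName = regexp.MustCompile(`^[a-zA-Z_:][a-zA-Z0-9_:]*$`)

func main() {
    for _, name := range []string{
        "kafka.reader.dial.count", // old name: rejected by Prometheus
        "kafka_reader_dial_count", // new name from this commit: accepted
    } {
        fmt.Printf("%-25s valid: %t\n", name, validMetricName.MatchString(name))
    }
}

Note that the rename is visible to user scripts as well: thresholds keyed on the old dotted metric names, like those in the two test scripts below, must be updated to the underscore form.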
33 changes: 14 additions & 19 deletions scripts/test_json.js
@@ -42,27 +42,24 @@ const schemaRegistry = new SchemaRegistry();
 if (__VU == 0) {
   connection.createTopic({
     topic: topic,
-    configEntries: [
-      {
-        configName: "compression.type",
-        configValue: CODEC_SNAPPY,
-      },
-    ],
+    configEntries: [{
+      configName: "compression.type",
+      configValue: CODEC_SNAPPY,
+    }, ],
   });
 }
 
 export const options = {
   thresholds: {
     // Base thresholds to see if the writer or reader is working
-    "kafka.writer.error.count": ["count == 0"],
-    "kafka.reader.error.count": ["count == 0"],
+    "kafka_writer_error_count": ["count == 0"],
+    "kafka_reader_error_count": ["count == 0"],
   },
 };
 
-export default function () {
+export default function() {
   for (let index = 0; index < 100; index++) {
-    let messages = [
-      {
+    let messages = [{
       // The data type of the key is JSON
       key: schemaRegistry.serialize({
         data: {
@@ -76,8 +73,7 @@ export default function () {
name: "xk6-kafka",
version: "0.9.0",
author: "Mostafa Moradian",
description:
"k6 extension to load test Apache Kafka with support for Avro messages",
description: "k6 extension to load test Apache Kafka with support for Avro messages",
index: index,
},
schemaType: SCHEMA_TYPE_JSON,
@@ -101,8 +97,7 @@ export default function () {
name: "xk6-kafka",
version: "0.9.0",
author: "Mostafa Moradian",
description:
"k6 extension to load test Apache Kafka with support for Avro messages",
description: "k6 extension to load test Apache Kafka with support for Avro messages",
index: index,
},
schemaType: SCHEMA_TYPE_JSON,
@@ -127,15 +122,15 @@ export default function () {
"Topic equals to xk6_kafka_json_topic": (msg) => msg["topic"] == topic,
"Key contains key/value and is JSON": (msg) =>
schemaRegistry
.deserialize({ data: msg.key, schemaType: SCHEMA_TYPE_JSON })
.correlationId.startsWith("test-id-"),
.deserialize({ data: msg.key, schemaType: SCHEMA_TYPE_JSON })
.correlationId.startsWith("test-id-"),
"Value contains key/value and is JSON": (msg) =>
typeof schemaRegistry.deserialize({
data: msg.value,
schemaType: SCHEMA_TYPE_JSON,
}) == "object" &&
schemaRegistry.deserialize({ data: msg.value, schemaType: SCHEMA_TYPE_JSON }).name ==
"xk6-kafka",
"xk6-kafka",
"Header equals {'mykey': 'myvalue'}": (msg) =>
"mykey" in msg.headers && String.fromCharCode(...msg.headers["mykey"]) == "myvalue",
"Time is past": (msg) => new Date(msg["time"]) < new Date(),
@@ -153,4 +148,4 @@ export function teardown(data) {
   writer.close();
   reader.close();
   connection.close();
-}
+}
13 changes: 6 additions & 7 deletions scripts/test_string.js
@@ -36,15 +36,14 @@ if (__VU == 0) {
 export const options = {
   thresholds: {
     // Base thresholds to see if the writer or reader is working
-    "kafka.writer.error.count": ["count == 0"],
-    "kafka.reader.error.count": ["count == 0"],
+    "kafka_writer_error_count": ["count == 0"],
+    "kafka_reader_error_count": ["count == 0"],
   },
 };
 
-export default function () {
+export default function() {
   for (let index = 0; index < 100; index++) {
-    let messages = [
-      {
+    let messages = [{
       key: schemaRegistry.serialize({
         data: "test-key-string",
         schemaType: SCHEMA_TYPE_STRING,
@@ -96,7 +95,7 @@ export default function () {
         schemaType: SCHEMA_TYPE_STRING,
       }) == "string" &&
       schemaRegistry.deserialize({ data: msg.value, schemaType: SCHEMA_TYPE_STRING }) ==
-        "test-value-string",
+      "test-value-string",
     "Header equals {'mykey': 'myvalue'}": (msg) =>
       "mykey" in msg.headers && String.fromCharCode(...msg.headers["mykey"]) == "myvalue",
     "Time is past": (msg) => new Date(msg["time"]) < new Date(),
@@ -114,4 +113,4 @@ export function teardown(data) {
   writer.close();
   reader.close();
   connection.close();
-}
+}
70 changes: 35 additions & 35 deletions stats.go
@@ -58,177 +58,177 @@ func registerMetrics(vu modules.VU) (kafkaMetrics, error) {
     kafkaMetrics := kafkaMetrics{}
 
     if kafkaMetrics.ReaderDials, err = registry.NewMetric(
-        "kafka.reader.dial.count", metrics.Counter); err != nil {
+        "kafka_reader_dial_count", metrics.Counter); err != nil {
         return kafkaMetrics, errors.Unwrap(err)
     }
 
     if kafkaMetrics.ReaderFetches, err = registry.NewMetric(
-        "kafka.reader.fetches.count", metrics.Counter); err != nil {
+        "kafka_reader_fetches_count", metrics.Counter); err != nil {
         return kafkaMetrics, errors.Unwrap(err)
     }
 
     if kafkaMetrics.ReaderMessages, err = registry.NewMetric(
-        "kafka.reader.message.count", metrics.Counter); err != nil {
+        "kafka_reader_message_count", metrics.Counter); err != nil {
         return kafkaMetrics, errors.Unwrap(err)
     }
 
     if kafkaMetrics.ReaderBytes, err = registry.NewMetric(
-        "kafka.reader.message.bytes", metrics.Counter, metrics.Data); err != nil {
+        "kafka_reader_message_bytes", metrics.Counter, metrics.Data); err != nil {
         return kafkaMetrics, errors.Unwrap(err)
     }
 
     if kafkaMetrics.ReaderRebalances, err = registry.NewMetric(
-        "kafka.reader.rebalance.count", metrics.Counter); err != nil {
+        "kafka_reader_rebalance_count", metrics.Counter); err != nil {
         return kafkaMetrics, errors.Unwrap(err)
     }
 
     if kafkaMetrics.ReaderTimeouts, err = registry.NewMetric(
-        "kafka.reader.timeouts.count", metrics.Counter); err != nil {
+        "kafka_reader_timeouts_count", metrics.Counter); err != nil {
         return kafkaMetrics, errors.Unwrap(err)
     }
 
     if kafkaMetrics.ReaderErrors, err = registry.NewMetric(
-        "kafka.reader.error.count", metrics.Counter); err != nil {
+        "kafka_reader_error_count", metrics.Counter); err != nil {
         return kafkaMetrics, errors.Unwrap(err)
     }
 
     if kafkaMetrics.ReaderDialTime, err = registry.NewMetric(
-        "kafka.reader.dial.seconds", metrics.Trend, metrics.Time); err != nil {
+        "kafka_reader_dial_seconds", metrics.Trend, metrics.Time); err != nil {
         return kafkaMetrics, errors.Unwrap(err)
     }
 
     if kafkaMetrics.ReaderReadTime, err = registry.NewMetric(
-        "kafka.reader.read.seconds", metrics.Trend, metrics.Time); err != nil {
+        "kafka_reader_read_seconds", metrics.Trend, metrics.Time); err != nil {
         return kafkaMetrics, errors.Unwrap(err)
     }
 
     if kafkaMetrics.ReaderWaitTime, err = registry.NewMetric(
-        "kafka.reader.wait.seconds", metrics.Trend, metrics.Time); err != nil {
+        "kafka_reader_wait_seconds", metrics.Trend, metrics.Time); err != nil {
         return kafkaMetrics, errors.Unwrap(err)
     }
 
     if kafkaMetrics.ReaderFetchSize, err = registry.NewMetric(
-        "kafka.reader.fetch.size", metrics.Counter); err != nil {
+        "kafka_reader_fetch_size", metrics.Counter); err != nil {
         return kafkaMetrics, errors.Unwrap(err)
     }
 
     if kafkaMetrics.ReaderFetchBytes, err = registry.NewMetric(
-        "kafka.reader.fetch.bytes", metrics.Counter, metrics.Data); err != nil {
+        "kafka_reader_fetch_bytes", metrics.Counter, metrics.Data); err != nil {
         return kafkaMetrics, errors.Unwrap(err)
     }
 
     if kafkaMetrics.ReaderOffset, err = registry.NewMetric(
-        "kafka.reader.offset", metrics.Gauge); err != nil {
+        "kafka_reader_offset", metrics.Gauge); err != nil {
         return kafkaMetrics, errors.Unwrap(err)
     }
 
     if kafkaMetrics.ReaderLag, err = registry.NewMetric(
-        "kafka.reader.lag", metrics.Gauge); err != nil {
+        "kafka_reader_lag", metrics.Gauge); err != nil {
         return kafkaMetrics, errors.Unwrap(err)
     }
 
     if kafkaMetrics.ReaderMinBytes, err = registry.NewMetric(
-        "kafka.reader.fetch_bytes.min", metrics.Gauge); err != nil {
+        "kafka_reader_fetch_bytes_min", metrics.Gauge); err != nil {
         return kafkaMetrics, errors.Unwrap(err)
     }
 
     if kafkaMetrics.ReaderMaxBytes, err = registry.NewMetric(
-        "kafka.reader.fetch_bytes.max", metrics.Gauge); err != nil {
+        "kafka_reader_fetch_bytes_max", metrics.Gauge); err != nil {
         return kafkaMetrics, errors.Unwrap(err)
     }
 
     if kafkaMetrics.ReaderMaxWait, err = registry.NewMetric(
-        "kafka.reader.fetch_wait.max", metrics.Gauge, metrics.Time); err != nil {
+        "kafka_reader_fetch_wait_max", metrics.Gauge, metrics.Time); err != nil {
         return kafkaMetrics, errors.Unwrap(err)
     }
 
     if kafkaMetrics.ReaderQueueLength, err = registry.NewMetric(
-        "kafka.reader.queue.length", metrics.Gauge); err != nil {
+        "kafka_reader_queue_length", metrics.Gauge); err != nil {
         return kafkaMetrics, errors.Unwrap(err)
     }
 
     if kafkaMetrics.ReaderQueueCapacity, err = registry.NewMetric(
-        "kafka.reader.queue.capacity", metrics.Gauge); err != nil {
+        "kafka_reader_queue_capacity", metrics.Gauge); err != nil {
         return kafkaMetrics, errors.Unwrap(err)
     }
 
     if kafkaMetrics.WriterWrites, err = registry.NewMetric(
-        "kafka.writer.write.count", metrics.Counter); err != nil {
+        "kafka_writer_write_count", metrics.Counter); err != nil {
         return kafkaMetrics, errors.Unwrap(err)
     }
 
     if kafkaMetrics.WriterMessages, err = registry.NewMetric(
-        "kafka.writer.message.count", metrics.Counter); err != nil {
+        "kafka_writer_message_count", metrics.Counter); err != nil {
         return kafkaMetrics, errors.Unwrap(err)
     }
 
     if kafkaMetrics.WriterBytes, err = registry.NewMetric(
-        "kafka.writer.message.bytes", metrics.Counter, metrics.Data); err != nil {
+        "kafka_writer_message_bytes", metrics.Counter, metrics.Data); err != nil {
         return kafkaMetrics, errors.Unwrap(err)
     }
 
     if kafkaMetrics.WriterErrors, err = registry.NewMetric(
-        "kafka.writer.error.count", metrics.Counter); err != nil {
+        "kafka_writer_error_count", metrics.Counter); err != nil {
         return kafkaMetrics, errors.Unwrap(err)
     }
 
     if kafkaMetrics.WriterWriteTime, err = registry.NewMetric(
-        "kafka.writer.write.seconds", metrics.Trend, metrics.Time); err != nil {
+        "kafka_writer_write_seconds", metrics.Trend, metrics.Time); err != nil {
         return kafkaMetrics, errors.Unwrap(err)
     }
 
     if kafkaMetrics.WriterWaitTime, err = registry.NewMetric(
-        "kafka.writer.wait.seconds", metrics.Trend, metrics.Time); err != nil {
+        "kafka_writer_wait_seconds", metrics.Trend, metrics.Time); err != nil {
         return kafkaMetrics, errors.Unwrap(err)
     }
 
     if kafkaMetrics.WriterRetries, err = registry.NewMetric(
-        "kafka.writer.retries.count", metrics.Counter); err != nil {
+        "kafka_writer_retries_count", metrics.Counter); err != nil {
         return kafkaMetrics, errors.Unwrap(err)
     }
 
     if kafkaMetrics.WriterBatchSize, err = registry.NewMetric(
-        "kafka.writer.batch.size", metrics.Counter); err != nil {
+        "kafka_writer_batch_size", metrics.Counter); err != nil {
         return kafkaMetrics, errors.Unwrap(err)
     }
 
     if kafkaMetrics.WriterBatchBytes, err = registry.NewMetric(
-        "kafka.writer.batch.bytes", metrics.Counter, metrics.Data); err != nil {
+        "kafka_writer_batch_bytes", metrics.Counter, metrics.Data); err != nil {
         return kafkaMetrics, errors.Unwrap(err)
     }
 
     if kafkaMetrics.WriterMaxAttempts, err = registry.NewMetric(
-        "kafka.writer.attempts.max", metrics.Gauge); err != nil {
+        "kafka_writer_attempts_max", metrics.Gauge); err != nil {
         return kafkaMetrics, errors.Unwrap(err)
     }
 
     if kafkaMetrics.WriterMaxBatchSize, err = registry.NewMetric(
-        "kafka.writer.batch.max", metrics.Gauge); err != nil {
+        "kafka_writer_batch_max", metrics.Gauge); err != nil {
         return kafkaMetrics, errors.Unwrap(err)
     }
 
     if kafkaMetrics.WriterBatchTimeout, err = registry.NewMetric(
-        "kafka.writer.batch.timeout", metrics.Gauge, metrics.Time); err != nil {
+        "kafka_writer_batch_timeout", metrics.Gauge, metrics.Time); err != nil {
         return kafkaMetrics, errors.Unwrap(err)
     }
 
     if kafkaMetrics.WriterReadTimeout, err = registry.NewMetric(
-        "kafka.writer.read.timeout", metrics.Gauge, metrics.Time); err != nil {
+        "kafka_writer_read_timeout", metrics.Gauge, metrics.Time); err != nil {
         return kafkaMetrics, errors.Unwrap(err)
     }
 
     if kafkaMetrics.WriterWriteTimeout, err = registry.NewMetric(
-        "kafka.writer.write.timeout", metrics.Gauge, metrics.Time); err != nil {
+        "kafka_writer_write_timeout", metrics.Gauge, metrics.Time); err != nil {
         return kafkaMetrics, errors.Unwrap(err)
     }
 
     if kafkaMetrics.WriterRequiredAcks, err = registry.NewMetric(
-        "kafka.writer.acks.required", metrics.Gauge); err != nil {
+        "kafka_writer_acks_required", metrics.Gauge); err != nil {
         return kafkaMetrics, errors.Unwrap(err)
     }
 
     if kafkaMetrics.WriterAsync, err = registry.NewMetric(
-        "kafka.writer.async", metrics.Rate); err != nil {
+        "kafka_writer_async", metrics.Rate); err != nil {
         return kafkaMetrics, errors.Unwrap(err)
     }
 
