Skip to content

Commit

Permalink
Correctly handle KafkaMetric metrics
Browse files Browse the repository at this point in the history
Signed-off-by: Mickael Maison <[email protected]>
  • Loading branch information
mimaison committed Sep 11, 2024
1 parent 6de2885 commit 2dee579
Show file tree
Hide file tree
Showing 7 changed files with 55 additions and 13 deletions.
6 changes: 6 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,12 @@
<version>${junit.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<version>${slf4j.version}</version>
<scope>test</scope>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import io.prometheus.metrics.model.snapshots.MetricSnapshot;
import io.prometheus.metrics.model.snapshots.MetricSnapshots;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -68,17 +69,17 @@ public MetricSnapshots collect() {
for (Map.Entry<MetricName, MetricWrapper> entry : metrics.entrySet()) {
MetricWrapper metricWrapper = entry.getValue();
String prometheusMetricName = metricWrapper.prometheusName();
Object metric = metricWrapper.value();
Object metricValue = ((KafkaMetric) metricWrapper.metric()).metricValue();
Labels labels = metricWrapper.labels();
LOG.debug("Collecting metric {} with the following labels: {}", prometheusMetricName, labels);

if (metric instanceof Number) {
double value = ((Number) metric).doubleValue();
if (metricValue instanceof Number) {
double value = ((Number) metricValue).doubleValue();
GaugeSnapshot.Builder builder = gaugeBuilders.computeIfAbsent(prometheusMetricName, k -> GaugeSnapshot.builder().name(prometheusMetricName));
builder.dataPoint(DataPointSnapshotBuilder.gaugeDataPoint(labels, value));
} else {
InfoSnapshot.Builder builder = infoBuilders.computeIfAbsent(prometheusMetricName, k -> InfoSnapshot.builder().name(prometheusMetricName));
builder.dataPoint(DataPointSnapshotBuilder.infoDataPoint(labels, metric, metricWrapper.attribute()));
builder.dataPoint(DataPointSnapshotBuilder.infoDataPoint(labels, metricValue, metricWrapper.attribute()));
}
}
List<MetricSnapshot> snapshots = new ArrayList<>();
Expand Down
14 changes: 7 additions & 7 deletions src/main/java/io/strimzi/kafka/metrics/MetricWrapper.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ public class MetricWrapper {

private final String prometheusName;
private final Labels labels;
private final Object value;
private final Object metric;
private final String attribute;

/**
Expand All @@ -38,7 +38,7 @@ public class MetricWrapper {
public MetricWrapper(String prometheusName, KafkaMetric metric, String attribute) {
this.prometheusName = prometheusName;
this.labels = labelsFromTags(metric.metricName().tags(), prometheusName);
this.value = metric.metricValue();
this.metric = metric;
this.attribute = attribute;
}

Expand All @@ -52,7 +52,7 @@ public MetricWrapper(String prometheusName, KafkaMetric metric, String attribute
public MetricWrapper(String prometheusName, String scope, Metric metric, String attribute) {
this.prometheusName = prometheusName;
this.labels = labelsFromScope(scope, prometheusName);
this.value = metric;
this.metric = metric;
this.attribute = attribute;
}

Expand All @@ -73,11 +73,11 @@ public Labels labels() {
}

/**
* The metric value
* @return The value
* The underlying metric
* @return The metric
*/
public Object value() {
return value;
public Object metric() {
return metric;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ public MetricSnapshots collect() {
for (Map.Entry<MetricName, MetricWrapper> entry : metrics.entrySet()) {
MetricWrapper metricWrapper = entry.getValue();
String prometheusMetricName = metricWrapper.prometheusName();
Object metric = metricWrapper.value();
Object metric = metricWrapper.metric();
Labels labels = metricWrapper.labels();
LOG.debug("Collecting metric {} with the following labels: {}", prometheusMetricName, labels);

Expand Down
32 changes: 32 additions & 0 deletions src/test/java/io/strimzi/kafka/metrics/MetricWrapperTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,22 @@
*/
package io.strimzi.kafka.metrics;

import com.yammer.metrics.Metrics;
import com.yammer.metrics.core.Counter;
import com.yammer.metrics.core.MetricName;
import com.yammer.metrics.core.MetricsRegistry;
import io.prometheus.metrics.model.snapshots.Labels;
import io.prometheus.metrics.model.snapshots.PrometheusNaming;
import org.apache.kafka.common.metrics.Gauge;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.utils.Time;
import org.junit.jupiter.api.Test;

import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

import static org.junit.jupiter.api.Assertions.assertEquals;

Expand Down Expand Up @@ -59,4 +67,28 @@ public void testKafkaMetricName() {
String metricName = MetricWrapper.prometheusName("kafka_server", new org.apache.kafka.common.MetricName("NaMe", "KafKa.neTwork", "", Collections.emptyMap()));
assertEquals("kafka_server_kafka_network_name", metricName);
}

@Test
public void testKafkaMetric() {
AtomicInteger value = new AtomicInteger(0);
org.apache.kafka.common.MetricName name = new org.apache.kafka.common.MetricName("name", "kafka.server", "", Collections.emptyMap());
KafkaMetric metric = new KafkaMetric(new Object(), name, (Gauge<Integer>) (metricConfig, l) -> value.get(), new MetricConfig(), Time.SYSTEM);
String prometheusName = MetricWrapper.prometheusName("kafka_server", name);
MetricWrapper wrapper = new MetricWrapper(prometheusName, metric, "name");
assertEquals(value.get(), ((KafkaMetric) wrapper.metric()).metricValue());
value.incrementAndGet();
assertEquals(value.get(), ((KafkaMetric) wrapper.metric()).metricValue());
}

@Test
public void testYammerMetric() {
MetricName name = new MetricName("group", "type", "name");
MetricsRegistry registry = Metrics.defaultRegistry();
Counter counter = registry.newCounter(name);
String prometheusName = MetricWrapper.prometheusName(name);
MetricWrapper wrapper = new MetricWrapper(prometheusName, "", counter, "name");
assertEquals(counter.count(), ((Counter) wrapper.metric()).count());
counter.inc();
assertEquals(counter.count(), ((Counter) wrapper.metric()).count());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,9 @@ public void testStartHttpServer() {
PrometheusMetricsReporterConfig config3 = new PrometheusMetricsReporterConfig(props, new PrometheusRegistry());
Exception exc = assertThrows(RuntimeException.class, config3::startHttpServer);
assertInstanceOf(BindException.class, exc.getCause());

HttpServers.release(httpServerOptional.get());
HttpServers.release(httpServerOptional2.get());
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ public void testCollect() {
assertEquals(labels, datapoint.getLabels());

// Update the value of the metric
((Counter) metricWrapper.value()).inc(10);
((Counter) metricWrapper.metric()).inc(10);
metrics = collector.collect();

assertEquals(1, metrics.size());
Expand Down

0 comments on commit 2dee579

Please sign in to comment.