Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Correctly handle KafkaMetric metrics #45

Merged
merged 2 commits into from
Sep 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,12 @@
<version>${junit.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<version>${slf4j.version}</version>
<scope>test</scope>
</dependency>
</dependencies>

<build>
Expand Down Expand Up @@ -281,6 +287,10 @@
</goals>
<configuration>
<failOnWarning>true</failOnWarning>
<ignoredUnusedDeclaredDependencies>
<!-- Needed for logging in tests -->
<ignoredUnusedDeclaredDependency>org.slf4j:slf4j-simple</ignoredUnusedDeclaredDependency>
</ignoredUnusedDeclaredDependencies>
</configuration>
</execution>
</executions>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import io.prometheus.metrics.model.snapshots.MetricSnapshot;
import io.prometheus.metrics.model.snapshots.MetricSnapshots;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -68,17 +69,17 @@ public MetricSnapshots collect() {
for (Map.Entry<MetricName, MetricWrapper> entry : metrics.entrySet()) {
MetricWrapper metricWrapper = entry.getValue();
String prometheusMetricName = metricWrapper.prometheusName();
Object metric = metricWrapper.value();
Object metricValue = ((KafkaMetric) metricWrapper.metric()).metricValue();
Labels labels = metricWrapper.labels();
LOG.debug("Collecting metric {} with the following labels: {}", prometheusMetricName, labels);

if (metric instanceof Number) {
double value = ((Number) metric).doubleValue();
if (metricValue instanceof Number) {
double value = ((Number) metricValue).doubleValue();
GaugeSnapshot.Builder builder = gaugeBuilders.computeIfAbsent(prometheusMetricName, k -> GaugeSnapshot.builder().name(prometheusMetricName));
builder.dataPoint(DataPointSnapshotBuilder.gaugeDataPoint(labels, value));
} else {
InfoSnapshot.Builder builder = infoBuilders.computeIfAbsent(prometheusMetricName, k -> InfoSnapshot.builder().name(prometheusMetricName));
builder.dataPoint(DataPointSnapshotBuilder.infoDataPoint(labels, metric, metricWrapper.attribute()));
builder.dataPoint(DataPointSnapshotBuilder.infoDataPoint(labels, metricValue, metricWrapper.attribute()));
}
}
List<MetricSnapshot> snapshots = new ArrayList<>();
Expand Down
14 changes: 7 additions & 7 deletions src/main/java/io/strimzi/kafka/metrics/MetricWrapper.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ public class MetricWrapper {

private final String prometheusName;
private final Labels labels;
private final Object value;
private final Object metric;
private final String attribute;

/**
Expand All @@ -38,7 +38,7 @@ public class MetricWrapper {
public MetricWrapper(String prometheusName, KafkaMetric metric, String attribute) {
this.prometheusName = prometheusName;
this.labels = labelsFromTags(metric.metricName().tags(), prometheusName);
this.value = metric.metricValue();
this.metric = metric;
this.attribute = attribute;
}

Expand All @@ -52,7 +52,7 @@ public MetricWrapper(String prometheusName, KafkaMetric metric, String attribute
public MetricWrapper(String prometheusName, String scope, Metric metric, String attribute) {
this.prometheusName = prometheusName;
this.labels = labelsFromScope(scope, prometheusName);
this.value = metric;
this.metric = metric;
this.attribute = attribute;
}

Expand All @@ -73,11 +73,11 @@ public Labels labels() {
}

/**
* The metric value
* @return The value
* The underlying metric
* @return The metric
*/
public Object value() {
return value;
public Object metric() {
return metric;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ public MetricSnapshots collect() {
for (Map.Entry<MetricName, MetricWrapper> entry : metrics.entrySet()) {
MetricWrapper metricWrapper = entry.getValue();
String prometheusMetricName = metricWrapper.prometheusName();
Object metric = metricWrapper.value();
Object metric = metricWrapper.metric();
Labels labels = metricWrapper.labels();
LOG.debug("Collecting metric {} with the following labels: {}", prometheusMetricName, labels);

Expand Down
32 changes: 32 additions & 0 deletions src/test/java/io/strimzi/kafka/metrics/MetricWrapperTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,22 @@
*/
package io.strimzi.kafka.metrics;

import com.yammer.metrics.Metrics;
import com.yammer.metrics.core.Counter;
import com.yammer.metrics.core.MetricName;
import com.yammer.metrics.core.MetricsRegistry;
import io.prometheus.metrics.model.snapshots.Labels;
import io.prometheus.metrics.model.snapshots.PrometheusNaming;
import org.apache.kafka.common.metrics.Gauge;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.utils.Time;
import org.junit.jupiter.api.Test;

import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

import static org.junit.jupiter.api.Assertions.assertEquals;

Expand Down Expand Up @@ -59,4 +67,28 @@ public void testKafkaMetricName() {
String metricName = MetricWrapper.prometheusName("kafka_server", new org.apache.kafka.common.MetricName("NaMe", "KafKa.neTwork", "", Collections.emptyMap()));
assertEquals("kafka_server_kafka_network_name", metricName);
}

@Test
public void testKafkaMetric() {
AtomicInteger value = new AtomicInteger(0);
org.apache.kafka.common.MetricName name = new org.apache.kafka.common.MetricName("name", "kafka.server", "", Collections.emptyMap());
KafkaMetric metric = new KafkaMetric(new Object(), name, (Gauge<Integer>) (metricConfig, l) -> value.get(), new MetricConfig(), Time.SYSTEM);
String prometheusName = MetricWrapper.prometheusName("kafka_server", name);
MetricWrapper wrapper = new MetricWrapper(prometheusName, metric, "name");
assertEquals(value.get(), ((KafkaMetric) wrapper.metric()).metricValue());
value.incrementAndGet();
assertEquals(value.get(), ((KafkaMetric) wrapper.metric()).metricValue());
}

@Test
public void testYammerMetric() {
MetricName name = new MetricName("group", "type", "name");
MetricsRegistry registry = Metrics.defaultRegistry();
Counter counter = registry.newCounter(name);
String prometheusName = MetricWrapper.prometheusName(name);
MetricWrapper wrapper = new MetricWrapper(prometheusName, "", counter, "name");
assertEquals(counter.count(), ((Counter) wrapper.metric()).count());
counter.inc();
assertEquals(counter.count(), ((Counter) wrapper.metric()).count());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,9 @@ public void testStartHttpServer() {
PrometheusMetricsReporterConfig config3 = new PrometheusMetricsReporterConfig(props, new PrometheusRegistry());
Exception exc = assertThrows(RuntimeException.class, config3::startHttpServer);
assertInstanceOf(BindException.class, exc.getCause());

HttpServers.release(httpServerOptional.get());
HttpServers.release(httpServerOptional2.get());
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ public void testCollect() {
assertEquals(labels, datapoint.getLabels());

// Update the value of the metric
((Counter) metricWrapper.value()).inc(10);
((Counter) metricWrapper.metric()).inc(10);
metrics = collector.collect();

assertEquals(1, metrics.size());
Expand Down