From 586860c89e8d92c311902d3ecc2554702f974391 Mon Sep 17 00:00:00 2001 From: Mickael Maison Date: Thu, 8 Aug 2024 15:24:22 +0200 Subject: [PATCH] Refactor Yammer metrics collection logic (#37) Signed-off-by: Mickael Maison --- .../kafka/metrics/KafkaMetricsCollector.java | 9 +- .../strimzi/kafka/metrics/MetricWrapper.java | 152 ++++++++++++++++++ .../kafka/metrics/YammerMetricsCollector.java | 130 ++++++--------- .../YammerPrometheusMetricsReporter.java | 48 +++++- .../metrics/KafkaMetricsCollectorTest.java | 11 -- .../kafka/metrics/MetricWrapperTest.java | 51 ++++++ .../metrics/YammerMetricsCollectorTest.java | 103 +++++------- .../YammerPrometheusMetricsReporterTest.java | 77 ++++++++- 8 files changed, 413 insertions(+), 168 deletions(-) create mode 100644 src/main/java/io/strimzi/kafka/metrics/MetricWrapper.java create mode 100644 src/test/java/io/strimzi/kafka/metrics/MetricWrapperTest.java diff --git a/src/main/java/io/strimzi/kafka/metrics/KafkaMetricsCollector.java b/src/main/java/io/strimzi/kafka/metrics/KafkaMetricsCollector.java index 0071d2e..ec20875 100644 --- a/src/main/java/io/strimzi/kafka/metrics/KafkaMetricsCollector.java +++ b/src/main/java/io/strimzi/kafka/metrics/KafkaMetricsCollector.java @@ -21,7 +21,6 @@ import java.util.HashMap; import java.util.HashSet; import java.util.List; -import java.util.Locale; import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; @@ -88,7 +87,7 @@ public MetricSnapshots collect() { MetricName metricName = entry.getKey(); KafkaMetric kafkaMetric = entry.getValue(); - String prometheusMetricName = metricName(metricName); + String prometheusMetricName = MetricWrapper.prometheusName(prefix, metricName); if (!config.isAllowed(prometheusMetricName)) { LOG.trace("Ignoring metric {} as it does not match the allowlist", prometheusMetricName); continue; @@ -116,12 +115,6 @@ public MetricSnapshots collect() { return new MetricSnapshots(snapshots); } - String metricName(MetricName metricName) { - return PrometheusNaming.prometheusName(PrometheusNaming - .sanitizeMetricName(prefix + '_' + metricName.group() + '_' + metricName.name()) - .toLowerCase(Locale.ROOT)); - } - static Labels labelsFromTags(Map tags, String metricName) { Labels.Builder builder = Labels.builder(); Set labelNames = new HashSet<>(); diff --git a/src/main/java/io/strimzi/kafka/metrics/MetricWrapper.java b/src/main/java/io/strimzi/kafka/metrics/MetricWrapper.java new file mode 100644 index 0000000..8c04767 --- /dev/null +++ b/src/main/java/io/strimzi/kafka/metrics/MetricWrapper.java @@ -0,0 +1,152 @@ +/* + * Copyright Strimzi authors. + * License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html). + */ +package io.strimzi.kafka.metrics; + +import com.yammer.metrics.core.Metric; +import com.yammer.metrics.core.MetricName; +import io.prometheus.metrics.model.snapshots.Labels; +import io.prometheus.metrics.model.snapshots.PrometheusNaming; +import org.apache.kafka.common.metrics.KafkaMetric; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashSet; +import java.util.Locale; +import java.util.Map; +import java.util.Set; + +/** + * Wrapper for both Kafka and Yammer metrics to unify logic in the Collectors + */ +public class MetricWrapper { + + private static final Logger LOG = LoggerFactory.getLogger(MetricWrapper.class); + + private final String prometheusName; + private final Labels labels; + private final Object value; + private final String attribute; + + // Will be used when implementing https://github.com/strimzi/metrics-reporter/issues/9 + /** + * Constructor from Kafka Metrics + * @param prometheusName The name of the metric in the prometheus format + * @param metric The Kafka metric + * @param attribute The attribute of the Kafka metric + */ + public MetricWrapper(String prometheusName, KafkaMetric metric, String attribute) { + this.prometheusName = prometheusName; + this.labels = labelsFromTags(metric.metricName().tags(), prometheusName); + this.value = metric.metricValue(); + this.attribute = attribute; + } + + /** + * Constructor from Yammer Metrics + * @param prometheusName The name of the metric in the prometheus format + * @param scope The scope of the Yammer metric + * @param metric The Yammer metric + * @param attribute The attribute of the Yammer metric + */ + public MetricWrapper(String prometheusName, String scope, Metric metric, String attribute) { + this.prometheusName = prometheusName; + this.labels = labelsFromScope(scope, prometheusName); + this.value = metric; + this.attribute = attribute; + } + + /** + * The Prometheus name of this metric + * @return The Prometheus name + */ + public String prometheusName() { + return prometheusName; + } + + /** + * The labels associated with this metric + * @return The labels + */ + public Labels labels() { + return labels; + } + + /** + * The metric value + * @return The value + */ + public Object value() { + return value; + } + + /** + * The metric attribute + * @return The attribute + */ + public String attribute() { + return attribute; + } + + private static Labels labelsFromScope(String scope, String metricName) { + Labels.Builder builder = Labels.builder(); + Set labelNames = new HashSet<>(); + if (scope != null) { + String[] parts = scope.split("\\."); + if (parts.length % 2 == 0) { + for (int i = 0; i < parts.length; i += 2) { + String newLabelName = PrometheusNaming.sanitizeLabelName(parts[i]); + if (labelNames.add(newLabelName)) { + builder.label(newLabelName, parts[i + 1]); + } else { + LOG.warn("Ignoring duplicate label key: {} with value: {} from metric: {} ", newLabelName, parts[i + 1], metricName); + } + } + } + } + return builder.build(); + } + + // Will be used when implementing https://github.com/strimzi/metrics-reporter/issues/9 + private static Labels labelsFromTags(Map tags, String metricName) { + Labels.Builder builder = Labels.builder(); + Set labelNames = new HashSet<>(); + for (Map.Entry label : tags.entrySet()) { + String newLabelName = PrometheusNaming.sanitizeLabelName(label.getKey()); + if (labelNames.add(newLabelName)) { + builder.label(newLabelName, label.getValue()); + } else { + LOG.warn("Ignoring duplicate label key: {} with value: {} from metric: {} ", newLabelName, label.getValue(), metricName); + } + } + return builder.build(); + } + + /** + * Compute the Prometheus name from a Yammer MetricName + * @param metricName The Yammer metric name + * @return The prometheus metric name + */ + public static String prometheusName(MetricName metricName) { + return PrometheusNaming.prometheusName( + PrometheusNaming.sanitizeMetricName( + "kafka_server_" + + metricName.getGroup() + '_' + + metricName.getType() + '_' + + metricName.getName()).toLowerCase(Locale.ROOT)); + } + + // Will be used when implementing https://github.com/strimzi/metrics-reporter/issues/9 + /** + * Compute the Prometheus name from a Kafka MetricName + * @param prefix The prefix to add to the metric name + * @param metricName The Kafka metric name + * @return The prometheus metric name + */ + public static String prometheusName(String prefix, org.apache.kafka.common.MetricName metricName) { + return PrometheusNaming.prometheusName( + PrometheusNaming.sanitizeMetricName( + prefix + '_' + metricName.group() + '_' + metricName.name()).toLowerCase(Locale.ROOT)); + } +} diff --git a/src/main/java/io/strimzi/kafka/metrics/YammerMetricsCollector.java b/src/main/java/io/strimzi/kafka/metrics/YammerMetricsCollector.java index 53ef3e7..3463796 100644 --- a/src/main/java/io/strimzi/kafka/metrics/YammerMetricsCollector.java +++ b/src/main/java/io/strimzi/kafka/metrics/YammerMetricsCollector.java @@ -4,14 +4,11 @@ */ package io.strimzi.kafka.metrics; -import com.yammer.metrics.Metrics; import com.yammer.metrics.core.Counter; import com.yammer.metrics.core.Gauge; import com.yammer.metrics.core.Histogram; import com.yammer.metrics.core.Meter; -import com.yammer.metrics.core.Metric; import com.yammer.metrics.core.MetricName; -import com.yammer.metrics.core.MetricsRegistry; import com.yammer.metrics.core.Sampling; import com.yammer.metrics.core.Timer; import io.prometheus.metrics.model.registry.MultiCollector; @@ -21,22 +18,18 @@ import io.prometheus.metrics.model.snapshots.Labels; import io.prometheus.metrics.model.snapshots.MetricSnapshot; import io.prometheus.metrics.model.snapshots.MetricSnapshots; -import io.prometheus.metrics.model.snapshots.PrometheusNaming; import io.prometheus.metrics.model.snapshots.Quantile; import io.prometheus.metrics.model.snapshots.Quantiles; import io.prometheus.metrics.model.snapshots.SummarySnapshot; -import org.apache.kafka.server.metrics.KafkaYammerMetrics; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; -import java.util.HashSet; import java.util.List; -import java.util.Locale; import java.util.Map; -import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; /** * Prometheus Collector to store and export metrics retrieved by {@link YammerPrometheusMetricsReporter}. @@ -47,17 +40,13 @@ public class YammerMetricsCollector implements MultiCollector { private static final Logger LOG = LoggerFactory.getLogger(YammerMetricsCollector.class); private static final List QUANTILES = Arrays.asList(0.50, 0.75, 0.95, 0.98, 0.99, 0.999); - private final List registries; - private final PrometheusMetricsReporterConfig config; + private final Map metrics; /** - * Constructs a new YammerMetricsCollector with the provided configuration. - * - * @param config The configuration for the YammerMetricsCollector. + * Constructor */ - public YammerMetricsCollector(PrometheusMetricsReporterConfig config) { - this.config = config; - this.registries = Arrays.asList(KafkaYammerMetrics.defaultRegistry(), Metrics.defaultRegistry()); + public YammerMetricsCollector() { + this.metrics = new ConcurrentHashMap<>(); } /** @@ -72,48 +61,41 @@ public MetricSnapshots collect() { Map infoBuilders = new HashMap<>(); Map summaryBuilders = new HashMap<>(); - for (MetricsRegistry registry : registries) { - for (Map.Entry entry : registry.allMetrics().entrySet()) { - MetricName metricName = entry.getKey(); - Metric metric = entry.getValue(); - - String prometheusMetricName = metricName(metricName); - if (!config.isAllowed(prometheusMetricName)) { - LOG.trace("Ignoring metric {} as it does not match the allowlist", prometheusMetricName); - continue; - } - Labels labels = labelsFromScope(metricName.getScope(), prometheusMetricName); - LOG.debug("Collecting metric {} with the following labels: {}", prometheusMetricName, labels); + for (Map.Entry entry : metrics.entrySet()) { + MetricWrapper metricWrapper = entry.getValue(); + String prometheusMetricName = metricWrapper.prometheusName(); + Object metric = metricWrapper.value(); + Labels labels = metricWrapper.labels(); + LOG.debug("Collecting metric {} with the following labels: {}", prometheusMetricName, labels); - if (metric instanceof Counter) { - Counter counter = (Counter) metric; - CounterSnapshot.Builder builder = counterBuilders.computeIfAbsent(prometheusMetricName, k -> CounterSnapshot.builder().name(prometheusMetricName)); - builder.dataPoint(DataPointSnapshotBuilder.counterDataPoint(labels, counter.count())); - } else if (metric instanceof Gauge) { - Object valueObj = ((Gauge) metric).value(); - if (valueObj instanceof Number) { - double value = ((Number) valueObj).doubleValue(); - GaugeSnapshot.Builder builder = gaugeBuilders.computeIfAbsent(prometheusMetricName, k -> GaugeSnapshot.builder().name(prometheusMetricName)); - builder.dataPoint(DataPointSnapshotBuilder.gaugeDataPoint(labels, value)); - } else { - InfoSnapshot.Builder builder = infoBuilders.computeIfAbsent(prometheusMetricName, k -> InfoSnapshot.builder().name(prometheusMetricName)); - builder.dataPoint(DataPointSnapshotBuilder.infoDataPoint(labels, valueObj, metricName.getName())); - } - } else if (metric instanceof Timer) { - Timer timer = (Timer) metric; - SummarySnapshot.Builder builder = summaryBuilders.computeIfAbsent(prometheusMetricName, k -> SummarySnapshot.builder().name(prometheusMetricName)); - builder.dataPoint(DataPointSnapshotBuilder.summaryDataPoint(labels, timer.count(), timer.sum(), quantiles(timer))); - } else if (metric instanceof Histogram) { - Histogram histogram = (Histogram) metric; - SummarySnapshot.Builder builder = summaryBuilders.computeIfAbsent(prometheusMetricName, k -> SummarySnapshot.builder().name(prometheusMetricName)); - builder.dataPoint(DataPointSnapshotBuilder.summaryDataPoint(labels, histogram.count(), histogram.sum(), quantiles(histogram))); - } else if (metric instanceof Meter) { - Meter meter = (Meter) metric; - CounterSnapshot.Builder builder = counterBuilders.computeIfAbsent(prometheusMetricName, k -> CounterSnapshot.builder().name(prometheusMetricName)); - builder.dataPoint(DataPointSnapshotBuilder.counterDataPoint(labels, meter.count())); + if (metric instanceof Counter) { + Counter counter = (Counter) metric; + CounterSnapshot.Builder builder = counterBuilders.computeIfAbsent(prometheusMetricName, k -> CounterSnapshot.builder().name(prometheusMetricName)); + builder.dataPoint(DataPointSnapshotBuilder.counterDataPoint(labels, counter.count())); + } else if (metric instanceof Gauge) { + Object valueObj = ((Gauge) metric).value(); + if (valueObj instanceof Number) { + double value = ((Number) valueObj).doubleValue(); + GaugeSnapshot.Builder builder = gaugeBuilders.computeIfAbsent(prometheusMetricName, k -> GaugeSnapshot.builder().name(prometheusMetricName)); + builder.dataPoint(DataPointSnapshotBuilder.gaugeDataPoint(labels, value)); } else { - LOG.error("The metric {} has an unexpected type: {}", prometheusMetricName, metric.getClass().getName()); + InfoSnapshot.Builder builder = infoBuilders.computeIfAbsent(prometheusMetricName, k -> InfoSnapshot.builder().name(prometheusMetricName)); + builder.dataPoint(DataPointSnapshotBuilder.infoDataPoint(labels, valueObj, metricWrapper.attribute())); } + } else if (metric instanceof Timer) { + Timer timer = (Timer) metric; + SummarySnapshot.Builder builder = summaryBuilders.computeIfAbsent(prometheusMetricName, k -> SummarySnapshot.builder().name(prometheusMetricName)); + builder.dataPoint(DataPointSnapshotBuilder.summaryDataPoint(labels, timer.count(), timer.sum(), quantiles(timer))); + } else if (metric instanceof Histogram) { + Histogram histogram = (Histogram) metric; + SummarySnapshot.Builder builder = summaryBuilders.computeIfAbsent(prometheusMetricName, k -> SummarySnapshot.builder().name(prometheusMetricName)); + builder.dataPoint(DataPointSnapshotBuilder.summaryDataPoint(labels, histogram.count(), histogram.sum(), quantiles(histogram))); + } else if (metric instanceof Meter) { + Meter meter = (Meter) metric; + CounterSnapshot.Builder builder = counterBuilders.computeIfAbsent(prometheusMetricName, k -> CounterSnapshot.builder().name(prometheusMetricName)); + builder.dataPoint(DataPointSnapshotBuilder.counterDataPoint(labels, meter.count())); + } else { + LOG.error("The metric {} has an unexpected type: {}", prometheusMetricName, metric.getClass().getName()); } } List snapshots = new ArrayList<>(); @@ -132,31 +114,23 @@ public MetricSnapshots collect() { return new MetricSnapshots(snapshots); } - static String metricName(MetricName metricName) { - return PrometheusNaming.prometheusName(PrometheusNaming.sanitizeMetricName( - "kafka_server_" + - metricName.getGroup() + '_' + - metricName.getType() + '_' + - metricName.getName()).toLowerCase(Locale.ROOT)); + /** + * Add a Yammer metric to be collected. + * + * @param name The name of the Yammer metric to add. + * @param metric The Yammer metric to add. + */ + public void addMetric(MetricName name, MetricWrapper metric) { + metrics.put(name, metric); } - static Labels labelsFromScope(String scope, String metricName) { - Labels.Builder builder = Labels.builder(); - Set labelNames = new HashSet<>(); - if (scope != null) { - String[] parts = scope.split("\\."); - if (parts.length % 2 == 0) { - for (int i = 0; i < parts.length; i += 2) { - String newLabelName = PrometheusNaming.sanitizeLabelName(parts[i]); - if (labelNames.add(newLabelName)) { - builder.label(newLabelName, parts[i + 1]); - } else { - LOG.warn("Ignoring duplicate label key: {} with value: {} from metric: {} ", newLabelName, parts[i + 1], metricName); - } - } - } - } - return builder.build(); + /** + * Remove a Yammer metric from collection. + * + * @param name The name of the Yammer metric to remove. + */ + public void removeMetric(MetricName name) { + metrics.remove(name); } private static Quantiles quantiles(Sampling sampling) { diff --git a/src/main/java/io/strimzi/kafka/metrics/YammerPrometheusMetricsReporter.java b/src/main/java/io/strimzi/kafka/metrics/YammerPrometheusMetricsReporter.java index 2247588..64fab67 100644 --- a/src/main/java/io/strimzi/kafka/metrics/YammerPrometheusMetricsReporter.java +++ b/src/main/java/io/strimzi/kafka/metrics/YammerPrometheusMetricsReporter.java @@ -4,20 +4,37 @@ */ package io.strimzi.kafka.metrics; +import com.yammer.metrics.Metrics; +import com.yammer.metrics.core.Metric; +import com.yammer.metrics.core.MetricName; +import com.yammer.metrics.core.MetricsRegistry; +import com.yammer.metrics.core.MetricsRegistryListener; +import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; +import io.prometheus.metrics.exporter.httpserver.HTTPServer; import io.prometheus.metrics.model.registry.PrometheusRegistry; import kafka.metrics.KafkaMetricsReporter; import kafka.utils.VerifiableProperties; +import org.apache.kafka.server.metrics.KafkaYammerMetrics; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.Arrays; +import java.util.Optional; + /** * KafkaMetricsReporter to export Kafka broker metrics in the Prometheus format. */ -public class YammerPrometheusMetricsReporter implements KafkaMetricsReporter { +public class YammerPrometheusMetricsReporter implements KafkaMetricsReporter, MetricsRegistryListener { private static final Logger LOG = LoggerFactory.getLogger(YammerPrometheusMetricsReporter.class); private final PrometheusRegistry registry; + @SuppressFBWarnings({"UWF_FIELD_NOT_INITIALIZED_IN_CONSTRUCTOR"}) // This field is initialized in the init method + private YammerMetricsCollector collector; + @SuppressFBWarnings({"UWF_FIELD_NOT_INITIALIZED_IN_CONSTRUCTOR"}) // This field is initialized in the init method + private PrometheusMetricsReporterConfig config; + @SuppressFBWarnings({"UWF_FIELD_NOT_INITIALIZED_IN_CONSTRUCTOR"}) // This field is initialized in the configure method + private Optional httpServer; /** * Constructor @@ -33,9 +50,34 @@ public YammerPrometheusMetricsReporter() { @Override public void init(VerifiableProperties props) { - PrometheusMetricsReporterConfig config = new PrometheusMetricsReporterConfig(props.props(), registry); - registry.register(new YammerMetricsCollector(config)); + config = new PrometheusMetricsReporterConfig(props.props(), registry); + collector = new YammerMetricsCollector(); + registry.register(collector); + for (MetricsRegistry yammerRegistry : Arrays.asList(KafkaYammerMetrics.defaultRegistry(), Metrics.defaultRegistry())) { + yammerRegistry.addListener(this); + } + httpServer = config.startHttpServer(); LOG.debug("YammerPrometheusMetricsReporter configured with {}", config); } + @Override + public void onMetricAdded(MetricName name, Metric metric) { + String prometheusName = MetricWrapper.prometheusName(name); + if (!config.isAllowed(prometheusName)) { + LOG.trace("Ignoring metric {} as it does not match the allowlist", prometheusName); + } else { + MetricWrapper metricWrapper = new MetricWrapper(prometheusName, name.getScope(), metric, name.getName()); + collector.addMetric(name, metricWrapper); + } + } + + @Override + public void onMetricRemoved(MetricName name) { + collector.removeMetric(name); + } + + // for testing + Optional getPort() { + return Optional.ofNullable(httpServer.isPresent() ? httpServer.get().getPort() : null); + } } diff --git a/src/test/java/io/strimzi/kafka/metrics/KafkaMetricsCollectorTest.java b/src/test/java/io/strimzi/kafka/metrics/KafkaMetricsCollectorTest.java index 6486ba4..45cb29d 100644 --- a/src/test/java/io/strimzi/kafka/metrics/KafkaMetricsCollectorTest.java +++ b/src/test/java/io/strimzi/kafka/metrics/KafkaMetricsCollectorTest.java @@ -20,7 +20,6 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; -import java.util.Collections; import java.util.HashMap; import java.util.LinkedHashMap; import java.util.Map; @@ -123,16 +122,6 @@ public void testLabelsFromTags() { assertEquals(1, labels.size()); } - @Test - public void testMetricName() { - PrometheusMetricsReporterConfig config = new PrometheusMetricsReporterConfig(Collections.emptyMap(), new PrometheusRegistry()); - KafkaMetricsCollector collector = new KafkaMetricsCollector(config); - collector.setPrefix("kafka.server"); - - String metricName = collector.metricName(new MetricName("NaMe", "KafKa.neTwork", "", Collections.emptyMap())); - assertEquals("kafka_server_kafka_network_name", metricName); - } - private void assertGaugeSnapshot(MetricSnapshot snapshot, double expectedValue, Labels expectedLabels) { assertInstanceOf(GaugeSnapshot.class, snapshot); GaugeSnapshot gaugeSnapshot = (GaugeSnapshot) snapshot; diff --git a/src/test/java/io/strimzi/kafka/metrics/MetricWrapperTest.java b/src/test/java/io/strimzi/kafka/metrics/MetricWrapperTest.java new file mode 100644 index 0000000..f4b4ee4 --- /dev/null +++ b/src/test/java/io/strimzi/kafka/metrics/MetricWrapperTest.java @@ -0,0 +1,51 @@ +/* + * Copyright Strimzi authors. + * License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html). + */ +package io.strimzi.kafka.metrics; + +import com.yammer.metrics.core.MetricName; +import io.prometheus.metrics.model.snapshots.Labels; +import io.prometheus.metrics.model.snapshots.PrometheusNaming; +import org.junit.jupiter.api.Test; + +import java.util.Collections; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +public class MetricWrapperTest { + + @Test + public void testLabelsFromScope() { + MetricWrapper mw = new MetricWrapper("", "k1.v1.k2.v2", null, ""); + assertEquals(Labels.of("k1", "v1", "k2", "v2"), mw.labels()); + mw = new MetricWrapper("", "k1.v1.k2.v2.", null, ""); + assertEquals(Labels.of("k1", "v1", "k2", "v2"), mw.labels()); + mw = new MetricWrapper("", null, null, ""); + assertEquals(Labels.EMPTY, mw.labels()); + mw = new MetricWrapper("", "k1", null, ""); + assertEquals(Labels.EMPTY, mw.labels()); + mw = new MetricWrapper("", "k1.", null, ""); + assertEquals(Labels.EMPTY, mw.labels()); + mw = new MetricWrapper("", "k1.v1.k", null, ""); + assertEquals(Labels.EMPTY, mw.labels()); + + mw = new MetricWrapper("", "k-1.v1.k_1.v2", null, ""); + Labels labels = mw.labels(); + assertEquals("k_1", PrometheusNaming.sanitizeLabelName("k-1")); + assertEquals("v1", labels.get("k_1")); + assertEquals(1, labels.size()); + } + + @Test + public void testYammerMetricName() { + String metricName = MetricWrapper.prometheusName(new MetricName("Kafka.Server", "Log", "NumLogSegments")); + assertEquals("kafka_server_kafka_server_log_numlogsegments", metricName); + } + + @Test + public void testKafkaMetricName() { + String metricName = MetricWrapper.prometheusName("kafka_server", new org.apache.kafka.common.MetricName("NaMe", "KafKa.neTwork", "", Collections.emptyMap())); + assertEquals("kafka_server_kafka_network_name", metricName); + } +} diff --git a/src/test/java/io/strimzi/kafka/metrics/YammerMetricsCollectorTest.java b/src/test/java/io/strimzi/kafka/metrics/YammerMetricsCollectorTest.java index bf3cba6..e1d6709 100644 --- a/src/test/java/io/strimzi/kafka/metrics/YammerMetricsCollectorTest.java +++ b/src/test/java/io/strimzi/kafka/metrics/YammerMetricsCollectorTest.java @@ -4,22 +4,20 @@ */ package io.strimzi.kafka.metrics; +import com.yammer.metrics.Metrics; import com.yammer.metrics.core.Counter; import com.yammer.metrics.core.Gauge; +import com.yammer.metrics.core.Metric; import com.yammer.metrics.core.MetricName; -import io.prometheus.metrics.model.registry.PrometheusRegistry; +import com.yammer.metrics.core.MetricsRegistry; import io.prometheus.metrics.model.snapshots.CounterSnapshot; import io.prometheus.metrics.model.snapshots.InfoSnapshot; import io.prometheus.metrics.model.snapshots.Labels; import io.prometheus.metrics.model.snapshots.MetricSnapshot; import io.prometheus.metrics.model.snapshots.MetricSnapshots; -import io.prometheus.metrics.model.snapshots.PrometheusNaming; -import org.apache.kafka.server.metrics.KafkaYammerMetrics; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; -import java.util.HashMap; -import java.util.LinkedHashMap; import java.util.Map; import static org.junit.jupiter.api.Assertions.assertEquals; @@ -27,44 +25,40 @@ public class YammerMetricsCollectorTest { - private LinkedHashMap tagsMap; + private final MetricsRegistry registry = Metrics.defaultRegistry(); + private String scope; private Labels labels; @BeforeEach public void setup() { - tagsMap = new LinkedHashMap<>(); - tagsMap.put("k1", "v1"); - tagsMap.put("k2", "v2"); Labels.Builder labelsBuilder = Labels.builder(); - for (Map.Entry tag : tagsMap.entrySet()) { - labelsBuilder.label(tag.getKey(), tag.getValue()); + scope = ""; + for (int i = 0; i < 2; i++) { + labelsBuilder.label("k" + i, "v" + i); + scope += "k" + i + ".v" + i + "."; } labels = labelsBuilder.build(); + for (Map.Entry entry : registry.allMetrics().entrySet()) { + registry.removeMetric(entry.getKey()); + } } @Test - public void testMetricLifeCycle() { - Map props = new HashMap<>(); - props.put(PrometheusMetricsReporterConfig.ALLOWLIST_CONFIG, "kafka_server_group_type.*"); - PrometheusMetricsReporterConfig config = new PrometheusMetricsReporterConfig(props, new PrometheusRegistry()); - YammerMetricsCollector collector = new YammerMetricsCollector(config); + public void testCollect() { + YammerMetricsCollector collector = new YammerMetricsCollector(); MetricSnapshots metrics = collector.collect(); assertEquals(0, metrics.size()); - // Adding a metric not matching the allowlist does nothing - newCounter("other", "type", "name"); - metrics = collector.collect(); - assertEquals(0, metrics.size()); - - // Adding a metric that matches the allowlist - Counter counter = newCounter("group", "type", "name"); + // Add a metric + MetricName metricName = new MetricName("group", "type", "name", scope); + MetricWrapper metricWrapper = newMetric(metricName); + collector.addMetric(metricName, metricWrapper); metrics = collector.collect(); assertEquals(1, metrics.size()); MetricSnapshot snapshot = metrics.get(0); - - //assertEquals("kafka_server_group_name_type_count", snapshot.getMetadata().getName()); + assertEquals(metricWrapper.prometheusName(), snapshot.getMetadata().getName()); assertInstanceOf(CounterSnapshot.class, snapshot); CounterSnapshot counterSnapshot = (CounterSnapshot) snapshot; @@ -73,85 +67,62 @@ public void testMetricLifeCycle() { assertEquals(0.0, datapoint.getValue(), 0.1); assertEquals(labels, datapoint.getLabels()); - // Updating the value of the metric - counter.inc(10); + // Update the value of the metric + ((Counter) metricWrapper.value()).inc(10); metrics = collector.collect(); assertEquals(1, metrics.size()); snapshot = metrics.get(0); - //assertEquals("kafka_server_group_name_type_count", snapshot.getMetadata().getName()); + assertEquals(metricWrapper.prometheusName(), snapshot.getMetadata().getName()); assertInstanceOf(CounterSnapshot.class, snapshot); counterSnapshot = (CounterSnapshot) snapshot; assertEquals(1, counterSnapshot.getDataPoints().size()); datapoint = counterSnapshot.getDataPoints().get(0); assertEquals(10.0, datapoint.getValue(), 0.1); - // Removing the metric - removeMetric("group", "type", "name"); + // Remove the metric + collector.removeMetric(metricName); metrics = collector.collect(); assertEquals(0, metrics.size()); } @Test public void testCollectNonNumericMetric() { - Map props = new HashMap<>(); - props.put(PrometheusMetricsReporterConfig.ALLOWLIST_CONFIG, "kafka_server_group_type.*"); - PrometheusMetricsReporterConfig config = new PrometheusMetricsReporterConfig(props, new PrometheusRegistry()); - YammerMetricsCollector collector = new YammerMetricsCollector(config); + YammerMetricsCollector collector = new YammerMetricsCollector(); MetricSnapshots metrics = collector.collect(); assertEquals(0, metrics.size()); String nonNumericValue = "value"; - newNonNumericGauge("group", "type", "name", nonNumericValue); + MetricName metricName = new MetricName("group", "type", "name", scope); + MetricWrapper metricWrapper = newNonNumericMetric(metricName, nonNumericValue); + collector.addMetric(metricName, metricWrapper); metrics = collector.collect(); assertEquals(1, metrics.size()); MetricSnapshot snapshot = metrics.get(0); + assertEquals(metricWrapper.prometheusName(), snapshot.getMetadata().getName()); assertInstanceOf(InfoSnapshot.class, snapshot); Labels expectedLabels = labels.add("name", nonNumericValue); assertEquals(1, snapshot.getDataPoints().size()); assertEquals(expectedLabels, snapshot.getDataPoints().get(0).getLabels()); } - @Test - public void testLabelsFromScope() { - assertEquals(Labels.of("k1", "v1", "k2", "v2"), YammerMetricsCollector.labelsFromScope("k1.v1.k2.v2", "name")); - assertEquals(Labels.EMPTY, YammerMetricsCollector.labelsFromScope(null, "name")); - assertEquals(Labels.EMPTY, YammerMetricsCollector.labelsFromScope("k1", "name")); - assertEquals(Labels.EMPTY, YammerMetricsCollector.labelsFromScope("k1.", "name")); - assertEquals(Labels.EMPTY, YammerMetricsCollector.labelsFromScope("k1.v1.k", "name")); - - Labels labels = YammerMetricsCollector.labelsFromScope("k-1.v1.k_1.v2", "name"); - assertEquals("k_1", PrometheusNaming.sanitizeLabelName("k-1")); - assertEquals("v1", labels.get("k_1")); - assertEquals(1, labels.size()); + private MetricWrapper newMetric(MetricName metricName) { + Counter counter = registry.newCounter(metricName); + String prometheusName = MetricWrapper.prometheusName(metricName); + return new MetricWrapper(prometheusName, metricName.getScope(), counter, metricName.getName()); } - @Test - public void testMetricName() { - String metricName = YammerMetricsCollector.metricName(new MetricName("Kafka.Server", "Log", "NumLogSegments")); - assertEquals("kafka_server_kafka_server_log_numlogsegments", metricName); - } - - public Counter newCounter(String group, String type, String name) { - MetricName metricName = KafkaYammerMetrics.getMetricName(group, type, name, tagsMap); - return KafkaYammerMetrics.defaultRegistry().newCounter(metricName); - } - - public void newNonNumericGauge(String group, String type, String name, String value) { - MetricName metricName = KafkaYammerMetrics.getMetricName(group, type, name, tagsMap); - KafkaYammerMetrics.defaultRegistry().newGauge(metricName, new Gauge() { + private MetricWrapper newNonNumericMetric(MetricName metricName, String value) { + Gauge gauge = registry.newGauge(metricName, new Gauge<>() { @Override public String value() { return value; } }); - } - - public void removeMetric(String group, String type, String name) { - MetricName metricName = KafkaYammerMetrics.getMetricName(group, type, name, tagsMap); - KafkaYammerMetrics.defaultRegistry().removeMetric(metricName); + String prometheusName = MetricWrapper.prometheusName(metricName); + return new MetricWrapper(prometheusName, metricName.getScope(), gauge, metricName.getName()); } } diff --git a/src/test/java/io/strimzi/kafka/metrics/YammerPrometheusMetricsReporterTest.java b/src/test/java/io/strimzi/kafka/metrics/YammerPrometheusMetricsReporterTest.java index acc7b82..25c8e1e 100644 --- a/src/test/java/io/strimzi/kafka/metrics/YammerPrometheusMetricsReporterTest.java +++ b/src/test/java/io/strimzi/kafka/metrics/YammerPrometheusMetricsReporterTest.java @@ -4,20 +4,93 @@ */ package io.strimzi.kafka.metrics; +import com.yammer.metrics.Metrics; +import com.yammer.metrics.core.Counter; +import com.yammer.metrics.core.Metric; +import com.yammer.metrics.core.MetricName; +import com.yammer.metrics.core.MetricsRegistry; import io.prometheus.metrics.model.registry.PrometheusRegistry; import kafka.utils.VerifiableProperties; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import java.io.BufferedReader; +import java.io.InputStreamReader; +import java.net.HttpURLConnection; +import java.net.URL; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Optional; import java.util.Properties; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + public class YammerPrometheusMetricsReporterTest { + private final MetricsRegistry registry = Metrics.defaultRegistry(); + + @BeforeEach + public void setup() { + for (Map.Entry entry : registry.allMetrics().entrySet()) { + registry.removeMetric(entry.getKey()); + } + } + @Test - public void testLifeCycle() { + public void testLifeCycle() throws Exception { YammerPrometheusMetricsReporter reporter = new YammerPrometheusMetricsReporter(new PrometheusRegistry()); Properties configs = new Properties(); - configs.put("broker.id", "0"); configs.put(PrometheusMetricsReporterConfig.LISTENER_CONFIG, "http://:0"); + configs.put(PrometheusMetricsReporterConfig.ALLOWLIST_CONFIG, "kafka_server_group_type.*"); reporter.init(new VerifiableProperties(configs)); + + Optional port = reporter.getPort(); + assertTrue(port.isPresent()); + assertEquals(0, getMetrics(port.get()).size()); + + // Adding a metric not matching the allowlist does nothing + newCounter("other", "type", "name"); + List metrics = getMetrics(port.get()); + assertEquals(0, metrics.size()); + + // Adding a metric that matches the allowlist + newCounter("group", "type", "name"); + metrics = getMetrics(port.get()); + assertEquals(1, metrics.size()); + assertEquals("kafka_server_group_type_name_total 0.0", metrics.get(0)); + + // Removing the metric + removeMetric("group", "type", "name"); + metrics = getMetrics(port.get()); + assertEquals(0, metrics.size()); + } + + private List getMetrics(int port) throws Exception { + List metrics = new ArrayList<>(); + URL url = new URL("http://localhost:" + port + "/metrics"); + HttpURLConnection con = (HttpURLConnection) url.openConnection(); + con.setRequestMethod("GET"); + + try (BufferedReader in = new BufferedReader(new InputStreamReader(con.getInputStream()))) { + String inputLine; + while ((inputLine = in.readLine()) != null) { + if (!inputLine.startsWith("#")) { + metrics.add(inputLine); + } + } + } + return metrics; + } + + private Counter newCounter(String group, String type, String name) { + MetricName metricName = new MetricName(group, type, name, ""); + return registry.newCounter(metricName); + } + + private void removeMetric(String group, String type, String name) { + MetricName metricName = new MetricName(group, type, name, ""); + registry.removeMetric(metricName); } }