Skip to content

Commit

Permalink
move label filtering into Reporter
Browse files Browse the repository at this point in the history
Signed-off-by: Owen <[email protected]>
  • Loading branch information
OwenCorrigan76 committed Aug 13, 2024
1 parent 586860c commit bf86d91
Show file tree
Hide file tree
Showing 4 changed files with 82 additions and 98 deletions.
67 changes: 26 additions & 41 deletions src/main/java/io/strimzi/kafka/metrics/KafkaMetricsCollector.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,8 @@

import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/**
Expand All @@ -31,19 +29,14 @@
public class KafkaMetricsCollector implements MultiCollector {

private static final Logger LOG = LoggerFactory.getLogger(KafkaMetricsCollector.class);

private final Map<MetricName, KafkaMetric> metrics;
private final PrometheusMetricsReporterConfig config;
private final Map<MetricName, MetricWrapper> metrics;
@SuppressFBWarnings({"UWF_FIELD_NOT_INITIALIZED_IN_CONSTRUCTOR"}) // This field is initialized in the setPrefix method
private String prefix;

/**
 * Constructs a new KafkaMetricsCollector with an empty, thread-safe metrics store.
 * Label filtering is done by the reporter before metrics are added, so no
 * configuration is needed here.
 */
public KafkaMetricsCollector() {
    // ConcurrentHashMap: metrics are added/removed by the reporter thread while collect() iterates
    this.metrics = new ConcurrentHashMap<>();
}

Expand All @@ -57,12 +50,24 @@ public void setPrefix(String prefix) {
}

/**
 * Returns the prefix currently applied to metric names.
 *
 * @return the metric name prefix, or {@code null} if {@code setPrefix} has not been called yet
 */
public String getPrefix() {
    return prefix;
}

/**
* This method is used to add a Kafka metric to the collection for reporting.
* The metric is wrapped in a MetricWrapper object which contains additional information
* such as the prometheus name of the metric.
*
* @param metric The Kafka metric to add.
* @param name The name of the metric in the Kafka system. This is used as the key in the metrics map.
* @param metric The Kafka metric to add. This is wrapped in a MetricWrapper object.
*/
public void addMetric(KafkaMetric metric) {
metrics.put(metric.metricName(), metric);
public void addMetric(MetricName name, MetricWrapper metric) {
metrics.put(name, metric);
}

/**
Expand All @@ -83,26 +88,20 @@ public MetricSnapshots collect() {
Map<String, GaugeSnapshot.Builder> gaugeBuilders = new HashMap<>();
Map<String, InfoSnapshot.Builder> infoBuilders = new HashMap<>();

for (Map.Entry<MetricName, KafkaMetric> entry : metrics.entrySet()) {
MetricName metricName = entry.getKey();
KafkaMetric kafkaMetric = entry.getValue();

String prometheusMetricName = MetricWrapper.prometheusName(prefix, metricName);
if (!config.isAllowed(prometheusMetricName)) {
LOG.trace("Ignoring metric {} as it does not match the allowlist", prometheusMetricName);
continue;
}
Labels labels = labelsFromTags(metricName.tags(), metricName.name());
for (Map.Entry<MetricName, MetricWrapper> entry : metrics.entrySet()) {
MetricWrapper metricWrapper = entry.getValue();
String prometheusMetricName = metricWrapper.prometheusName();
Object metric = metricWrapper.value();
Labels labels = metricWrapper.labels();
LOG.debug("Collecting metric {} with the following labels: {}", prometheusMetricName, labels);

Object valueObj = kafkaMetric.metricValue();
if (valueObj instanceof Number) {
double value = ((Number) valueObj).doubleValue();
if (metric instanceof Number) {
double value = ((Number) metric).doubleValue();
GaugeSnapshot.Builder builder = gaugeBuilders.computeIfAbsent(prometheusMetricName, k -> GaugeSnapshot.builder().name(prometheusMetricName));
builder.dataPoint(DataPointSnapshotBuilder.gaugeDataPoint(labels, value));
} else {
InfoSnapshot.Builder builder = infoBuilders.computeIfAbsent(prometheusMetricName, k -> InfoSnapshot.builder().name(prometheusMetricName));
builder.dataPoint(DataPointSnapshotBuilder.infoDataPoint(labels, valueObj, metricName.name()));
builder.dataPoint(DataPointSnapshotBuilder.infoDataPoint(labels, metric, metricWrapper.attribute()));
}
}
List<MetricSnapshot> snapshots = new ArrayList<>();
Expand All @@ -114,18 +113,4 @@ public MetricSnapshots collect() {
}
return new MetricSnapshots(snapshots);
}

static Labels labelsFromTags(Map<String, String> tags, String metricName) {
Labels.Builder builder = Labels.builder();
Set<String> labelNames = new HashSet<>();
for (Map.Entry<String, String> label : tags.entrySet()) {
String newLabelName = PrometheusNaming.sanitizeLabelName(label.getKey());
if (labelNames.add(newLabelName)) {
builder.label(newLabelName, label.getValue());
} else {
LOG.warn("Ignoring duplicate label key: {} with value: {} from metric: {} ", newLabelName, label.getValue(), metricName);
}
}
return builder.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,14 @@
public class KafkaPrometheusMetricsReporter implements MetricsReporter {

private static final Logger LOG = LoggerFactory.getLogger(KafkaPrometheusMetricsReporter.class);

private final PrometheusRegistry registry;
@SuppressFBWarnings({"UWF_FIELD_NOT_INITIALIZED_IN_CONSTRUCTOR"}) // This field is initialized in the configure method
private KafkaMetricsCollector kafkaMetricsCollector;
@SuppressFBWarnings({"UWF_FIELD_NOT_INITIALIZED_IN_CONSTRUCTOR"}) // This field is initialized in the configure method
@SuppressFBWarnings({"UWF_FIELD_NOT_INITIALIZED_IN_CONSTRUCTOR"})
// This field is initialized in the configure method
private KafkaMetricsCollector collector;
@SuppressFBWarnings({"UWF_FIELD_NOT_INITIALIZED_IN_CONSTRUCTOR"}) // This field is initialized in the init method
private PrometheusMetricsReporterConfig config;
@SuppressFBWarnings({"UWF_FIELD_NOT_INITIALIZED_IN_CONSTRUCTOR"})
// This field is initialized in the configure method
private Optional<HTTPServer> httpServer;

/**
Expand All @@ -50,35 +53,44 @@ public KafkaPrometheusMetricsReporter() {

@Override
public void configure(Map<String, ?> map) {
    // Parse the reporter configuration (allowlist, listener, etc.) and keep it
    // so metricChange() can filter metrics against the allowlist.
    config = new PrometheusMetricsReporterConfig(map, registry);
    collector = new KafkaMetricsCollector();
    // Add JVM metrics
    JvmMetrics.builder().register(registry);
    httpServer = config.startHttpServer();
    LOG.debug("KafkaPrometheusMetricsReporter configured with {}", config);
}

/**
 * Registers the collector with the Prometheus registry and reports the metrics
 * that already exist at startup; subsequent metrics arrive via {@code metricChange}.
 *
 * @param metrics the metrics that exist when the reporter is initialized
 */
@Override
public void init(List<KafkaMetric> metrics) {
    registry.register(collector);
    for (KafkaMetric metric : metrics) {
        // Route through metricChange so initial metrics get the same allowlist filtering
        metricChange(metric);
    }
}

@Override
public void metricChange(KafkaMetric metric) {
    // Compute the Prometheus-style name up front so the allowlist filter and the
    // wrapper share the exact same name.
    String prometheusName = MetricWrapper.prometheusName(collector.getPrefix(), metric.metricName());
    if (!config.isAllowed(prometheusName)) {
        LOG.trace("Ignoring metric {} as it does not match the allowlist", prometheusName);
    } else {
        MetricWrapper metricWrapper = new MetricWrapper(prometheusName, metric, metric.metricName().name());
        collector.addMetric(metric.metricName(), metricWrapper);
    }
}

@Override
public void metricRemoval(KafkaMetric metric) {
    // Metrics filtered out by the allowlist were never added; removeMetric is a no-op for those.
    collector.removeMetric(metric);
}

@Override
public void close() {
    // Unregister the collector so a re-configured reporter can register a fresh one.
    registry.unregister(collector);
}

@Override
Expand All @@ -97,7 +109,7 @@ public Set<String> reconfigurableConfigs() {
@Override
public void contextChange(MetricsContext metricsContext) {
    // The Kafka namespace (e.g. "kafka.server") becomes the prefix of every Prometheus metric name.
    String prefix = metricsContext.contextLabels().get(MetricsContext.NAMESPACE);
    collector.setPrefix(prefix);
}

// for testing
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,11 @@
*/
package io.strimzi.kafka.metrics;

import io.prometheus.metrics.model.registry.PrometheusRegistry;
import io.prometheus.metrics.model.snapshots.GaugeSnapshot;
import io.prometheus.metrics.model.snapshots.InfoSnapshot;
import io.prometheus.metrics.model.snapshots.Labels;
import io.prometheus.metrics.model.snapshots.MetricSnapshot;
import io.prometheus.metrics.model.snapshots.MetricSnapshots;
import io.prometheus.metrics.model.snapshots.PrometheusNaming;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.metrics.Gauge;
import org.apache.kafka.common.metrics.KafkaMetric;
Expand All @@ -20,15 +18,13 @@
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;

public class KafkaMetricsCollectorTest {

private final MetricConfig metricConfig = new MetricConfig();
private final Time time = Time.SYSTEM;
private Map<String, String> tagsMap;
Expand All @@ -47,58 +43,50 @@ public void setup() {
}

@Test
public void testMetricLifecycle() {
Map<String, String> props = new HashMap<>();
props.put(PrometheusMetricsReporterConfig.ALLOWLIST_CONFIG, "kafka_server_group_name.*");
PrometheusMetricsReporterConfig config = new PrometheusMetricsReporterConfig(props, new PrometheusRegistry());
KafkaMetricsCollector collector = new KafkaMetricsCollector(config);
collector.setPrefix("kafka.server");
public void testCollect() {
KafkaMetricsCollector collector = new KafkaMetricsCollector();

MetricSnapshots metrics = collector.collect();
assertEquals(0, metrics.size());

// Adding a metric not matching the allowlist does nothing
collector.addMetric(buildMetric("name", "other", 2.0));
metrics = collector.collect();
assertEquals(0, metrics.size());
// Add a metric
MetricName metricName = new MetricName("name", "group", "description", tagsMap);
MetricWrapper metricWrapper = newMetric(metricName, 1.0);

// Adding a metric that matches the allowlist
collector.addMetric(buildMetric("name", "group", 1.0));
collector.addMetric(metricName, metricWrapper);
metrics = collector.collect();
assertEquals(1, metrics.size());

MetricSnapshot snapshot = metrics.get(0);
assertGaugeSnapshot(snapshot, 1.0, labels);

// Adding the same metric updates its value
collector.addMetric(buildMetric("name", "group", 3.0));
// Update the value of the metric
collector.addMetric(metricName, newMetric(metricName, 3.0));
metrics = collector.collect();
assertEquals(1, metrics.size());

MetricSnapshot updatedSnapshot = metrics.get(0);
assertGaugeSnapshot(updatedSnapshot, 3.0, labels);

// Removing the metric
collector.removeMetric(buildMetric("name", "group", 4.0));
collector.removeMetric(buildMetric("name", "group", 1.0));
metrics = collector.collect();
assertEquals(0, metrics.size());
}

@Test
public void testCollectNonNumericMetric() {
Map<String, String> props = new HashMap<>();
props.put(PrometheusMetricsReporterConfig.ALLOWLIST_CONFIG, "kafka_server_group_name.*");
PrometheusMetricsReporterConfig config = new PrometheusMetricsReporterConfig(props, new PrometheusRegistry());
KafkaMetricsCollector collector = new KafkaMetricsCollector(config);
KafkaMetricsCollector collector = new KafkaMetricsCollector();
collector.setPrefix("kafka.server");

MetricSnapshots metrics = collector.collect();
assertEquals(0, metrics.size());

// Adding a non-numeric metric converted
String nonNumericValue = "myValue";
KafkaMetric nonNumericMetric = buildNonNumericMetric("name", "group", nonNumericValue);
collector.addMetric(nonNumericMetric);
MetricName metricName = new MetricName("name", "group", "description", tagsMap);
MetricWrapper metricWrapper = newNonNumericMetric(metricName, nonNumericValue);
collector.addMetric(metricName, metricWrapper);
metrics = collector.collect();

assertEquals(1, metrics.size());
Expand All @@ -109,19 +97,6 @@ public void testCollectNonNumericMetric() {
assertEquals(expectedLabels, snapshot.getDataPoints().get(0).getLabels());
}

@Test
public void testLabelsFromTags() {
Map<String, String> tags = new LinkedHashMap<>();
tags.put("k-1", "v1");
tags.put("k_1", "v2");

Labels labels = KafkaMetricsCollector.labelsFromTags(tags, "name");

assertEquals("k_1", PrometheusNaming.sanitizeLabelName("k-1"));
assertEquals("v1", labels.get("k_1"));
assertEquals(1, labels.size());
}

private void assertGaugeSnapshot(MetricSnapshot snapshot, double expectedValue, Labels expectedLabels) {
assertInstanceOf(GaugeSnapshot.class, snapshot);
GaugeSnapshot gaugeSnapshot = (GaugeSnapshot) snapshot;
Expand All @@ -131,6 +106,12 @@ private void assertGaugeSnapshot(MetricSnapshot snapshot, double expectedValue,
assertEquals(expectedLabels, datapoint.getLabels());
}

// Builds a numeric Kafka metric and wraps it with its Prometheus name for the collector.
private MetricWrapper newMetric(MetricName metricName, double value) {
    String promName = MetricWrapper.prometheusName("kafka.server", metricName);
    return new MetricWrapper(
            promName,
            buildMetric(metricName.name(), metricName.group(), value),
            metricName.name());
}

private KafkaMetric buildMetric(String name, String group, double value) {
Measurable measurable = (config, now) -> value;
return new KafkaMetric(
Expand All @@ -141,14 +122,17 @@ private KafkaMetric buildMetric(String name, String group, double value) {
time);
}

private KafkaMetric buildNonNumericMetric(String name, String group, String value) {
Gauge<String> measurable = (config, now) -> value;
return new KafkaMetric(
private MetricWrapper newNonNumericMetric(MetricName metricName, String value) {
Gauge<String> gauge = (config, now) -> value;
KafkaMetric kafkaMetric = new KafkaMetric(
new Object(),
new MetricName(name, group, "", tagsMap),
measurable,
metricName,
gauge,
metricConfig,
time);
time
);
String prometheusName = MetricWrapper.prometheusName("kafka.server", metricName);
return new MetricWrapper(prometheusName, kafkaMetric, metricName.name());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,14 @@ public void testLifeCycle() throws Exception {
KafkaPrometheusMetricsReporter reporter = new KafkaPrometheusMetricsReporter(new PrometheusRegistry());
Map<String, String> configs = new HashMap<>();
configs.put(PrometheusMetricsReporterConfig.LISTENER_CONFIG, "http://:0");
configs.put(PrometheusMetricsReporterConfig.ALLOWLIST_CONFIG, "kafka_server_group_name.*");
reporter.configure(configs);
reporter.contextChange(new KafkaMetricsContext("kafka.server"));

Optional<Integer> port = reporter.getPort();
assertTrue(port.isPresent());
int initialMetrics = getMetrics(port.get()).size();

int initialMetrics = getMetrics(port.get()).size();
KafkaMetric metric1 = buildMetric("name1", "group", 0);
reporter.init(Collections.singletonList(metric1));

Expand Down Expand Up @@ -71,6 +73,7 @@ public void testLifeCycle() throws Exception {
public void testMultipleReporters() throws Exception {
Map<String, String> configs = new HashMap<>();
configs.put(PrometheusMetricsReporterConfig.LISTENER_CONFIG, "http://:0");
configs.put(PrometheusMetricsReporterConfig.ALLOWLIST_CONFIG, "kafka_server_group_name.*");

KafkaPrometheusMetricsReporter reporter1 = new KafkaPrometheusMetricsReporter(new PrometheusRegistry());
reporter1.configure(configs);
Expand Down

0 comments on commit bf86d91

Please sign in to comment.