Skip to content

Commit

Permalink
Move metric filtering to KafkaPrometheusReporter (#38)
Browse files Browse the repository at this point in the history
Signed-off-by: Owen <[email protected]>
  • Loading branch information
OwenCorrigan76 committed Aug 16, 2024
1 parent b572d91 commit dfcde19
Show file tree
Hide file tree
Showing 6 changed files with 116 additions and 152 deletions.
80 changes: 21 additions & 59 deletions src/main/java/io/strimzi/kafka/metrics/KafkaMetricsCollector.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,25 +4,20 @@
*/
package io.strimzi.kafka.metrics;

import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.prometheus.metrics.model.registry.MultiCollector;
import io.prometheus.metrics.model.snapshots.GaugeSnapshot;
import io.prometheus.metrics.model.snapshots.InfoSnapshot;
import io.prometheus.metrics.model.snapshots.Labels;
import io.prometheus.metrics.model.snapshots.MetricSnapshot;
import io.prometheus.metrics.model.snapshots.MetricSnapshots;
import io.prometheus.metrics.model.snapshots.PrometheusNaming;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/**
Expand All @@ -31,78 +26,59 @@
public class KafkaMetricsCollector implements MultiCollector {

private static final Logger LOG = LoggerFactory.getLogger(KafkaMetricsCollector.class);

private final Map<MetricName, KafkaMetric> metrics;
private final PrometheusMetricsReporterConfig config;
@SuppressFBWarnings({"UWF_FIELD_NOT_INITIALIZED_IN_CONSTRUCTOR"}) // This field is initialized in the setPrefix method
private String prefix;
private final Map<MetricName, MetricWrapper> metrics;

/**
* Constructs a new KafkaMetricsCollector with provided configuration.
*
* @param config The configuration for the PrometheusMetricsReporter.
*/
public KafkaMetricsCollector(PrometheusMetricsReporterConfig config) {
this.config = config;
public KafkaMetricsCollector() {
this.metrics = new ConcurrentHashMap<>();
}

/**
* Sets the prefix to be used for metric names. This is always called before addMetric/removeMetric
*
* @param prefix The prefix to set.
*/
public void setPrefix(String prefix) {
this.prefix = PrometheusNaming.prometheusName(prefix);
}

/**
* Adds a Kafka metric to be collected.
* This method is used to add a Kafka metric to the collection for reporting.
* The metric is wrapped in a MetricWrapper object which contains additional information
* such as the prometheus name of the metric.
*
* @param metric The Kafka metric to add.
* @param name The name of the metric in the Kafka system. This is used as the key in the metrics map.
* @param metric The Kafka metric to add. This is wrapped in a MetricWrapper object.
*/
public void addMetric(KafkaMetric metric) {
metrics.put(metric.metricName(), metric);
public void addMetric(MetricName name, MetricWrapper metric) {
metrics.put(name, metric);
}

/**
* Removes a Kafka metric from collection.
*
* @param metric The Kafka metric to remove.
* @param name The Kafka metric to remove.
*/
public void removeMetric(KafkaMetric metric) {
metrics.remove(metric.metricName());
public void removeMetric(MetricName name) {
metrics.remove(name);
}

/**
* Called when the Prometheus server scrapes metrics.
* @return metrics that match the configured allowlist
* @return MetricSnapshots object that contains snapshots of metrics
*/
@Override
public MetricSnapshots collect() {
Map<String, GaugeSnapshot.Builder> gaugeBuilders = new HashMap<>();
Map<String, InfoSnapshot.Builder> infoBuilders = new HashMap<>();

for (Map.Entry<MetricName, KafkaMetric> entry : metrics.entrySet()) {
MetricName metricName = entry.getKey();
KafkaMetric kafkaMetric = entry.getValue();

String prometheusMetricName = MetricWrapper.prometheusName(prefix, metricName);
if (!config.isAllowed(prometheusMetricName)) {
LOG.trace("Ignoring metric {} as it does not match the allowlist", prometheusMetricName);
continue;
}
Labels labels = labelsFromTags(metricName.tags(), metricName.name());
for (Map.Entry<MetricName, MetricWrapper> entry : metrics.entrySet()) {
MetricWrapper metricWrapper = entry.getValue();
String prometheusMetricName = metricWrapper.prometheusName();
Object metric = metricWrapper.value();
Labels labels = metricWrapper.labels();
LOG.debug("Collecting metric {} with the following labels: {}", prometheusMetricName, labels);

Object valueObj = kafkaMetric.metricValue();
if (valueObj instanceof Number) {
double value = ((Number) valueObj).doubleValue();
if (metric instanceof Number) {
double value = ((Number) metric).doubleValue();
GaugeSnapshot.Builder builder = gaugeBuilders.computeIfAbsent(prometheusMetricName, k -> GaugeSnapshot.builder().name(prometheusMetricName));
builder.dataPoint(DataPointSnapshotBuilder.gaugeDataPoint(labels, value));
} else {
InfoSnapshot.Builder builder = infoBuilders.computeIfAbsent(prometheusMetricName, k -> InfoSnapshot.builder().name(prometheusMetricName));
builder.dataPoint(DataPointSnapshotBuilder.infoDataPoint(labels, valueObj, metricName.name()));
builder.dataPoint(DataPointSnapshotBuilder.infoDataPoint(labels, metric, metricWrapper.attribute()));
}
}
List<MetricSnapshot> snapshots = new ArrayList<>();
Expand All @@ -114,18 +90,4 @@ public MetricSnapshots collect() {
}
return new MetricSnapshots(snapshots);
}

static Labels labelsFromTags(Map<String, String> tags, String metricName) {
Labels.Builder builder = Labels.builder();
Set<String> labelNames = new HashSet<>();
for (Map.Entry<String, String> label : tags.entrySet()) {
String newLabelName = PrometheusNaming.sanitizeLabelName(label.getKey());
if (labelNames.add(newLabelName)) {
builder.label(newLabelName, label.getValue());
} else {
LOG.warn("Ignoring duplicate label key: {} with value: {} from metric: {} ", newLabelName, label.getValue(), metricName);
}
}
return builder.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import io.prometheus.metrics.exporter.httpserver.HTTPServer;
import io.prometheus.metrics.instrumentation.jvm.JvmMetrics;
import io.prometheus.metrics.model.registry.PrometheusRegistry;
import io.prometheus.metrics.model.snapshots.PrometheusNaming;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.MetricsContext;
Expand All @@ -29,12 +30,15 @@
public class KafkaPrometheusMetricsReporter implements MetricsReporter {

private static final Logger LOG = LoggerFactory.getLogger(KafkaPrometheusMetricsReporter.class);

private final PrometheusRegistry registry;
@SuppressFBWarnings({"UWF_FIELD_NOT_INITIALIZED_IN_CONSTRUCTOR"}) // This field is initialized in the configure method
private KafkaMetricsCollector kafkaMetricsCollector;
private KafkaMetricsCollector collector;
@SuppressFBWarnings({"UWF_FIELD_NOT_INITIALIZED_IN_CONSTRUCTOR"}) // This field is initialized in the configure method
private PrometheusMetricsReporterConfig config;
@SuppressFBWarnings({"UWF_FIELD_NOT_INITIALIZED_IN_CONSTRUCTOR"}) // This field is initialized in the configure method
private Optional<HTTPServer> httpServer;
@SuppressFBWarnings({"UWF_FIELD_NOT_INITIALIZED_IN_CONSTRUCTOR"}) // This field is initialized in the contextChange method
private String prefix;

/**
* Constructor
Expand All @@ -50,8 +54,8 @@ public KafkaPrometheusMetricsReporter() {

@Override
public void configure(Map<String, ?> map) {
PrometheusMetricsReporterConfig config = new PrometheusMetricsReporterConfig(map, registry);
kafkaMetricsCollector = new KafkaMetricsCollector(config);
config = new PrometheusMetricsReporterConfig(map, registry);
collector = new KafkaMetricsCollector();
// Add JVM metrics
JvmMetrics.builder().register(registry);
httpServer = config.startHttpServer();
Expand All @@ -60,25 +64,30 @@ public void configure(Map<String, ?> map) {

@Override
public void init(List<KafkaMetric> metrics) {
registry.register(kafkaMetricsCollector);
registry.register(collector);
for (KafkaMetric metric : metrics) {
metricChange(metric);
}
}

@Override
public void metricChange(KafkaMetric metric) {
kafkaMetricsCollector.addMetric(metric);
String prometheusName = MetricWrapper.prometheusName(prefix, metric.metricName());
if (!config.isAllowed(prometheusName)) {
LOG.trace("Ignoring metric {} as it does not match the allowlist", prometheusName);
} else {
MetricWrapper metricWrapper = new MetricWrapper(prometheusName, metric, metric.metricName().name());
collector.addMetric(metric.metricName(), metricWrapper);
}
}

@Override
public void metricRemoval(KafkaMetric metric) {
kafkaMetricsCollector.removeMetric(metric);
collector.removeMetric(metric.metricName());
}

@Override
public void close() {
registry.unregister(kafkaMetricsCollector);
registry.unregister(collector);
}

@Override
Expand All @@ -97,7 +106,7 @@ public Set<String> reconfigurableConfigs() {
@Override
public void contextChange(MetricsContext metricsContext) {
String prefix = metricsContext.contextLabels().get(MetricsContext.NAMESPACE);
kafkaMetricsCollector.setPrefix(prefix);
this.prefix = PrometheusNaming.prometheusName(prefix);
}

// for testing
Expand Down
7 changes: 2 additions & 5 deletions src/main/java/io/strimzi/kafka/metrics/MetricWrapper.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ public class MetricWrapper {
private final Object value;
private final String attribute;

// Will be used when implementing https://github.com/strimzi/metrics-reporter/issues/9
/**
* Constructor from Kafka Metrics
* @param prometheusName The name of the metric in the prometheus format
Expand Down Expand Up @@ -89,7 +88,7 @@ public String attribute() {
return attribute;
}

private static Labels labelsFromScope(String scope, String metricName) {
static Labels labelsFromScope(String scope, String metricName) {
Labels.Builder builder = Labels.builder();
Set<String> labelNames = new HashSet<>();
if (scope != null) {
Expand All @@ -108,8 +107,7 @@ private static Labels labelsFromScope(String scope, String metricName) {
return builder.build();
}

// Will be used when implementing https://github.com/strimzi/metrics-reporter/issues/9
private static Labels labelsFromTags(Map<String, String> tags, String metricName) {
static Labels labelsFromTags(Map<String, String> tags, String metricName) {
Labels.Builder builder = Labels.builder();
Set<String> labelNames = new HashSet<>();
for (Map.Entry<String, String> label : tags.entrySet()) {
Expand Down Expand Up @@ -137,7 +135,6 @@ public static String prometheusName(MetricName metricName) {
metricName.getName()).toLowerCase(Locale.ROOT));
}

// Will be used when implementing https://github.com/strimzi/metrics-reporter/issues/9
/**
* Compute the Prometheus name from a Kafka MetricName
* @param prefix The prefix to add to the metric name
Expand Down
Loading

0 comments on commit dfcde19

Please sign in to comment.