Skip to content

Commit

Permalink
Refactor Yammer metrics collection logic (#37)
Browse files Browse the repository at this point in the history
Signed-off-by: Mickael Maison <[email protected]>
  • Loading branch information
mimaison committed Aug 8, 2024
1 parent 0e6da16 commit 586860c
Show file tree
Hide file tree
Showing 8 changed files with 413 additions and 168 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
Expand Down Expand Up @@ -88,7 +87,7 @@ public MetricSnapshots collect() {
MetricName metricName = entry.getKey();
KafkaMetric kafkaMetric = entry.getValue();

String prometheusMetricName = metricName(metricName);
String prometheusMetricName = MetricWrapper.prometheusName(prefix, metricName);
if (!config.isAllowed(prometheusMetricName)) {
LOG.trace("Ignoring metric {} as it does not match the allowlist", prometheusMetricName);
continue;
Expand Down Expand Up @@ -116,12 +115,6 @@ public MetricSnapshots collect() {
return new MetricSnapshots(snapshots);
}

String metricName(MetricName metricName) {
return PrometheusNaming.prometheusName(PrometheusNaming
.sanitizeMetricName(prefix + '_' + metricName.group() + '_' + metricName.name())
.toLowerCase(Locale.ROOT));
}

static Labels labelsFromTags(Map<String, String> tags, String metricName) {
Labels.Builder builder = Labels.builder();
Set<String> labelNames = new HashSet<>();
Expand Down
152 changes: 152 additions & 0 deletions src/main/java/io/strimzi/kafka/metrics/MetricWrapper.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
/*
* Copyright Strimzi authors.
* License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html).
*/
package io.strimzi.kafka.metrics;

import com.yammer.metrics.core.Metric;
import com.yammer.metrics.core.MetricName;
import io.prometheus.metrics.model.snapshots.Labels;
import io.prometheus.metrics.model.snapshots.PrometheusNaming;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.HashSet;
import java.util.Locale;
import java.util.Map;
import java.util.Set;

/**
* Wrapper for both Kafka and Yammer metrics to unify logic in the Collectors
*/
public class MetricWrapper {

private static final Logger LOG = LoggerFactory.getLogger(MetricWrapper.class);

private final String prometheusName;
private final Labels labels;
private final Object value;
private final String attribute;

// Will be used when implementing https://github.com/strimzi/metrics-reporter/issues/9
/**
* Constructor from Kafka Metrics
* @param prometheusName The name of the metric in the prometheus format
* @param metric The Kafka metric
* @param attribute The attribute of the Kafka metric
*/
public MetricWrapper(String prometheusName, KafkaMetric metric, String attribute) {
this.prometheusName = prometheusName;
this.labels = labelsFromTags(metric.metricName().tags(), prometheusName);
this.value = metric.metricValue();
this.attribute = attribute;
}

/**
* Constructor from Yammer Metrics
* @param prometheusName The name of the metric in the prometheus format
* @param scope The scope of the Yammer metric
* @param metric The Yammer metric
* @param attribute The attribute of the Yammer metric
*/
public MetricWrapper(String prometheusName, String scope, Metric metric, String attribute) {
this.prometheusName = prometheusName;
this.labels = labelsFromScope(scope, prometheusName);
this.value = metric;
this.attribute = attribute;
}

/**
* The Prometheus name of this metric
* @return The Prometheus name
*/
public String prometheusName() {
return prometheusName;
}

/**
* The labels associated with this metric
* @return The labels
*/
public Labels labels() {
return labels;
}

/**
* The metric value
* @return The value
*/
public Object value() {
return value;
}

/**
* The metric attribute
* @return The attribute
*/
public String attribute() {
return attribute;
}

private static Labels labelsFromScope(String scope, String metricName) {
Labels.Builder builder = Labels.builder();
Set<String> labelNames = new HashSet<>();
if (scope != null) {
String[] parts = scope.split("\\.");
if (parts.length % 2 == 0) {
for (int i = 0; i < parts.length; i += 2) {
String newLabelName = PrometheusNaming.sanitizeLabelName(parts[i]);
if (labelNames.add(newLabelName)) {
builder.label(newLabelName, parts[i + 1]);
} else {
LOG.warn("Ignoring duplicate label key: {} with value: {} from metric: {} ", newLabelName, parts[i + 1], metricName);
}
}
}
}
return builder.build();
}

// Will be used when implementing https://github.com/strimzi/metrics-reporter/issues/9
private static Labels labelsFromTags(Map<String, String> tags, String metricName) {
Labels.Builder builder = Labels.builder();
Set<String> labelNames = new HashSet<>();
for (Map.Entry<String, String> label : tags.entrySet()) {
String newLabelName = PrometheusNaming.sanitizeLabelName(label.getKey());
if (labelNames.add(newLabelName)) {
builder.label(newLabelName, label.getValue());
} else {
LOG.warn("Ignoring duplicate label key: {} with value: {} from metric: {} ", newLabelName, label.getValue(), metricName);
}
}
return builder.build();
}

/**
* Compute the Prometheus name from a Yammer MetricName
* @param metricName The Yammer metric name
* @return The prometheus metric name
*/
public static String prometheusName(MetricName metricName) {
return PrometheusNaming.prometheusName(
PrometheusNaming.sanitizeMetricName(
"kafka_server_" +
metricName.getGroup() + '_' +
metricName.getType() + '_' +
metricName.getName()).toLowerCase(Locale.ROOT));
}

// Will be used when implementing https://github.com/strimzi/metrics-reporter/issues/9
/**
* Compute the Prometheus name from a Kafka MetricName
* @param prefix The prefix to add to the metric name
* @param metricName The Kafka metric name
* @return The prometheus metric name
*/
public static String prometheusName(String prefix, org.apache.kafka.common.MetricName metricName) {
return PrometheusNaming.prometheusName(
PrometheusNaming.sanitizeMetricName(
prefix + '_' + metricName.group() + '_' + metricName.name()).toLowerCase(Locale.ROOT));
}
}
130 changes: 52 additions & 78 deletions src/main/java/io/strimzi/kafka/metrics/YammerMetricsCollector.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,11 @@
*/
package io.strimzi.kafka.metrics;

import com.yammer.metrics.Metrics;
import com.yammer.metrics.core.Counter;
import com.yammer.metrics.core.Gauge;
import com.yammer.metrics.core.Histogram;
import com.yammer.metrics.core.Meter;
import com.yammer.metrics.core.Metric;
import com.yammer.metrics.core.MetricName;
import com.yammer.metrics.core.MetricsRegistry;
import com.yammer.metrics.core.Sampling;
import com.yammer.metrics.core.Timer;
import io.prometheus.metrics.model.registry.MultiCollector;
Expand All @@ -21,22 +18,18 @@
import io.prometheus.metrics.model.snapshots.Labels;
import io.prometheus.metrics.model.snapshots.MetricSnapshot;
import io.prometheus.metrics.model.snapshots.MetricSnapshots;
import io.prometheus.metrics.model.snapshots.PrometheusNaming;
import io.prometheus.metrics.model.snapshots.Quantile;
import io.prometheus.metrics.model.snapshots.Quantiles;
import io.prometheus.metrics.model.snapshots.SummarySnapshot;
import org.apache.kafka.server.metrics.KafkaYammerMetrics;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/**
* Prometheus Collector to store and export metrics retrieved by {@link YammerPrometheusMetricsReporter}.
Expand All @@ -47,17 +40,13 @@ public class YammerMetricsCollector implements MultiCollector {
private static final Logger LOG = LoggerFactory.getLogger(YammerMetricsCollector.class);
private static final List<Double> QUANTILES = Arrays.asList(0.50, 0.75, 0.95, 0.98, 0.99, 0.999);

private final List<MetricsRegistry> registries;
private final PrometheusMetricsReporterConfig config;
private final Map<MetricName, MetricWrapper> metrics;

/**
* Constructs a new YammerMetricsCollector with the provided configuration.
*
* @param config The configuration for the YammerMetricsCollector.
* Constructor
*/
public YammerMetricsCollector(PrometheusMetricsReporterConfig config) {
this.config = config;
this.registries = Arrays.asList(KafkaYammerMetrics.defaultRegistry(), Metrics.defaultRegistry());
public YammerMetricsCollector() {
this.metrics = new ConcurrentHashMap<>();
}

/**
Expand All @@ -72,48 +61,41 @@ public MetricSnapshots collect() {
Map<String, InfoSnapshot.Builder> infoBuilders = new HashMap<>();
Map<String, SummarySnapshot.Builder> summaryBuilders = new HashMap<>();

for (MetricsRegistry registry : registries) {
for (Map.Entry<MetricName, Metric> entry : registry.allMetrics().entrySet()) {
MetricName metricName = entry.getKey();
Metric metric = entry.getValue();

String prometheusMetricName = metricName(metricName);
if (!config.isAllowed(prometheusMetricName)) {
LOG.trace("Ignoring metric {} as it does not match the allowlist", prometheusMetricName);
continue;
}
Labels labels = labelsFromScope(metricName.getScope(), prometheusMetricName);
LOG.debug("Collecting metric {} with the following labels: {}", prometheusMetricName, labels);
for (Map.Entry<MetricName, MetricWrapper> entry : metrics.entrySet()) {
MetricWrapper metricWrapper = entry.getValue();
String prometheusMetricName = metricWrapper.prometheusName();
Object metric = metricWrapper.value();
Labels labels = metricWrapper.labels();
LOG.debug("Collecting metric {} with the following labels: {}", prometheusMetricName, labels);

if (metric instanceof Counter) {
Counter counter = (Counter) metric;
CounterSnapshot.Builder builder = counterBuilders.computeIfAbsent(prometheusMetricName, k -> CounterSnapshot.builder().name(prometheusMetricName));
builder.dataPoint(DataPointSnapshotBuilder.counterDataPoint(labels, counter.count()));
} else if (metric instanceof Gauge) {
Object valueObj = ((Gauge<?>) metric).value();
if (valueObj instanceof Number) {
double value = ((Number) valueObj).doubleValue();
GaugeSnapshot.Builder builder = gaugeBuilders.computeIfAbsent(prometheusMetricName, k -> GaugeSnapshot.builder().name(prometheusMetricName));
builder.dataPoint(DataPointSnapshotBuilder.gaugeDataPoint(labels, value));
} else {
InfoSnapshot.Builder builder = infoBuilders.computeIfAbsent(prometheusMetricName, k -> InfoSnapshot.builder().name(prometheusMetricName));
builder.dataPoint(DataPointSnapshotBuilder.infoDataPoint(labels, valueObj, metricName.getName()));
}
} else if (metric instanceof Timer) {
Timer timer = (Timer) metric;
SummarySnapshot.Builder builder = summaryBuilders.computeIfAbsent(prometheusMetricName, k -> SummarySnapshot.builder().name(prometheusMetricName));
builder.dataPoint(DataPointSnapshotBuilder.summaryDataPoint(labels, timer.count(), timer.sum(), quantiles(timer)));
} else if (metric instanceof Histogram) {
Histogram histogram = (Histogram) metric;
SummarySnapshot.Builder builder = summaryBuilders.computeIfAbsent(prometheusMetricName, k -> SummarySnapshot.builder().name(prometheusMetricName));
builder.dataPoint(DataPointSnapshotBuilder.summaryDataPoint(labels, histogram.count(), histogram.sum(), quantiles(histogram)));
} else if (metric instanceof Meter) {
Meter meter = (Meter) metric;
CounterSnapshot.Builder builder = counterBuilders.computeIfAbsent(prometheusMetricName, k -> CounterSnapshot.builder().name(prometheusMetricName));
builder.dataPoint(DataPointSnapshotBuilder.counterDataPoint(labels, meter.count()));
if (metric instanceof Counter) {
Counter counter = (Counter) metric;
CounterSnapshot.Builder builder = counterBuilders.computeIfAbsent(prometheusMetricName, k -> CounterSnapshot.builder().name(prometheusMetricName));
builder.dataPoint(DataPointSnapshotBuilder.counterDataPoint(labels, counter.count()));
} else if (metric instanceof Gauge) {
Object valueObj = ((Gauge<?>) metric).value();
if (valueObj instanceof Number) {
double value = ((Number) valueObj).doubleValue();
GaugeSnapshot.Builder builder = gaugeBuilders.computeIfAbsent(prometheusMetricName, k -> GaugeSnapshot.builder().name(prometheusMetricName));
builder.dataPoint(DataPointSnapshotBuilder.gaugeDataPoint(labels, value));
} else {
LOG.error("The metric {} has an unexpected type: {}", prometheusMetricName, metric.getClass().getName());
InfoSnapshot.Builder builder = infoBuilders.computeIfAbsent(prometheusMetricName, k -> InfoSnapshot.builder().name(prometheusMetricName));
builder.dataPoint(DataPointSnapshotBuilder.infoDataPoint(labels, valueObj, metricWrapper.attribute()));
}
} else if (metric instanceof Timer) {
Timer timer = (Timer) metric;
SummarySnapshot.Builder builder = summaryBuilders.computeIfAbsent(prometheusMetricName, k -> SummarySnapshot.builder().name(prometheusMetricName));
builder.dataPoint(DataPointSnapshotBuilder.summaryDataPoint(labels, timer.count(), timer.sum(), quantiles(timer)));
} else if (metric instanceof Histogram) {
Histogram histogram = (Histogram) metric;
SummarySnapshot.Builder builder = summaryBuilders.computeIfAbsent(prometheusMetricName, k -> SummarySnapshot.builder().name(prometheusMetricName));
builder.dataPoint(DataPointSnapshotBuilder.summaryDataPoint(labels, histogram.count(), histogram.sum(), quantiles(histogram)));
} else if (metric instanceof Meter) {
Meter meter = (Meter) metric;
CounterSnapshot.Builder builder = counterBuilders.computeIfAbsent(prometheusMetricName, k -> CounterSnapshot.builder().name(prometheusMetricName));
builder.dataPoint(DataPointSnapshotBuilder.counterDataPoint(labels, meter.count()));
} else {
LOG.error("The metric {} has an unexpected type: {}", prometheusMetricName, metric.getClass().getName());
}
}
List<MetricSnapshot> snapshots = new ArrayList<>();
Expand All @@ -132,31 +114,23 @@ public MetricSnapshots collect() {
return new MetricSnapshots(snapshots);
}

static String metricName(MetricName metricName) {
return PrometheusNaming.prometheusName(PrometheusNaming.sanitizeMetricName(
"kafka_server_" +
metricName.getGroup() + '_' +
metricName.getType() + '_' +
metricName.getName()).toLowerCase(Locale.ROOT));
/**
* Add a Yammer metric to be collected.
*
* @param name The name of the Yammer metric to add.
* @param metric The Yammer metric to add.
*/
public void addMetric(MetricName name, MetricWrapper metric) {
metrics.put(name, metric);
}

static Labels labelsFromScope(String scope, String metricName) {
Labels.Builder builder = Labels.builder();
Set<String> labelNames = new HashSet<>();
if (scope != null) {
String[] parts = scope.split("\\.");
if (parts.length % 2 == 0) {
for (int i = 0; i < parts.length; i += 2) {
String newLabelName = PrometheusNaming.sanitizeLabelName(parts[i]);
if (labelNames.add(newLabelName)) {
builder.label(newLabelName, parts[i + 1]);
} else {
LOG.warn("Ignoring duplicate label key: {} with value: {} from metric: {} ", newLabelName, parts[i + 1], metricName);
}
}
}
}
return builder.build();
/**
* Remove a Yammer metric from collection.
*
* @param name The name of the Yammer metric to remove.
*/
public void removeMetric(MetricName name) {
metrics.remove(name);
}

private static Quantiles quantiles(Sampling sampling) {
Expand Down
Loading

0 comments on commit 586860c

Please sign in to comment.