Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add prometheus support to frontend module #1695

Merged
merged 7 commits into from
Jul 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ static HermesServer provideHermesServer() throws IOException {
ThroughputLimiter throughputLimiter = (exampleTopic, throughput) -> quotaConfirmed();
HermesMetrics hermesMetrics = new HermesMetrics(new MetricRegistry(), new PathsCompiler(""));
MetricsFacade metricsFacade = new MetricsFacade(new SimpleMeterRegistry(), hermesMetrics);
TopicsCache topicsCache = new InMemoryTopicsCache(hermesMetrics, metricsFacade, topic);
TopicsCache topicsCache = new InMemoryTopicsCache(metricsFacade, topic);
BrokerMessageProducer brokerMessageProducer = new InMemoryBrokerMessageProducer();
RawSchemaClient rawSchemaClient = new InMemorySchemaClient(topic.getName(), loadMessageResource("schema"), 1, 1);
Trackers trackers = new Trackers(Collections.emptyList());
Expand All @@ -67,7 +67,7 @@ static HermesServer provideHermesServer() throws IOException {
return new HermesServer(
sslProperties,
hermesServerProperties,
hermesMetrics,
metricsFacade,
httpHandler,
new DisabledReadinessChecker(false),
new NoOpMessagePreviewPersister(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,12 @@

class InMemoryTopicsCache implements TopicsCache {

private final HermesMetrics oldMetrics;
private final MetricsFacade metricsFacade;
private final KafkaTopics kafkaTopics;
private final Topic topic;


InMemoryTopicsCache(HermesMetrics oldMetrics, MetricsFacade metricsFacade, Topic topic) {
this.oldMetrics = oldMetrics;
InMemoryTopicsCache(MetricsFacade metricsFacade, Topic topic) {
this.metricsFacade = metricsFacade;
this.topic = topic;
this.kafkaTopics = new KafkaTopics(new KafkaTopic(KafkaTopicName.valueOf(topic.getQualifiedName()), topic.getContentType()));
Expand All @@ -33,7 +31,6 @@ public Optional<CachedTopic> getTopic(String qualifiedTopicName) {
return Optional.of(
new CachedTopic(
topic,
oldMetrics,
metricsFacade,
kafkaTopics
)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package pl.allegro.tech.hermes.common.metric;

import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Tags;

import java.util.function.ToDoubleFunction;

public class GaugeRegistrar {
private final MeterRegistry meterRegistry;
private final HermesMetrics hermesMetrics;

public GaugeRegistrar(MeterRegistry meterRegistry, HermesMetrics hermesMetrics) {
this.meterRegistry = meterRegistry;
this.hermesMetrics = hermesMetrics;
}

public <T> void registerGauge(String graphiteName,
String prometheusName,
T stateObj,
ToDoubleFunction<T> f,
Tags tags) {
meterRegistry.gauge(prometheusName, tags, stateObj, f);
hermesMetrics.registerGauge(graphiteName, () -> f.applyAsDouble(stateObj));
}

public <T> void registerGauge(String graphiteName,
String prometheusName,
T stateObj,
ToDoubleFunction<T> f) {
registerGauge(graphiteName, prometheusName, stateObj, f, Tags.empty());
}

public <T> void registerGauge(String name,
T stateObj,
ToDoubleFunction<T> f) {
registerGauge(name, name, stateObj, f);
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -6,25 +6,6 @@
import static pl.allegro.tech.hermes.metrics.PathsCompiler.TOPIC;

public class Gauges {

public static final String KAFKA_PRODUCER = "kafka-producer.";
public static final String ACK_LEADER = "ack-leader.";
public static final String ACK_ALL = "ack-all.";

public static final String ACK_ALL_BUFFER_TOTAL_BYTES = KAFKA_PRODUCER + ACK_ALL + "buffer-total-bytes";
public static final String ACK_ALL_BUFFER_AVAILABLE_BYTES = KAFKA_PRODUCER + ACK_ALL + "buffer-available-bytes";
public static final String ACK_ALL_CONFIRMS_METADATA_AGE = KAFKA_PRODUCER + ACK_ALL + "metadata-age";
public static final String ACK_ALL_RECORD_QUEUE_TIME_MAX = KAFKA_PRODUCER + ACK_ALL + "record-queue-time-max";
public static final String ACK_ALL_COMPRESSION_RATE = KAFKA_PRODUCER + ACK_ALL + "compression-rate-avg";
public static final String ACK_ALL_FAILED_BATCHES_TOTAL = KAFKA_PRODUCER + ACK_ALL + "failed-batches-total";

public static final String ACK_LEADER_FAILED_BATCHES_TOTAL = KAFKA_PRODUCER + ACK_LEADER + "failed-batches-total";
public static final String ACK_LEADER_BUFFER_TOTAL_BYTES = KAFKA_PRODUCER + ACK_LEADER + "buffer-total-bytes";
public static final String ACK_LEADER_METADATA_AGE = KAFKA_PRODUCER + ACK_LEADER + "metadata-age";
public static final String ACK_LEADER_RECORD_QUEUE_TIME_MAX = KAFKA_PRODUCER + ACK_LEADER + "record-queue-time-max";
public static final String ACK_LEADER_BUFFER_AVAILABLE_BYTES = KAFKA_PRODUCER + ACK_LEADER + "buffer-available-bytes";
public static final String ACK_LEADER_COMPRESSION_RATE = KAFKA_PRODUCER + ACK_LEADER + "compression-rate-avg";

public static final String BATCH_BUFFER_TOTAL_BYTES = "batch-buffer-total-bytes";
public static final String BATCH_BUFFER_AVAILABLE_BYTES = "batch-buffer-available-bytes";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,6 @@
import pl.allegro.tech.hermes.metrics.PathContext;
import pl.allegro.tech.hermes.metrics.PathsCompiler;

import static pl.allegro.tech.hermes.common.metric.Gauges.ACK_ALL_BUFFER_AVAILABLE_BYTES;
import static pl.allegro.tech.hermes.common.metric.Gauges.ACK_ALL_BUFFER_TOTAL_BYTES;
import static pl.allegro.tech.hermes.common.metric.Gauges.ACK_LEADER_BUFFER_AVAILABLE_BYTES;
import static pl.allegro.tech.hermes.common.metric.Gauges.ACK_LEADER_BUFFER_TOTAL_BYTES;
import static pl.allegro.tech.hermes.common.metric.Histograms.INFLIGHT_TIME;
import static pl.allegro.tech.hermes.common.metric.Meters.ERRORS_HTTP_BY_CODE;
import static pl.allegro.tech.hermes.common.metric.Meters.ERRORS_HTTP_BY_FAMILY;
Expand Down Expand Up @@ -172,20 +168,6 @@ public static void close(Timer.Context... timers) {
}
}

public double getBufferTotalBytes() {
return getDoubleValue(ACK_LEADER_BUFFER_TOTAL_BYTES)
+ getDoubleValue(ACK_ALL_BUFFER_TOTAL_BYTES);
}

public double getBufferAvailablesBytes() {
return getDoubleValue(ACK_LEADER_BUFFER_AVAILABLE_BYTES)
+ getDoubleValue(ACK_ALL_BUFFER_AVAILABLE_BYTES);
}

private double getDoubleValue(String gauge) {
return (double) metricRegistry.getGauges().get(pathCompiler.compile(gauge)).getValue();
}

private Counter getInflightCounter(SubscriptionName subscription) {
return counter(Counters.INFLIGHT, subscription.getTopicName(), subscription.getName());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,17 @@ public class MetricsFacade {
private final SubscriptionMetrics subscriptionMetrics;
private final ConsumerMetrics consumerMetrics;
private final TrackerElasticSearchMetrics trackerElasticSearchMetrics;
private final PersistentBufferMetrics persistentBufferMetrics;
private final ProducerMetrics producerMetrics;

public MetricsFacade(MeterRegistry meterRegistry,
HermesMetrics hermesMetrics) {
this.topicMetrics = new TopicMetrics(hermesMetrics, meterRegistry);
this.subscriptionMetrics = new SubscriptionMetrics(hermesMetrics, meterRegistry);
this.consumerMetrics = new ConsumerMetrics(hermesMetrics, meterRegistry);
this.trackerElasticSearchMetrics = new TrackerElasticSearchMetrics(hermesMetrics, meterRegistry);
this.persistentBufferMetrics = new PersistentBufferMetrics(hermesMetrics, meterRegistry);
this.producerMetrics = new ProducerMetrics(hermesMetrics, meterRegistry);
}

public TopicMetrics topics() {
Expand All @@ -32,5 +36,13 @@ public ConsumerMetrics consumers() {
public TrackerElasticSearchMetrics trackerElasticSearch() {
return trackerElasticSearchMetrics;
}

public PersistentBufferMetrics persistentBuffer() {
return persistentBufferMetrics;
}

public ProducerMetrics producer() {
return producerMetrics;
}
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package pl.allegro.tech.hermes.common.metric;

import io.micrometer.core.instrument.MeterRegistry;

import java.util.function.ToDoubleFunction;

import static pl.allegro.tech.hermes.common.metric.Gauges.BACKUP_STORAGE_SIZE;

public class PersistentBufferMetrics {
private final MeterRegistry meterRegistry;
private final HermesMetrics hermesMetrics;

public PersistentBufferMetrics(HermesMetrics hermesMetrics, MeterRegistry meterRegistry) {
this.meterRegistry = meterRegistry;
this.hermesMetrics = hermesMetrics;
}

public <T> void registerBackupStorageSizeGauge(T obj, ToDoubleFunction<T> f) {
hermesMetrics.registerMessageRepositorySizeGauge(() -> (int) f.applyAsDouble(obj));
meterRegistry.gauge(BACKUP_STORAGE_SIZE, obj, f);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
package pl.allegro.tech.hermes.common.metric;

import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Tags;

import java.util.concurrent.TimeUnit;
import java.util.function.ToDoubleFunction;

import static pl.allegro.tech.hermes.common.metric.Gauges.INFLIGHT_REQUESTS;
import static pl.allegro.tech.hermes.common.metric.HermesMetrics.escapeDots;

public class ProducerMetrics {
private final HermesMetrics hermesMetrics;
private final MeterRegistry meterRegistry;
private final GaugeRegistrar gaugeRegistrar;

public ProducerMetrics(HermesMetrics hermesMetrics, MeterRegistry meterRegistry) {
this.hermesMetrics = hermesMetrics;
this.meterRegistry = meterRegistry;
this.gaugeRegistrar = new GaugeRegistrar(meterRegistry, hermesMetrics);
}

public <T> void registerAckAllTotalBytesGauge(T stateObj, ToDoubleFunction<T> f) {
gaugeRegistrar.registerGauge(ACK_ALL_BUFFER_TOTAL_BYTES, stateObj, f);
}

public <T> void registerAckLeaderTotalBytesGauge(T stateObj, ToDoubleFunction<T> f) {
gaugeRegistrar.registerGauge(ACK_LEADER_BUFFER_TOTAL_BYTES, stateObj, f);
}

public <T> void registerAckAllAvailableBytesGauge(T stateObj, ToDoubleFunction<T> f) {
gaugeRegistrar.registerGauge(ACK_ALL_BUFFER_AVAILABLE_BYTES, stateObj, f);
}

public <T> void registerAckLeaderAvailableBytesGauge(T stateObj, ToDoubleFunction<T> f) {
gaugeRegistrar.registerGauge(ACK_LEADER_BUFFER_AVAILABLE_BYTES, stateObj, f);
}

public <T> void registerAckAllCompressionRateGauge(T stateObj, ToDoubleFunction<T> f) {
gaugeRegistrar.registerGauge(ACK_ALL_COMPRESSION_RATE, stateObj, f);
}

public <T> void registerAckLeaderCompressionRateGauge(T stateObj, ToDoubleFunction<T> f) {
gaugeRegistrar.registerGauge(ACK_LEADER_COMPRESSION_RATE, stateObj, f);
}

public <T> void registerAckAllFailedBatchesGauge(T stateObj, ToDoubleFunction<T> f) {
gaugeRegistrar.registerGauge(ACK_ALL_FAILED_BATCHES_TOTAL, stateObj, f);
}

public <T> void registerAckLeaderFailedBatchesGauge(T stateObj, ToDoubleFunction<T> f) {
gaugeRegistrar.registerGauge(ACK_LEADER_FAILED_BATCHES_TOTAL, stateObj, f);
}

public <T> void registerAckAllMetadataAgeGauge(T stateObj, ToDoubleFunction<T> f) {
registerTimeGauge(stateObj, f, ACK_ALL_METADATA_AGE, ACK_ALL_METADATA_AGE, Tags.empty(), TimeUnit.SECONDS);
}

public <T> void registerAckLeaderMetadataAgeGauge(T stateObj, ToDoubleFunction<T> f) {
registerTimeGauge(stateObj, f, ACK_LEADER_METADATA_AGE, ACK_LEADER_METADATA_AGE, Tags.empty(), TimeUnit.SECONDS);
}

public <T> void registerAckAllRecordQueueTimeMaxGauge(T stateObj, ToDoubleFunction<T> f) {
registerTimeGauge(stateObj, f, ACK_ALL_RECORD_QUEUE_TIME_MAX, ACK_ALL_RECORD_QUEUE_TIME_MAX, Tags.empty(), TimeUnit.MILLISECONDS);
}

public <T> void registerAckLeaderRecordQueueTimeMaxGauge(T stateObj, ToDoubleFunction<T> f) {
registerTimeGauge(stateObj, f, ACK_LEADER_RECORD_QUEUE_TIME_MAX,
ACK_LEADER_RECORD_QUEUE_TIME_MAX, Tags.empty(), TimeUnit.MILLISECONDS);
}

public double getBufferTotalBytes() {
return meterRegistry.get(ACK_ALL_BUFFER_TOTAL_BYTES).gauge().value()
+ meterRegistry.get(ACK_LEADER_BUFFER_TOTAL_BYTES).gauge().value();
}

public double getBufferAvailableBytes() {
return meterRegistry.get(ACK_ALL_BUFFER_AVAILABLE_BYTES).gauge().value()
+ meterRegistry.get(ACK_LEADER_BUFFER_AVAILABLE_BYTES).gauge().value();
}

public <T> void registerProducerInflightRequestGauge(T stateObj, ToDoubleFunction<T> f) {
meterRegistry.gauge(INFLIGHT_REQUESTS, stateObj, f);
hermesMetrics.registerProducerInflightRequest(() -> (int) f.applyAsDouble(stateObj));
}

public <T> void registerAckAllMaxLatencyBrokerGauge(T stateObj, ToDoubleFunction<T> f, String brokerNodeId) {
registerLatencyPerBrokerGauge(stateObj, f, "request-latency-max", ACK_ALL, brokerNodeId);
}

public <T> void registerAckLeaderMaxLatencyPerBrokerGauge(T stateObj, ToDoubleFunction<T> f, String brokerNodeId) {
registerLatencyPerBrokerGauge(stateObj, f, "request-latency-max", ACK_LEADER, brokerNodeId);
}

public <T> void registerAckAllAvgLatencyPerBrokerGauge(T stateObj, ToDoubleFunction<T> f, String brokerNodeId) {
registerLatencyPerBrokerGauge(stateObj, f, "request-latency-avg", ACK_ALL, brokerNodeId);
}

public <T> void registerAckLeaderAvgLatencyPerBrokerGauge(T stateObj, ToDoubleFunction<T> f, String brokerNodeId) {
registerLatencyPerBrokerGauge(stateObj, f, "request-latency-avg", ACK_LEADER, brokerNodeId);
}

private <T> void registerLatencyPerBrokerGauge(T stateObj,
ToDoubleFunction<T> f,
String metricName,
String producerName,
String brokerNodeId) {
String baseMetricName = KAFKA_PRODUCER + producerName + metricName;
String graphiteMetricName = baseMetricName + "." + escapeDots(brokerNodeId);

registerTimeGauge(stateObj, f, graphiteMetricName, baseMetricName, Tags.of("broker", brokerNodeId), TimeUnit.MILLISECONDS);
}

private <T> void registerTimeGauge(T stateObj,
ToDoubleFunction<T> f,
String graphiteName,
String prometheusName,
Tags tags,
TimeUnit timeUnit) {
hermesMetrics.registerGauge(graphiteName, () -> f.applyAsDouble(stateObj));
meterRegistry.more().timeGauge(prometheusName, tags, stateObj, timeUnit, f);
}

private static final String KAFKA_PRODUCER = "kafka-producer.";
private static final String ACK_LEADER = "ack-leader.";
private static final String ACK_ALL = "ack-all.";

private static final String ACK_ALL_BUFFER_TOTAL_BYTES = KAFKA_PRODUCER + ACK_ALL + "buffer-total-bytes";
private static final String ACK_ALL_BUFFER_AVAILABLE_BYTES = KAFKA_PRODUCER + ACK_ALL + "buffer-available-bytes";
private static final String ACK_ALL_METADATA_AGE = KAFKA_PRODUCER + ACK_ALL + "metadata-age";
private static final String ACK_ALL_RECORD_QUEUE_TIME_MAX = KAFKA_PRODUCER + ACK_ALL + "record-queue-time-max";
piotrrzysko marked this conversation as resolved.
Show resolved Hide resolved
private static final String ACK_ALL_COMPRESSION_RATE = KAFKA_PRODUCER + ACK_ALL + "compression-rate-avg";
private static final String ACK_ALL_FAILED_BATCHES_TOTAL = KAFKA_PRODUCER + ACK_ALL + "failed-batches-total";

private static final String ACK_LEADER_FAILED_BATCHES_TOTAL = KAFKA_PRODUCER + ACK_LEADER + "failed-batches-total";
private static final String ACK_LEADER_BUFFER_TOTAL_BYTES = KAFKA_PRODUCER + ACK_LEADER + "buffer-total-bytes";
private static final String ACK_LEADER_METADATA_AGE = KAFKA_PRODUCER + ACK_LEADER + "metadata-age";
private static final String ACK_LEADER_RECORD_QUEUE_TIME_MAX = KAFKA_PRODUCER + ACK_LEADER + "record-queue-time-max";
private static final String ACK_LEADER_BUFFER_AVAILABLE_BYTES = KAFKA_PRODUCER + ACK_LEADER + "buffer-available-bytes";
private static final String ACK_LEADER_COMPRESSION_RATE = KAFKA_PRODUCER + ACK_LEADER + "compression-rate-avg";
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@
import com.codahale.metrics.Meter;
import io.micrometer.core.instrument.Counter;
import pl.allegro.tech.hermes.api.SubscriptionName;
import pl.allegro.tech.hermes.metrics.HermesCounter;
import pl.allegro.tech.hermes.metrics.counters.MeterBackedHermesCounter;

public class SubscriptionHermesCounter extends HermesCounter {
public class SubscriptionHermesCounter extends MeterBackedHermesCounter {

private final String graphiteName;
private final SubscriptionName subscription;
Expand Down
Loading
Loading