Skip to content

Commit

Permalink
Zookeeper counter based on prometheus metrics (#1706)
Browse files Browse the repository at this point in the history
* Zookeeper counter based on prometheus metrics

* Fix method name
  • Loading branch information
faderskd authored Aug 18, 2023
1 parent 8c20cd8 commit 80ee439
Show file tree
Hide file tree
Showing 18 changed files with 405 additions and 307 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ public static TopicMetrics unavailable() {
}

public static class Builder {
private TopicMetrics topicMetrics;
private final TopicMetrics topicMetrics;

public Builder() {
topicMetrics = new TopicMetrics();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@
import org.slf4j.LoggerFactory;
import pl.allegro.tech.hermes.common.metric.HermesMetrics;
import pl.allegro.tech.hermes.common.metric.MetricRegistryWithHdrHistogramReservoir;
import pl.allegro.tech.hermes.common.metric.counter.CounterStorage;
import pl.allegro.tech.hermes.common.metric.counter.zookeeper.ZookeeperCounterReporter;
import pl.allegro.tech.hermes.common.util.InstanceIdResolver;

import java.net.InetSocketAddress;
Expand All @@ -33,18 +31,15 @@ public class MetricRegistryFactory {
private static final Logger logger = LoggerFactory.getLogger(MetricRegistryFactory.class);
private final MetricRegistryParameters metricRegistryParameters;
private final GraphiteParameters graphiteParameters;
private final CounterStorage counterStorage;
private final InstanceIdResolver instanceIdResolver;
private final String moduleName;

public MetricRegistryFactory(MetricRegistryParameters metricRegistryParameters,
GraphiteParameters graphiteParameters,
CounterStorage counterStorage,
InstanceIdResolver instanceIdResolver,
@Named("moduleName") String moduleName) {
this.metricRegistryParameters = metricRegistryParameters;
this.graphiteParameters = graphiteParameters;
this.counterStorage = counterStorage;
this.instanceIdResolver = instanceIdResolver;
this.moduleName = moduleName;
}
Expand Down Expand Up @@ -73,14 +68,6 @@ public MetricRegistry provide() {
metricRegistryParameters.getReportPeriod().toSeconds(), TimeUnit.SECONDS
);
}

if (metricRegistryParameters.isZookeeperReporterEnabled()) {
new ZookeeperCounterReporter(registry, counterStorage, graphiteParameters.getPrefix()).start(
metricRegistryParameters.getReportPeriod().toSeconds(),
TimeUnit.SECONDS
);
}

registerJvmMetrics(registry);

return registry;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,12 @@
package pl.allegro.tech.hermes.common.di.factories;

import java.time.Duration;
import java.util.List;

public interface MicrometerRegistryParameters {
List<Double> getPercentiles();

boolean zookeeperReporterEnabled();

Duration zookeeperReportPeriod();
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,37 +5,54 @@
import io.micrometer.core.instrument.distribution.DistributionStatisticConfig;
import io.micrometer.prometheus.PrometheusConfig;
import io.micrometer.prometheus.PrometheusMeterRegistry;
import pl.allegro.tech.hermes.common.metric.counter.CounterStorage;
import pl.allegro.tech.hermes.common.metric.counter.zookeeper.ZookeeperCounterReporter;

import java.util.concurrent.TimeUnit;

public class PrometheusMeterRegistryFactory {
private final MicrometerRegistryParameters parameters;
private final PrometheusConfig prometheusConfig;
private final CounterStorage counterStorage;
private final String prefix;

public PrometheusMeterRegistryFactory(MicrometerRegistryParameters parameters,
PrometheusConfig prometheusConfig,
String prefix) {
CounterStorage counterStorage, String prefix) {
this.parameters = parameters;
this.prometheusConfig = prometheusConfig;
this.counterStorage = counterStorage;
this.prefix = prefix + "_";
}

public PrometheusMeterRegistry provide() {
PrometheusMeterRegistry meterRegistry = new PrometheusMeterRegistry(prometheusConfig);
applyFilters(meterRegistry);
if (parameters.zookeeperReporterEnabled()) {
registerZookeeperReporter(meterRegistry);
}
return meterRegistry;
}

private void applyFilters(PrometheusMeterRegistry meterRegistry) {
meterRegistry.config().meterFilter(new MeterFilter() {
@Override
public Meter.Id map(Meter.Id id) {
return id.withName(prefix + id.getName());
}

@Override
public DistributionStatisticConfig configure(Meter.Id id,
DistributionStatisticConfig config) {
public DistributionStatisticConfig configure(Meter.Id id, DistributionStatisticConfig config) {
return DistributionStatisticConfig.builder()
.percentiles(parameters.getPercentiles().stream().mapToDouble(Double::doubleValue).toArray())
.build()
.merge(config);
.percentiles(parameters.getPercentiles()
.stream().mapToDouble(Double::doubleValue).toArray()
).build().merge(config);
}
});
return meterRegistry;
}

private void registerZookeeperReporter(PrometheusMeterRegistry meterRegistry) {
new ZookeeperCounterReporter(meterRegistry, counterStorage, prefix)
.start(parameters.zookeeperReportPeriod().toSeconds(), TimeUnit.SECONDS);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,30 +5,14 @@
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Tags;
import pl.allegro.tech.hermes.api.SubscriptionName;
import pl.allegro.tech.hermes.metrics.DefaultHermesHistogram;
import pl.allegro.tech.hermes.metrics.HermesCounter;
import pl.allegro.tech.hermes.metrics.HermesHistogram;
import pl.allegro.tech.hermes.metrics.HermesTimer;
import pl.allegro.tech.hermes.metrics.counters.HermesCounters;

import java.util.concurrent.TimeUnit;
import java.util.function.ToDoubleFunction;

import static pl.allegro.tech.hermes.common.metric.Counters.DELIVERED;
import static pl.allegro.tech.hermes.common.metric.Counters.DISCARDED;
import static pl.allegro.tech.hermes.common.metric.Meters.DISCARDED_METER;
import static pl.allegro.tech.hermes.common.metric.Meters.DISCARDED_SUBSCRIPTION_METER;
import static pl.allegro.tech.hermes.common.metric.Meters.DISCARDED_TOPIC_METER;
import static pl.allegro.tech.hermes.common.metric.Meters.FAILED_METER_SUBSCRIPTION;
import static pl.allegro.tech.hermes.common.metric.Meters.FILTERED_METER;
import static pl.allegro.tech.hermes.common.metric.Meters.METER;
import static pl.allegro.tech.hermes.common.metric.Meters.SUBSCRIPTION_BATCH_METER;
import static pl.allegro.tech.hermes.common.metric.Meters.SUBSCRIPTION_METER;
import static pl.allegro.tech.hermes.common.metric.Meters.SUBSCRIPTION_THROUGHPUT_BYTES;
import static pl.allegro.tech.hermes.common.metric.Meters.TOPIC_METER;
import static pl.allegro.tech.hermes.common.metric.SubscriptionTagsFactory.subscriptionTags;
import static pl.allegro.tech.hermes.common.metric.Timers.CONSUMER_IDLE_TIME;
import static pl.allegro.tech.hermes.common.metric.Timers.SUBSCRIPTION_LATENCY;

public class SubscriptionMetrics {
private final HermesMetrics hermesMetrics;
Expand All @@ -41,68 +25,68 @@ public SubscriptionMetrics(HermesMetrics hermesMetrics, MeterRegistry meterRegis

public SubscriptionHermesCounter throughputInBytes(SubscriptionName subscription) {
return SubscriptionHermesCounter.from(
micrometerCounter("subscription-throughput-bytes", subscription),
hermesMetrics.meter(SUBSCRIPTION_THROUGHPUT_BYTES, subscription.getTopicName(), subscription.getName()),
SUBSCRIPTION_THROUGHPUT_BYTES, subscription);
micrometerCounter(SubscriptionMetricsNames.SUBSCRIPTION_THROUGHPUT, subscription),
hermesMetrics.meter(Meters.SUBSCRIPTION_THROUGHPUT_BYTES, subscription.getTopicName(), subscription.getName()),
Meters.SUBSCRIPTION_THROUGHPUT_BYTES, subscription);
}

public HermesCounter successes(SubscriptionName subscription) {
return size -> {
hermesMetrics.meter(METER).mark(size);
hermesMetrics.meter(TOPIC_METER, subscription.getTopicName()).mark(size);
hermesMetrics.meter(SUBSCRIPTION_METER, subscription.getTopicName(), subscription.getName()).mark(size);
hermesMetrics.counter(DELIVERED, subscription.getTopicName(), subscription.getName()).inc(size);
micrometerCounter("subscription.delivered", subscription).increment(size);
hermesMetrics.meter(Meters.METER).mark(size);
hermesMetrics.meter(Meters.TOPIC_METER, subscription.getTopicName()).mark(size);
hermesMetrics.meter(Meters.SUBSCRIPTION_METER, subscription.getTopicName(), subscription.getName()).mark(size);
hermesMetrics.counter(Counters.DELIVERED, subscription.getTopicName(), subscription.getName()).inc(size);
micrometerCounter(SubscriptionMetricsNames.SUBSCRIPTION_DELIVERED, subscription).increment(size);
};
}

public HermesCounter batchSuccesses(SubscriptionName subscription) {
return HermesCounters.from(
micrometerCounter("subscription.batches", subscription),
hermesMetrics.meter(SUBSCRIPTION_BATCH_METER, subscription.getTopicName(), subscription.getName())
micrometerCounter(SubscriptionMetricsNames.SUBSCRIPTION_BATCHES, subscription),
hermesMetrics.meter(Meters.SUBSCRIPTION_BATCH_METER, subscription.getTopicName(), subscription.getName())
);
}

public HermesCounter discarded(SubscriptionName subscription) {
return size -> {
hermesMetrics.meter(DISCARDED_METER).mark(size);
hermesMetrics.meter(DISCARDED_TOPIC_METER, subscription.getTopicName()).mark(size);
hermesMetrics.meter(DISCARDED_SUBSCRIPTION_METER, subscription.getTopicName(), subscription.getName()).mark(size);
hermesMetrics.counter(DISCARDED, subscription.getTopicName(), subscription.getName()).inc(size);
micrometerCounter("subscription.discarded", subscription).increment(size);
hermesMetrics.meter(Meters.DISCARDED_METER).mark(size);
hermesMetrics.meter(Meters.DISCARDED_TOPIC_METER, subscription.getTopicName()).mark(size);
hermesMetrics.meter(Meters.DISCARDED_SUBSCRIPTION_METER, subscription.getTopicName(), subscription.getName()).mark(size);
hermesMetrics.counter(Counters.DISCARDED, subscription.getTopicName(), subscription.getName()).inc(size);
micrometerCounter(SubscriptionMetricsNames.SUBSCRIPTION_DISCARDED, subscription).increment(size);
};
}

public HermesTimer latency(SubscriptionName subscription) {
return HermesTimer.from(
meterRegistry.timer("subscription.latency", subscriptionTags(subscription)),
hermesMetrics.timer(SUBSCRIPTION_LATENCY, subscription.getTopicName(), subscription.getName())
meterRegistry.timer(SubscriptionMetricsNames.SUBSCRIPTION_LATENCY, subscriptionTags(subscription)),
hermesMetrics.timer(Timers.SUBSCRIPTION_LATENCY, subscription.getTopicName(), subscription.getName())
);
}

public <T> void registerInflightGauge(SubscriptionName subscription, T obj, ToDoubleFunction<T> f) {
hermesMetrics.registerInflightGauge(subscription, () -> (int) f.applyAsDouble(obj));
meterRegistry.gauge("subscription.inflight", subscriptionTags(subscription), obj, f);
meterRegistry.gauge(SubscriptionMetricsNames.SUBSCRIPTION_INFLIGHT, subscriptionTags(subscription), obj, f);
}

public HermesTimer consumerIdleTimer(SubscriptionName subscription) {
return HermesTimer.from(
meterRegistry.timer("subscription.idle-duration", subscriptionTags(subscription)),
hermesMetrics.timer(CONSUMER_IDLE_TIME, subscription.getTopicName(), subscription.getName())
meterRegistry.timer(SubscriptionMetricsNames.SUBSCRIPTION_IDLE_DURATION, subscriptionTags(subscription)),
hermesMetrics.timer(Timers.CONSUMER_IDLE_TIME, subscription.getTopicName(), subscription.getName())
);
}

public HermesCounter filteredOutCounter(SubscriptionName subscription) {
return HermesCounters.from(
micrometerCounter("subscription.filtered-out", subscription),
hermesMetrics.meter(FILTERED_METER, subscription.getTopicName(), subscription.getName())
micrometerCounter(SubscriptionMetricsNames.SUBSCRIPTION_FILTERED_OUT, subscription),
hermesMetrics.meter(Meters.FILTERED_METER, subscription.getTopicName(), subscription.getName())
);
}

public HermesCounter httpAnswerCounter(SubscriptionName subscription, int statusCode) {
return size -> {
meterRegistry.counter(
"subscription.http-status-codes",
SubscriptionMetricsNames.SUBSCRIPTION_HTTP_STATUS_CODES,
Tags.concat(subscriptionTags(subscription), "status_code", String.valueOf(statusCode))
).increment(size);
hermesMetrics.registerConsumerHttpAnswer(subscription, statusCode, size);
Expand All @@ -111,28 +95,28 @@ public HermesCounter httpAnswerCounter(SubscriptionName subscription, int status

public HermesCounter timeoutsCounter(SubscriptionName subscription) {
return HermesCounters.from(
micrometerCounter("subscription.timeouts", subscription),
micrometerCounter(SubscriptionMetricsNames.SUBSCRIPTION_TIMEOUTS, subscription),
hermesMetrics.consumerErrorsTimeoutMeter(subscription)
);
}

public HermesCounter otherErrorsCounter(SubscriptionName subscription) {
return HermesCounters.from(
micrometerCounter("subscription.other-errors", subscription),
micrometerCounter(SubscriptionMetricsNames.SUBSCRIPTION_OTHER_ERRORS, subscription),
hermesMetrics.consumerErrorsOtherMeter(subscription)
);
}

public HermesCounter failuresCounter(SubscriptionName subscription) {
return HermesCounters.from(
micrometerCounter("subscription.failures", subscription),
hermesMetrics.meter(FAILED_METER_SUBSCRIPTION, subscription.getTopicName(), subscription.getName())
micrometerCounter(SubscriptionMetricsNames.SUBSCRIPTION_FAILURES, subscription),
hermesMetrics.meter(Meters.FAILED_METER_SUBSCRIPTION, subscription.getTopicName(), subscription.getName())
);
}

public HermesHistogram inflightTimeInMillisHistogram(SubscriptionName subscriptionName) {
return value -> {
DistributionSummary.builder("subscription.inflight-time-seconds")
DistributionSummary.builder(SubscriptionMetricsNames.SUBSCRIPTION_INFLIGHT_TIME)
.tags(subscriptionTags(subscriptionName))
.register(meterRegistry)
.record(value / 1000d);
Expand All @@ -143,4 +127,21 @@ public HermesHistogram inflightTimeInMillisHistogram(SubscriptionName subscripti
private Counter micrometerCounter(String metricName, SubscriptionName subscription) {
return meterRegistry.counter(metricName, subscriptionTags(subscription));
}

public static class SubscriptionMetricsNames {
public static final String SUBSCRIPTION_DELIVERED = "subscription.delivered";
public static final String SUBSCRIPTION_THROUGHPUT = "subscription.throughput-bytes";
public static final String SUBSCRIPTION_BATCHES = "subscription.batches";
public static final String SUBSCRIPTION_DISCARDED = "subscription.discarded";
public static final String SUBSCRIPTION_LATENCY = "subscription.latency";
public static final String SUBSCRIPTION_INFLIGHT = "subscription.inflight";
public static final String SUBSCRIPTION_IDLE_DURATION = "subscription.idle-duration";
public static final String SUBSCRIPTION_FILTERED_OUT = "subscription.filtered-out";
public static final String SUBSCRIPTION_HTTP_STATUS_CODES = "subscription.http-status-codes";
public static final String SUBSCRIPTION_TIMEOUTS = "subscription.timeouts";
public static final String SUBSCRIPTION_OTHER_ERRORS = "subscription.other-errors";
public static final String SUBSCRIPTION_FAILURES = "subscription.failures";
public static final String SUBSCRIPTION_INFLIGHT_TIME = "subscription.inflight-time-seconds";
}

}
Loading

0 comments on commit 80ee439

Please sign in to comment.