Skip to content

Commit

Permalink
add broker latency metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
moscicky committed Jan 17, 2024
1 parent 17d5b40 commit 3213615
Show file tree
Hide file tree
Showing 17 changed files with 357 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ private static HttpHandler provideHttpHandler(ThroughputLimiter throughputLimite
throughputLimiter,
null,
false,
handlersChainProperties
handlersChainProperties, null
).provide();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package pl.allegro.tech.hermes.common.metric;

import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Timer;

import java.time.Duration;

public class BrokerMetrics {
private final MeterRegistry meterRegistry;

public BrokerMetrics(MeterRegistry meterRegistry) {
this.meterRegistry = meterRegistry;
}

public void recordBrokerLatency(String broker, Duration duration) {
Timer.builder("broker.latency")
.tag("broker", broker)
.publishPercentileHistogram()
.maximumExpectedValue(Duration.ofSeconds(5))
.register(meterRegistry)
.record(duration);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ public class MetricsFacade {
private final ConsumerSenderMetrics consumerSenderMetrics;
private final OffsetCommitsMetrics offsetCommitsMetrics;
private final MaxRateMetrics maxRateMetrics;
private final BrokerMetrics brokerMetrics;

public MetricsFacade(MeterRegistry meterRegistry, HermesMetrics hermesMetrics) {
this.meterRegistry = meterRegistry;
Expand All @@ -60,6 +61,7 @@ public MetricsFacade(MeterRegistry meterRegistry, HermesMetrics hermesMetrics) {
this.consumerSenderMetrics = new ConsumerSenderMetrics(hermesMetrics, meterRegistry);
this.offsetCommitsMetrics = new OffsetCommitsMetrics(hermesMetrics, meterRegistry);
this.maxRateMetrics = new MaxRateMetrics(hermesMetrics, meterRegistry);
this.brokerMetrics = new BrokerMetrics(meterRegistry);
}

public TopicMetrics topics() {
Expand Down Expand Up @@ -118,6 +120,10 @@ public MaxRateMetrics maxRate() {
return maxRateMetrics;
}

public BrokerMetrics broker() {
return brokerMetrics;
}

public void unregisterAllMetricsRelatedTo(SubscriptionName subscription) {
Collection<Meter> meters = Search.in(meterRegistry)
.tags(subscriptionTags(subscription))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;


public class InstrumentedExecutorServiceFactory {

private final ThreadPoolMetrics threadPoolMetrics;
Expand All @@ -21,8 +22,12 @@ public InstrumentedExecutorServiceFactory(ThreadPoolMetrics threadPoolMetrics) {
}

public ExecutorService getExecutorService(String name, int size, boolean monitoringEnabled) {
return getExecutorService(name, size, monitoringEnabled, Integer.MAX_VALUE);
}

public ExecutorService getExecutorService(String name, int size, boolean monitoringEnabled, int queueCapacity) {
ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat(name + "-executor-%d").build();
ThreadPoolExecutor executor = newFixedThreadPool(name, size, threadFactory);
ThreadPoolExecutor executor = newFixedThreadPool(name, size, threadFactory, queueCapacity);
executor.prestartAllCoreThreads();

if (monitoringEnabled) {
Expand Down Expand Up @@ -51,23 +56,21 @@ private void monitor(String threadPoolName, ThreadPoolExecutor executor) {
threadPoolMetrics.createGauges(threadPoolName, executor);
}


/**
* Copy of {@link java.util.concurrent.Executors#newFixedThreadPool(int, java.util.concurrent.ThreadFactory)}.
* Copy of {@link java.util.concurrent.Executors#newFixedThreadPool(int, java.util.concurrent.ThreadFactory)}
* with configurable queue capacity.
*/
private ThreadPoolExecutor newFixedThreadPool(String executorName, int size, ThreadFactory threadFactory) {
private ThreadPoolExecutor newFixedThreadPool(String executorName, int size, ThreadFactory threadFactory, int queueCapacity) {
ThreadPoolExecutor executor = new ThreadPoolExecutor(
size,
size,
0L,
TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(),
new LinkedBlockingQueue<>(queueCapacity),
threadFactory,
getMeteredRejectedExecutionHandler(executorName)
);

return executor;

}

RejectedExecutionHandler getMeteredRejectedExecutionHandler(String executorName) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package pl.allegro.tech.hermes.frontend.config;

import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import pl.allegro.tech.hermes.common.metric.MetricsFacade;
import pl.allegro.tech.hermes.common.metric.executor.InstrumentedExecutorServiceFactory;
import pl.allegro.tech.hermes.frontend.producer.BrokerLatencyReporter;

import java.util.concurrent.ExecutorService;


@Configuration
@EnableConfigurationProperties(BrokerLatencyReporterProperties.class)
public class BrokerLatencyReporterConfiguration {

@Bean
BrokerLatencyReporter brokerLatencyReporter(BrokerLatencyReporterProperties properties,
MetricsFacade metricsFacade,
InstrumentedExecutorServiceFactory executorServiceFactory) {
ExecutorService executorService = executorServiceFactory.getExecutorService(
"broker-latency-reporter",
8,
true,
1_000_000
);

return new BrokerLatencyReporter(
properties.isEnabled(),
metricsFacade,
properties.getSlowResponseLoggingThreshold(),
executorService
);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package pl.allegro.tech.hermes.frontend.config;

import org.springframework.boot.context.properties.ConfigurationProperties;

import java.time.Duration;

@ConfigurationProperties(prefix = "frontend.broker-latency-reporter")
public class BrokerLatencyReporterProperties {
private boolean enabled;
private Duration slowResponseLoggingThreshold = Duration.ofMillis(100);


public boolean isEnabled() {
return enabled;
}

public void setEnabled(boolean enabled) {
this.enabled = enabled;
}

public Duration getSlowResponseLoggingThreshold() {
return slowResponseLoggingThreshold;
}

public void setSlowResponseLoggingThreshold(Duration slowResponseLoggingThreshold) {
this.slowResponseLoggingThreshold = slowResponseLoggingThreshold;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import pl.allegro.tech.hermes.domain.topic.preview.MessagePreviewRepository;
import pl.allegro.tech.hermes.frontend.cache.topic.TopicsCache;
import pl.allegro.tech.hermes.frontend.listeners.BrokerListeners;
import pl.allegro.tech.hermes.frontend.producer.BrokerLatencyReporter;
import pl.allegro.tech.hermes.frontend.producer.BrokerMessageProducer;
import pl.allegro.tech.hermes.frontend.publishing.handlers.HandlersChainFactory;
import pl.allegro.tech.hermes.frontend.publishing.handlers.ThroughputLimiter;
Expand Down Expand Up @@ -48,10 +49,11 @@ public HttpHandler httpHandler(TopicsCache topicsCache, MessageErrorProcessor me
MessageEndProcessor messageEndProcessor, MessageFactory messageFactory,
BrokerMessageProducer brokerMessageProducer, MessagePreviewLog messagePreviewLog,
ThroughputLimiter throughputLimiter, Optional<AuthenticationConfiguration> authConfig,
MessagePreviewProperties messagePreviewProperties, HandlersChainProperties handlersChainProperties) {
MessagePreviewProperties messagePreviewProperties, HandlersChainProperties handlersChainProperties,
BrokerLatencyReporter brokerLatencyReporter) {
return new HandlersChainFactory(topicsCache, messageErrorProcessor, messageEndProcessor, messageFactory,
brokerMessageProducer, messagePreviewLog, throughputLimiter, authConfig, messagePreviewProperties.isEnabled(),
handlersChainProperties).provide();
handlersChainProperties, brokerLatencyReporter).provide();
}

@Bean
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
package pl.allegro.tech.hermes.frontend.producer;

import jakarta.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import pl.allegro.tech.hermes.api.Topic;
import pl.allegro.tech.hermes.common.metric.MetricsFacade;
import pl.allegro.tech.hermes.frontend.publishing.message.Message;
import pl.allegro.tech.hermes.frontend.publishing.metadata.ProduceMetadata;
import pl.allegro.tech.hermes.metrics.HermesTimerContext;

import java.time.Duration;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.function.Supplier;

public class BrokerLatencyReporter {

private static final Logger logger = LoggerFactory.getLogger(BrokerLatencyReporter.class);

private final boolean perBrokerLatencyEnabled;
private final MetricsFacade metricsFacade;
private final Duration slowResponseThreshold;
private final ExecutorService reporterExecutorService;

public BrokerLatencyReporter(boolean perBrokerLatencyEnabled,
MetricsFacade metricsFacade,
Duration slowResponseThreshold,
ExecutorService reporterExecutorService) {
this.perBrokerLatencyEnabled = perBrokerLatencyEnabled;
this.metricsFacade = metricsFacade;
this.slowResponseThreshold = slowResponseThreshold;
this.reporterExecutorService = reporterExecutorService;
}

public void report(HermesTimerContext timerContext,
Message message,
Topic.Ack ack,
@Nullable Supplier<ProduceMetadata> produceMetadata) {
Duration duration = timerContext.closeAndGet();
if (perBrokerLatencyEnabled) {
reporterExecutorService.submit(() -> doReport(duration, message.getId(), ack, produceMetadata));
}

}

private void doReport(Duration duration,
String messageId,
Topic.Ack ack,
@Nullable Supplier<ProduceMetadata> produceMetadata) {
String broker = Optional.ofNullable(produceMetadata).flatMap(metadata -> metadata.get().getBroker()).orElse("unknown");

if (duration.compareTo(slowResponseThreshold) > 0) {
logger.info("Slow produce request, broker response time: {} ms, ackLevel: {}, messageId: {}, broker: {}",
duration.toMillis(), ack, messageId, broker);
}

metricsFacade.broker().recordBrokerLatency(broker, duration);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,19 @@
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.errors.InterruptException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import pl.allegro.tech.hermes.api.Topic;
import pl.allegro.tech.hermes.common.metric.MetricsFacade;
import pl.allegro.tech.hermes.frontend.metric.CachedTopic;
import pl.allegro.tech.hermes.frontend.producer.BrokerMessageProducer;
import pl.allegro.tech.hermes.frontend.publishing.PublishingCallback;
import pl.allegro.tech.hermes.frontend.publishing.message.Message;
import pl.allegro.tech.hermes.frontend.publishing.metadata.ProduceMetadata;

import java.util.List;
import java.util.Optional;
import java.util.function.Supplier;

@Singleton
public class KafkaBrokerMessageProducer implements BrokerMessageProducer {
Expand Down Expand Up @@ -41,7 +44,7 @@ public void send(Message message, CachedTopic cachedTopic, final PublishingCallb
messageConverter.convertToProducerRecord(message, cachedTopic.getKafkaTopics().getPrimary().name());

try {
producers.get(cachedTopic.getTopic()).send(producerRecord, new SendCallback(message, cachedTopic.getTopic(), callback));
producers.get(cachedTopic.getTopic()).send(producerRecord, new SendCallback(message, cachedTopic, callback));
} catch (Exception e) {
// message didn't get to internal producer buffer and it will not be send to a broker
callback.onUnpublished(message, cachedTopic.getTopic(), e);
Expand Down Expand Up @@ -74,6 +77,28 @@ public boolean isTopicAvailable(CachedTopic cachedTopic) {
return false;
}

private Supplier<ProduceMetadata> produceMetadataSupplier(CachedTopic topic, RecordMetadata recordMetadata) {
return () -> {
String kafkaTopicName = topic.getKafkaTopics().getPrimary().name().asString();
try {
List<PartitionInfo> topicPartitions = producers.get(topic.getTopic()).partitionsFor(kafkaTopicName);

Optional<PartitionInfo> partitionInfo = topicPartitions.stream()
.filter(p -> p.partition() == recordMetadata.partition())
.findFirst();

return partitionInfo.map(partition -> partition.leader().host())
.map(ProduceMetadata::new)
.orElse(ProduceMetadata.empty());
} catch (InterruptException e) {
Thread.currentThread().interrupt();
} catch (Exception e) {
logger.warn("Could not read information about partitions for topic {}. {}", kafkaTopicName, e.getMessage());
}
return ProduceMetadata.empty();
};
}

private boolean anyPartitionWithoutLeader(List<PartitionInfo> partitionInfos) {
return partitionInfos.stream().anyMatch(p -> p.leader() == null);
}
Expand All @@ -86,22 +111,23 @@ private boolean anyUnderReplicatedPartition(List<PartitionInfo> partitionInfos,
private class SendCallback implements org.apache.kafka.clients.producer.Callback {

private final Message message;
private final Topic topic;
private final CachedTopic topic;
private final PublishingCallback callback;

public SendCallback(Message message, Topic topic, PublishingCallback callback) {
public SendCallback(Message message, CachedTopic topic, PublishingCallback callback) {
this.message = message;
this.topic = topic;
this.callback = callback;
}

@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
Supplier<ProduceMetadata> produceMetadata = produceMetadataSupplier(topic, recordMetadata);
if (e == null) {
callback.onPublished(message, topic);
callback.onPublished(message, topic.getTopic(), produceMetadata);
producers.maybeRegisterNodeMetricsGauges(metricsFacade);
} else {
callback.onUnpublished(message, topic, e);
callback.onUnpublished(message, topic.getTopic(), produceMetadata, e);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,21 @@

import pl.allegro.tech.hermes.api.Topic;
import pl.allegro.tech.hermes.frontend.publishing.message.Message;
import pl.allegro.tech.hermes.frontend.publishing.metadata.ProduceMetadata;

import java.util.function.Supplier;

public interface PublishingCallback {

void onUnpublished(Message message, Topic topic, Exception exception);

default void onUnpublished(Message message, Topic topic, Supplier<ProduceMetadata> produceMetadata, Exception exception) {
onUnpublished(message, topic, exception);
}

void onPublished(Message message, Topic topic);

default void onPublished(Message message, Topic topic, Supplier<ProduceMetadata> produceMetadata) {
onPublished(message, topic);
}
}
Loading

0 comments on commit 3213615

Please sign in to comment.