diff --git a/hermes-common/src/main/java/pl/allegro/tech/hermes/common/metric/ProducerMetrics.java b/hermes-common/src/main/java/pl/allegro/tech/hermes/common/metric/ProducerMetrics.java index 0c1b65b913..70d3295cc0 100644 --- a/hermes-common/src/main/java/pl/allegro/tech/hermes/common/metric/ProducerMetrics.java +++ b/hermes-common/src/main/java/pl/allegro/tech/hermes/common/metric/ProducerMetrics.java @@ -2,6 +2,7 @@ import io.micrometer.core.instrument.MeterRegistry; import io.micrometer.core.instrument.Tags; +import pl.allegro.tech.hermes.metrics.HermesTimer; import java.util.concurrent.TimeUnit; import java.util.function.ToDoubleFunction; @@ -87,6 +88,12 @@ public void registerProducerInflightRequestGauge(T stateObj, ToDoubleFunctio meterRegistry.gauge(INFLIGHT_REQUESTS, stateObj, f); } + public HermesTimer sendLatency(String sender, String datacenter) { + return HermesTimer.from( + meterRegistry.timer("kafka-producer.send-latency", tags(sender, datacenter)) + ); + } + private static Tags tags(String sender, String datacenter) { return Tags.of("storageDc", datacenter, "sender", sender); } diff --git a/hermes-frontend/src/main/java/pl/allegro/tech/hermes/frontend/producer/kafka/KafkaMessageSender.java b/hermes-frontend/src/main/java/pl/allegro/tech/hermes/frontend/producer/kafka/KafkaMessageSender.java index 6e00173e48..af4a519718 100644 --- a/hermes-frontend/src/main/java/pl/allegro/tech/hermes/frontend/producer/kafka/KafkaMessageSender.java +++ b/hermes-frontend/src/main/java/pl/allegro/tech/hermes/frontend/producer/kafka/KafkaMessageSender.java @@ -18,6 +18,7 @@ import pl.allegro.tech.hermes.frontend.producer.BrokerLatencyReporter; import pl.allegro.tech.hermes.frontend.publishing.message.Message; import pl.allegro.tech.hermes.frontend.publishing.metadata.ProduceMetadata; +import pl.allegro.tech.hermes.metrics.HermesTimer; import pl.allegro.tech.hermes.metrics.HermesTimerContext; import java.util.Collections; @@ -40,17 +41,20 @@ public class KafkaMessageSender { private final MetricsFacade metricsFacade; private final String datacenter; private final ScheduledExecutorService chaosScheduler; + private final HermesTimer sendTimer; KafkaMessageSender(Producer kafkaProducer, BrokerLatencyReporter brokerLatencyReporter, MetricsFacade metricsFacade, String datacenter, - ScheduledExecutorService chaosScheduler) { + ScheduledExecutorService chaosScheduler, + String senderName) { this.producer = kafkaProducer; this.brokerLatencyReporter = brokerLatencyReporter; this.metricsFacade = metricsFacade; this.datacenter = datacenter; this.chaosScheduler = chaosScheduler; + this.sendTimer = metricsFacade.producer().sendLatency(senderName, datacenter); } public String getDatacenter() { @@ -87,9 +91,10 @@ public void send(ProducerRecord producerRecord, Callback callback) { HermesTimerContext timer = cachedTopic.startBrokerLatencyTimer(); Callback meteredCallback = new MeteredCallback(timer, message, cachedTopic, callback); - try { + try (HermesTimerContext ignored = sendTimer.time()) { producer.send(producerRecord, meteredCallback); } catch (Exception e) { + logger.debug("Error while sending message to Kafka.", e); callback.onCompletion(exceptionalRecordMetadata(cachedTopic), e); } } diff --git a/hermes-frontend/src/main/java/pl/allegro/tech/hermes/frontend/producer/kafka/KafkaMessageSendersFactory.java b/hermes-frontend/src/main/java/pl/allegro/tech/hermes/frontend/producer/kafka/KafkaMessageSendersFactory.java index cd10168f3c..237a370cf9 100644 --- a/hermes-frontend/src/main/java/pl/allegro/tech/hermes/frontend/producer/kafka/KafkaMessageSendersFactory.java +++ b/hermes-frontend/src/main/java/pl/allegro/tech/hermes/frontend/producer/kafka/KafkaMessageSendersFactory.java @@ -80,14 +80,14 @@ public KafkaMessageSenders provide(KafkaProducerParameters localKafkaProducerPar KafkaProducerParameters remoteKafkaProducerParameters, String senderName) { KafkaMessageSenders.Tuple localProducers = new KafkaMessageSenders.Tuple( - sender(kafkaParameters, localKafkaProducerParameters, ACK_LEADER), - sender(kafkaParameters, localKafkaProducerParameters, ACK_ALL) + sender(kafkaParameters, localKafkaProducerParameters, ACK_LEADER, senderName), + sender(kafkaParameters, localKafkaProducerParameters, ACK_ALL, senderName) ); List remoteProducers = remoteKafkaParameters.stream().map( kafkaProperties -> new KafkaMessageSenders.Tuple( - sender(kafkaProperties, remoteKafkaProducerParameters, ACK_LEADER), - sender(kafkaProperties, remoteKafkaProducerParameters, ACK_ALL))).toList(); + sender(kafkaProperties, remoteKafkaProducerParameters, ACK_LEADER, senderName), + sender(kafkaProperties, remoteKafkaProducerParameters, ACK_ALL, senderName))).toList(); KafkaMessageSenders senders = new KafkaMessageSenders( topicMetadataLoadingExecutor, localMinInSyncReplicasLoader, @@ -100,7 +100,8 @@ public KafkaMessageSenders provide(KafkaProducerParameters localKafkaProducerPar private KafkaMessageSender sender(KafkaParameters kafkaParameters, KafkaProducerParameters kafkaProducerParameters, - String acks) { + String acks, + String senderName) { Map props = new HashMap<>(); props.put(BOOTSTRAP_SERVERS_CONFIG, kafkaParameters.getBrokerList()); props.put(MAX_BLOCK_MS_CONFIG, (int) kafkaProducerParameters.getMaxBlock().toMillis()); @@ -133,7 +134,8 @@ private KafkaMessageSender sender(KafkaParameters kafkaParameter brokerLatencyReporter, metricsFacade, kafkaParameters.getDatacenter(), - chaosScheduler + chaosScheduler, + senderName ); } diff --git a/hermes-frontend/src/test/groovy/pl/allegro/tech/hermes/frontend/producer/kafka/LocalDatacenterMessageProducerIntegrationTest.groovy b/hermes-frontend/src/test/groovy/pl/allegro/tech/hermes/frontend/producer/kafka/LocalDatacenterMessageProducerIntegrationTest.groovy index 46b1913121..551ddae82d 100644 --- a/hermes-frontend/src/test/groovy/pl/allegro/tech/hermes/frontend/producer/kafka/LocalDatacenterMessageProducerIntegrationTest.groovy +++ b/hermes-frontend/src/test/groovy/pl/allegro/tech/hermes/frontend/producer/kafka/LocalDatacenterMessageProducerIntegrationTest.groovy @@ -111,8 +111,12 @@ class LocalDatacenterMessageProducerIntegrationTest extends Specification { topicMetadataLoadingExecutor, minInSyncReplicasLoader, new KafkaMessageSenders.Tuple( - new KafkaMessageSender(leaderConfirms, brokerLatencyReporter, metricsFacade, datacenter, chaosScheduler), - new KafkaMessageSender(everyoneConfirms, brokerLatencyReporter, metricsFacade, datacenter, chaosScheduler) + new KafkaMessageSender( + leaderConfirms, brokerLatencyReporter, metricsFacade, datacenter, chaosScheduler, "local-sender" + ), + new KafkaMessageSender( + everyoneConfirms, brokerLatencyReporter, metricsFacade, datacenter, chaosScheduler, "local-sender" + ) ), emptyList() ) diff --git a/hermes-frontend/src/test/java/pl/allegro/tech/hermes/frontend/producer/kafka/LocalDatacenterMessageProducerTest.java b/hermes-frontend/src/test/java/pl/allegro/tech/hermes/frontend/producer/kafka/LocalDatacenterMessageProducerTest.java index 3866830892..7fb88ebafc 100644 --- a/hermes-frontend/src/test/java/pl/allegro/tech/hermes/frontend/producer/kafka/LocalDatacenterMessageProducerTest.java +++ b/hermes-frontend/src/test/java/pl/allegro/tech/hermes/frontend/producer/kafka/LocalDatacenterMessageProducerTest.java @@ -60,8 +60,12 @@ public class LocalDatacenterMessageProducerTest { private final MockProducer leaderConfirmsProducer = new MockProducer<>(true, serializer, serializer); private final MockProducer everyoneConfirmProducer = new MockProducer<>(true, serializer, serializer); - private final KafkaMessageSender leaderConfirmsProduceWrapper = new KafkaMessageSender<>(leaderConfirmsProducer, brokerLatencyReporter, metricsFacade, datacenter, chaosScheduler); - private final KafkaMessageSender everyoneConfirmsProduceWrapper = new KafkaMessageSender<>(everyoneConfirmProducer, brokerLatencyReporter, metricsFacade, datacenter, chaosScheduler); + private final KafkaMessageSender leaderConfirmsProduceWrapper = new KafkaMessageSender<>( + leaderConfirmsProducer, brokerLatencyReporter, metricsFacade, datacenter, chaosScheduler, "local-sender" + ); + private final KafkaMessageSender everyoneConfirmsProduceWrapper = new KafkaMessageSender<>( + everyoneConfirmProducer, brokerLatencyReporter, metricsFacade, datacenter, chaosScheduler, "local-sender" + ); private final KafkaHeaderNameProperties kafkaHeaderNameProperties = new KafkaHeaderNameProperties(); private final HTTPHeadersPropagationAsKafkaHeadersProperties httpHeadersPropagationAsKafkaHeadersProperties =