Skip to content

Commit

Permalink
Measure send latency
Browse files Browse the repository at this point in the history
  • Loading branch information
piotrrzysko committed Jun 20, 2024
1 parent a478894 commit f594fbe
Show file tree
Hide file tree
Showing 5 changed files with 34 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Tags;
import pl.allegro.tech.hermes.metrics.HermesTimer;

import java.util.concurrent.TimeUnit;
import java.util.function.ToDoubleFunction;
Expand Down Expand Up @@ -87,6 +88,12 @@ public <T> void registerProducerInflightRequestGauge(T stateObj, ToDoubleFunctio
meterRegistry.gauge(INFLIGHT_REQUESTS, stateObj, f);
}

public HermesTimer sendLatency(String sender, String datacenter) {
return HermesTimer.from(
meterRegistry.timer("kafka-producer.send-latency", tags(sender, datacenter))
);
}

private static Tags tags(String sender, String datacenter) {
return Tags.of("storageDc", datacenter, "sender", sender);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import pl.allegro.tech.hermes.frontend.producer.BrokerLatencyReporter;
import pl.allegro.tech.hermes.frontend.publishing.message.Message;
import pl.allegro.tech.hermes.frontend.publishing.metadata.ProduceMetadata;
import pl.allegro.tech.hermes.metrics.HermesTimer;
import pl.allegro.tech.hermes.metrics.HermesTimerContext;

import java.util.Collections;
Expand All @@ -40,17 +41,20 @@ public class KafkaMessageSender<K, V> {
private final MetricsFacade metricsFacade;
private final String datacenter;
private final ScheduledExecutorService chaosScheduler;
private final HermesTimer sendTimer;

KafkaMessageSender(Producer<K, V> kafkaProducer,
BrokerLatencyReporter brokerLatencyReporter,
MetricsFacade metricsFacade,
String datacenter,
ScheduledExecutorService chaosScheduler) {
ScheduledExecutorService chaosScheduler,
String senderName) {
this.producer = kafkaProducer;
this.brokerLatencyReporter = brokerLatencyReporter;
this.metricsFacade = metricsFacade;
this.datacenter = datacenter;
this.chaosScheduler = chaosScheduler;
this.sendTimer = metricsFacade.producer().sendLatency(senderName, datacenter);
}

public String getDatacenter() {
Expand Down Expand Up @@ -87,9 +91,10 @@ public void send(ProducerRecord<K, V> producerRecord,
Callback callback) {
HermesTimerContext timer = cachedTopic.startBrokerLatencyTimer();
Callback meteredCallback = new MeteredCallback(timer, message, cachedTopic, callback);
try {
try (HermesTimerContext ignored = sendTimer.time()) {
producer.send(producerRecord, meteredCallback);
} catch (Exception e) {
logger.debug("Error while sending message to Kafka.", e);
callback.onCompletion(exceptionalRecordMetadata(cachedTopic), e);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,14 +80,14 @@ public KafkaMessageSenders provide(KafkaProducerParameters localKafkaProducerPar
KafkaProducerParameters remoteKafkaProducerParameters,
String senderName) {
KafkaMessageSenders.Tuple localProducers = new KafkaMessageSenders.Tuple(
sender(kafkaParameters, localKafkaProducerParameters, ACK_LEADER),
sender(kafkaParameters, localKafkaProducerParameters, ACK_ALL)
sender(kafkaParameters, localKafkaProducerParameters, ACK_LEADER, senderName),
sender(kafkaParameters, localKafkaProducerParameters, ACK_ALL, senderName)
);

List<KafkaMessageSenders.Tuple> remoteProducers = remoteKafkaParameters.stream().map(
kafkaProperties -> new KafkaMessageSenders.Tuple(
sender(kafkaProperties, remoteKafkaProducerParameters, ACK_LEADER),
sender(kafkaProperties, remoteKafkaProducerParameters, ACK_ALL))).toList();
sender(kafkaProperties, remoteKafkaProducerParameters, ACK_LEADER, senderName),
sender(kafkaProperties, remoteKafkaProducerParameters, ACK_ALL, senderName))).toList();
KafkaMessageSenders senders = new KafkaMessageSenders(
topicMetadataLoadingExecutor,
localMinInSyncReplicasLoader,
Expand All @@ -100,7 +100,8 @@ public KafkaMessageSenders provide(KafkaProducerParameters localKafkaProducerPar

private KafkaMessageSender<byte[], byte[]> sender(KafkaParameters kafkaParameters,
KafkaProducerParameters kafkaProducerParameters,
String acks) {
String acks,
String senderName) {
Map<String, Object> props = new HashMap<>();
props.put(BOOTSTRAP_SERVERS_CONFIG, kafkaParameters.getBrokerList());
props.put(MAX_BLOCK_MS_CONFIG, (int) kafkaProducerParameters.getMaxBlock().toMillis());
Expand Down Expand Up @@ -133,7 +134,8 @@ private KafkaMessageSender<byte[], byte[]> sender(KafkaParameters kafkaParameter
brokerLatencyReporter,
metricsFacade,
kafkaParameters.getDatacenter(),
chaosScheduler
chaosScheduler,
senderName
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,8 +111,12 @@ class LocalDatacenterMessageProducerIntegrationTest extends Specification {
topicMetadataLoadingExecutor,
minInSyncReplicasLoader,
new KafkaMessageSenders.Tuple(
new KafkaMessageSender<byte[], byte[]>(leaderConfirms, brokerLatencyReporter, metricsFacade, datacenter, chaosScheduler),
new KafkaMessageSender<byte[], byte[]>(everyoneConfirms, brokerLatencyReporter, metricsFacade, datacenter, chaosScheduler)
new KafkaMessageSender<byte[], byte[]>(
leaderConfirms, brokerLatencyReporter, metricsFacade, datacenter, chaosScheduler, "local-sender"
),
new KafkaMessageSender<byte[], byte[]>(
everyoneConfirms, brokerLatencyReporter, metricsFacade, datacenter, chaosScheduler, "local-sender"
)
),
emptyList()
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,12 @@ public class LocalDatacenterMessageProducerTest {

private final MockProducer<byte[], byte[]> leaderConfirmsProducer = new MockProducer<>(true, serializer, serializer);
private final MockProducer<byte[], byte[]> everyoneConfirmProducer = new MockProducer<>(true, serializer, serializer);
private final KafkaMessageSender<byte[], byte[]> leaderConfirmsProduceWrapper = new KafkaMessageSender<>(leaderConfirmsProducer, brokerLatencyReporter, metricsFacade, datacenter, chaosScheduler);
private final KafkaMessageSender<byte[], byte[]> everyoneConfirmsProduceWrapper = new KafkaMessageSender<>(everyoneConfirmProducer, brokerLatencyReporter, metricsFacade, datacenter, chaosScheduler);
private final KafkaMessageSender<byte[], byte[]> leaderConfirmsProduceWrapper = new KafkaMessageSender<>(
leaderConfirmsProducer, brokerLatencyReporter, metricsFacade, datacenter, chaosScheduler, "local-sender"
);
private final KafkaMessageSender<byte[], byte[]> everyoneConfirmsProduceWrapper = new KafkaMessageSender<>(
everyoneConfirmProducer, brokerLatencyReporter, metricsFacade, datacenter, chaosScheduler, "local-sender"
);

private final KafkaHeaderNameProperties kafkaHeaderNameProperties = new KafkaHeaderNameProperties();
private final HTTPHeadersPropagationAsKafkaHeadersProperties httpHeadersPropagationAsKafkaHeadersProperties =
Expand Down

0 comments on commit f594fbe

Please sign in to comment.