track each delivered message
moscicky committed Feb 29, 2024
1 parent b55c904 commit c295537
Showing 21 changed files with 197 additions and 108 deletions.
13 changes: 3 additions & 10 deletions hermes-api/src/main/java/pl/allegro/tech/hermes/api/Topic.java
@@ -55,15 +55,14 @@ public class Topic {
private Set<TopicLabel> labels;
private Instant createdAt;
private Instant modifiedAt;
private boolean buffersDisabled = false;

public Topic(TopicName name, String description, OwnerId owner, RetentionTime retentionTime,
boolean migratedFromJsonType, Ack ack, boolean fallbackToRemoteDatacenterEnabled,
boolean trackingEnabled, ContentType contentType, boolean jsonToAvroDryRunEnabled,
@JacksonInject(value = DEFAULT_SCHEMA_ID_SERIALIZATION_ENABLED_KEY, useInput = OptBoolean.TRUE)
Boolean schemaIdAwareSerializationEnabled,
int maxMessageSize, PublishingAuth publishingAuth, boolean subscribingRestricted,
TopicDataOfflineStorage offlineStorage, Set<TopicLabel> labels, Instant createdAt, Instant modifiedAt, boolean buffersDisabled) {
TopicDataOfflineStorage offlineStorage, Set<TopicLabel> labels, Instant createdAt, Instant modifiedAt) {
this.name = name;
this.description = description;
this.owner = owner;
@@ -82,7 +81,6 @@ public Topic(TopicName name, String description, OwnerId owner, RetentionTime re
this.labels = labels;
this.createdAt = createdAt;
this.modifiedAt = modifiedAt;
this.buffersDisabled = buffersDisabled;
}

@JsonCreator
@@ -106,8 +104,7 @@ public Topic(
@JsonProperty("offlineStorage") TopicDataOfflineStorage offlineStorage,
@JsonProperty("labels") Set<TopicLabel> labels,
@JsonProperty("createdAt") Instant createdAt,
@JsonProperty("modifiedAt") Instant modifiedAt,
@JsonProperty("buffersDisabled") boolean buffersDisabled
@JsonProperty("modifiedAt") Instant modifiedAt
) {
this(TopicName.fromQualifiedName(qualifiedName), description, owner, retentionTime, migratedFromJsonType, ack,
fallbackToRemoteDatacenterEnabled, trackingEnabled, contentType, jsonToAvroDryRunEnabled,
@@ -116,7 +113,7 @@ public Topic(
subscribingRestricted,
offlineStorage == null ? TopicDataOfflineStorage.defaultOfflineStorage() : offlineStorage,
labels == null ? Collections.emptySet() : labels,
createdAt, modifiedAt, buffersDisabled
createdAt, modifiedAt
);
}

@@ -263,10 +260,6 @@ public void setModifiedAt(Long modifiedAt) {
this.modifiedAt = Instant.ofEpochMilli(modifiedAt);
}

public boolean isBuffersDisabled() {
return buffersDisabled;
}

@Override
public String toString() {
return "Topic(" + getQualifiedName() + ")";

@@ -34,6 +34,8 @@ public class Meters {
public static final String DELAYED_PROCESSING = "delayed-processing";
public static final String TOPIC_DELAYED_PROCESSING = DELAYED_PROCESSING + "." + GROUP + "." + TOPIC;

public static final String TOPIC_DUPLICATED_MESSAGE = "duplicated-message" + "." + GROUP + "." + TOPIC;

public static final String OAUTH_SUBSCRIPTION_TOKEN_REQUEST = "oauth.subscription." + GROUP + "." + TOPIC + "." + SUBSCRIPTION
+ ".token-request." + OAUTH_PROVIDER_NAME;

@@ -126,6 +126,13 @@ public HermesCounter topicHttpStatusCodeCounter(TopicName topicName, int statusC
);
}

public HermesCounter topicDuplicatedMessageCounter(TopicName topicName) {
return HermesCounters.from(
micrometerCounter(TopicMetricsNames.TOPIC_DUPLICATED_MESSAGE, topicName),
hermesMetrics.meter(Meters.TOPIC_DUPLICATED_MESSAGE, topicName)
);
}

public HermesHistogram topicGlobalMessageContentSizeHistogram() {
return DefaultHermesHistogram.of(
DistributionSummary.builder(TopicMetricsNames.TOPIC_GLOBAL_MESSAGE_SIZE_BYTES)
@@ -176,5 +183,6 @@ public static class TopicMetricsNames {
public static final String TOPIC_HTTP_STATUS_CODES = "topic-http-status-codes";
public static final String TOPIC_GLOBAL_MESSAGE_SIZE_BYTES = "topic-global-message-size-bytes";
public static final String TOPIC_MESSAGE_SIZE_BYTES = "topic-message-size-bytes";
public static final String TOPIC_DUPLICATED_MESSAGE = "topic-duplicated-message";
}
}
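
The new counter is registered under both the Micrometer name topic-duplicated-message and the legacy meter duplicated-message.GROUP.TOPIC added to Meters above. A minimal usage sketch (metricsFacade and topicName are assumed to be in scope; in this commit the increment is wrapped by CachedTopic.markMessageDuplicated(), shown further down):

    // Sketch only: obtain the per-topic duplicated-message counter and bump it.
    HermesCounter duplicated = metricsFacade.topics().topicDuplicatedMessageCounter(topicName);
    duplicated.increment();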

@@ -261,6 +261,10 @@ public void onPublished(Message message, Topic topic) {
brokerTimer.close();
cachedTopic.incrementPublished();
brokerListeners.onAcknowledge(message, topic);
}

@Override
public void onEachPublished(Message message, Topic topic, String datacenter) {
trackers.get(topic).logPublished(message.getId(), topic.getName(), "", Collections.emptyMap());
}
});

@@ -24,4 +24,9 @@ public void onUnpublished(Message message, Topic topic, Exception exception) {
public void onPublished(Message message, Topic topic) {
callbacks.forEach(c -> c.onPublished(message, topic));
}

@Override
public void onEachPublished(Message message, Topic topic, String datacenter) {
callbacks.forEach(c -> c.onEachPublished(message, topic, datacenter));
}
}
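
Taken together with the frontend hunk above, these call sites imply that the PublishingCallback interface gains a third method, fired once per successful delivery to a given datacenter, while onPublished keeps its once-per-message semantics. A sketch of the interface as inferred from this diff (the actual declaration is not shown in this view):

    public interface PublishingCallback {

        // Message could not be handed over to the broker.
        void onUnpublished(Message message, Topic topic, Exception exception);

        // Message acknowledged; fired once per message.
        void onPublished(Message message, Topic topic);

        // New in this commit: fired for each delivery, with the datacenter
        // of the Kafka cluster that acknowledged the record.
        void onEachPublished(Message message, Topic topic, String datacenter);
    }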

@@ -56,11 +56,10 @@ public BrokerMessageProducer bufferedMessageBrokerProducer(
@Bean
public BrokerMessageProducer unbufferedMessageBrokerProducer(
@Named("experimentalKafkaMessageProducer") Producers producers,
MetricsFacade metricsFacade,
MessageToKafkaProducerRecordConverter messageConverter,
ExperimentalKafkaProducerProperties kafkaProducerProperties
) {
return new MultiDCKafkaBrokerMessageProducer(producers, new SimpleRemoteProducerProvider(), metricsFacade, messageConverter, kafkaProducerProperties.getSpeculativeSendDelay());
return new MultiDCKafkaBrokerMessageProducer(producers, new SimpleRemoteProducerProvider(), messageConverter, kafkaProducerProperties.getSpeculativeSendDelay());
}

@Bean
@@ -83,7 +82,8 @@ public Producers kafkaMessageProducer(KafkaClustersProperties kafkaClustersPrope
remoteKafkaProperties,
kafkaProducerProperties,
brokerLatencyReporter,
localMessageStorageProperties.getBufferedSizeBytes()
localMessageStorageProperties.getBufferedSizeBytes(),
datacenterNameProvider.getDatacenterName()
).provide();

}
@@ -100,7 +100,9 @@ public Producers experimentalKafkaMessageProducer(KafkaClustersProperties kafkaC
remoteKafkaProperties,
kafkaProducerProperties,
brokerLatencyReporter,
localMessageStorageProperties.getBufferedSizeBytes()).provide();
localMessageStorageProperties.getBufferedSizeBytes(),
datacenterNameProvider.getDatacenterName()
).provide();

}
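
Both producer providers now receive the local datacenter name from datacenterNameProvider, so every KafkaProducer wrapper built by KafkaMessageProducerFactory is labelled with the cluster it writes to and can pass that label back to publishing callbacks. A hedged sketch of the wiring (the first argument and variable names are illustrative, taken from the visible hunks):

    // Sketch only: construct the factory with the local datacenter label.
    Producers producers = new KafkaMessageProducerFactory(
            kafkaParameters,
            remoteKafkaProperties,
            kafkaProducerProperties,
            brokerLatencyReporter,
            localMessageStorageProperties.getBufferedSizeBytes(),
            datacenterNameProvider.getDatacenterName() // new: local datacenter name
    ).provide();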

@@ -39,6 +39,8 @@ public class CachedTopic {
private final MeterBackedHermesCounter topicThroughputMeter;
private final MeterBackedHermesCounter globalThroughputMeter;

private final HermesCounter topicDuplicatedMessageCounter;

private final HermesCounter published;

private final Map<Integer, MetersPair> httpStatusCodesMeters = new ConcurrentHashMap<>();
@@ -78,6 +80,8 @@ public CachedTopic(Topic topic, MetricsFacade metricsFacade,
topicProducerLatencyTimer = metricsFacade.topics().ackLeaderTopicLatency(topic.getName());
topicBrokerLatencyTimer = metricsFacade.topics().ackLeaderBrokerLatency();
}

topicDuplicatedMessageCounter = metricsFacade.topics().topicDuplicatedMessageCounter(topic.getName());
}

public Topic getTopic() {
@@ -141,4 +145,8 @@ public void markDelayedProcessing() {
public HermesRateMeter getThroughput() {
return topicThroughputMeter;
}

public void markMessageDuplicated() {
topicDuplicatedMessageCounter.increment();
}
}

@@ -17,7 +17,7 @@ public BufferAwareBrokerMessageProducer(BrokerMessageProducer bufferedBrokerMess

@Override
public void send(Message message, CachedTopic topic, PublishingCallback callback) {
if (topic.getTopic().isBuffersDisabled()) {
if (topic.getTopic().isFallbackToRemoteDatacenterEnabled()) {
this.unbufferedBrokerMessageProducer.send(message, topic, callback);
} else {
this.bufferedBrokerMessageProducer.send(message, topic, callback);

@@ -40,13 +40,9 @@ public void send(Message message, CachedTopic cachedTopic, final PublishingCallb
messageConverter.convertToProducerRecord(message, cachedTopic.getKafkaTopics().getPrimary().name());

try {
Callback wrappedCallback = new SendCallback(message, cachedTopic, callback);
producers.get(cachedTopic.getTopic()).send(
producerRecord,
cachedTopic,
message,
wrappedCallback
);
var producer = producers.get(cachedTopic.getTopic());
Callback wrappedCallback = new SendCallback(message, cachedTopic, callback, producer.getDatacenter());
producer.send(producerRecord, cachedTopic, message, wrappedCallback);
} catch (Exception e) {
// message didn't get to the internal producer buffer and it will not be sent to a broker
callback.onUnpublished(message, cachedTopic.getTopic(), e);
@@ -93,16 +89,19 @@ private class SendCallback implements org.apache.kafka.clients.producer.Callback
private final Message message;
private final CachedTopic topic;
private final PublishingCallback callback;
private final String datacenter;

public SendCallback(Message message, CachedTopic topic, PublishingCallback callback) {
public SendCallback(Message message, CachedTopic topic, PublishingCallback callback, String datacenter) {
this.message = message;
this.topic = topic;
this.callback = callback;
this.datacenter = datacenter;
}

@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if (e == null) {
callback.onEachPublished(message, topic.getTopic(), datacenter);
callback.onPublished(message, topic.getTopic());
} else {
callback.onUnpublished(message, topic.getTopic(), e);
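
Note the ordering in onCompletion above: on success the callback first reports the per-datacenter delivery via onEachPublished, using the datacenter captured from the producer when the SendCallback was created, and only then signals the message-level onPublished. The frontend (see the hunk near the top of this diff) hooks per-datacenter message tracking into onEachPublished via trackers.get(topic).logPublished(...), while acknowledgement metrics and broker listeners stay on onPublished.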

@@ -39,16 +39,19 @@ public class KafkaMessageProducerFactory {
private final KafkaProducerParameters kafkaProducerParameters;
private final BrokerLatencyReporter brokerLatencyReporter;
private final long bufferedSizeBytes;
private final String datacenter;

public KafkaMessageProducerFactory(KafkaParameters kafkaParameters,
List<KafkaProperties> remoteKafkaParameters,
KafkaProducerParameters kafkaProducerParameters, BrokerLatencyReporter brokerLatencyReporter,
long bufferedSizeBytes) {
long bufferedSizeBytes,
String datacenter) {
this.kafkaProducerParameters = kafkaProducerParameters;
this.brokerLatencyReporter = brokerLatencyReporter;
this.bufferedSizeBytes = bufferedSizeBytes;
this.kafkaParameters = kafkaParameters;
this.remoteKafkaParameters = remoteKafkaParameters;
this.datacenter = datacenter;

}

@@ -97,7 +100,8 @@ private KafkaProducer<byte[], byte[]> producer(KafkaParameters kafkaParameters,
}
return new KafkaProducer<>(
new org.apache.kafka.clients.producer.KafkaProducer<>(props),
brokerLatencyReporter
brokerLatencyReporter,
datacenter
);
}
}

@@ -29,74 +29,92 @@ public class KafkaProducer<K, V> implements Producer<K, V> {

private static final Logger logger = LoggerFactory.getLogger(KafkaProducer.class);

private final org.apache.kafka.clients.producer.Producer<K, V> producer;
private final BrokerLatencyReporter brokerLatencyReporter;
private final org.apache.kafka.clients.producer.Producer<K,V> producer;
private final String datacenter;

public KafkaProducer(org.apache.kafka.clients.producer.Producer<K, V> kafkaProducer, BrokerLatencyReporter brokerLatencyReporter) {
public KafkaProducer(org.apache.kafka.clients.producer.Producer<K, V> kafkaProducer, BrokerLatencyReporter brokerLatencyReporter, String datacenter) {
this.producer = kafkaProducer;
this.brokerLatencyReporter = brokerLatencyReporter;
this.datacenter = datacenter;
}

public String getDatacenter() {
return datacenter;
}

public void send(ProducerRecord<K, V> producerRecord,
CachedTopic cachedTopic,
Message message,
Callback callback
) {
) {
HermesTimerContext timer = cachedTopic.startBrokerLatencyTimer();
Callback meteredCallback = new MeteredCallback(timer, message, cachedTopic, callback);
producer.send(producerRecord, meteredCallback);
}

@Override
public Future<RecordMetadata> send(ProducerRecord<K,V> record) {
public Future<RecordMetadata> send(ProducerRecord<K, V> record) {
return producer.send(record);
}

@Override
public Future<RecordMetadata> send(ProducerRecord<K,V> record, Callback callback) {
public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
return producer.send(record, callback);
}

@Override
public void initTransactions() {
producer.initTransactions();
}

@Override
public void beginTransaction() throws ProducerFencedException {
producer.beginTransaction();
}

@Override
public void sendOffsetsToTransaction(Map offsets, String consumerGroupId) throws ProducerFencedException {
producer.sendOffsetsToTransaction(offsets, consumerGroupId);
}

@Override
public void sendOffsetsToTransaction(Map offsets, ConsumerGroupMetadata groupMetadata) throws ProducerFencedException {
producer.sendOffsetsToTransaction(offsets, groupMetadata);
}

@Override
public void commitTransaction() throws ProducerFencedException {
producer.commitTransaction();
}

@Override
public void abortTransaction() throws ProducerFencedException {
producer.abortTransaction();
}

@Override
public void flush() {
producer.flush();
}


@Override
public List<PartitionInfo> partitionsFor(String topic) {
return producer.partitionsFor(topic);
}

@Override
public Map<MetricName, ? extends Metric> metrics() {
return producer.metrics();
}

@Override
public void close() {
producer.close();
}

@Override
public void close(Duration timeout) {
producer.close(timeout);
}

(Diff for the remaining changed files is not expanded in this view.)