Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GH-3291 - Possible inconsistency in DLT topic naming convention #3292

Merged
merged 5 commits into from
Jun 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -614,7 +614,7 @@ The framework provides the `DeadLetterPublishingRecoverer`, which publishes the
The recoverer requires a `KafkaTemplate<Object, Object>`, which is used to send the record.
You can also, optionally, configure it with a `BiFunction<ConsumerRecord<?, ?>, Exception, TopicPartition>`, which is called to resolve the destination topic and partition.

IMPORTANT: By default, the dead-letter record is sent to a topic named `<originalTopic>.DLT` (the original topic name suffixed with `.DLT`) and to the same partition as the original record.
IMPORTANT: By default, the dead-letter record is sent to a topic named `<originalTopic>-dlt` (the original topic name suffixed with `-dlt`) and to the same partition as the original record.
Therefore, when you use the default resolver, the dead-letter topic **must have at least as many partitions as the original topic.**

If the returned `TopicPartition` has a negative partition, the partition is not set in the `ProducerRecord`, so the partition is selected by Kafka.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,13 @@
This section covers the changes made from version 3.2 to version 3.3.
For changes in earlier version, see xref:appendix/change-history.adoc[Change History].


[[x33-dlt-topic-naming]]
=== DLT Topic Naming Convention

The naming convention for DLT topics has been standardized to use the "-dlt" suffix consistently. This change ensures compatibility and avoids conflicts when transitioning between different retry solutions. Users who wish to retain the ".DLT" suffix behavior need to opt-in explicitly by setting the appropriate DLT name property.





Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@
*
* @author Gary Russell
* @author Tomaz Fernandes
* @author Watlas R
* @since 2.2
*
*/
Expand All @@ -75,7 +76,7 @@ public class DeadLetterPublishingRecoverer extends ExceptionClassifier implement
protected final LogAccessor logger = new LogAccessor(LogFactory.getLog(getClass())); // NOSONAR

private static final BiFunction<ConsumerRecord<?, ?>, Exception, TopicPartition>
DEFAULT_DESTINATION_RESOLVER = (cr, e) -> new TopicPartition(cr.topic() + ".DLT", cr.partition());
DEFAULT_DESTINATION_RESOLVER = (cr, e) -> new TopicPartition(cr.topic() + "-dlt", cr.partition());
Watlas marked this conversation as resolved.
Show resolved Hide resolved

private static final long FIVE = 5L;

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2017-2023 the original author or authors.
* Copyright 2017-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -84,7 +84,7 @@
*/
@SpringJUnitConfig
@DirtiesContext
@EmbeddedKafka(kraft = false, partitions = 1, topics = { "blc1", "blc2", "blc3", "blc4", "blc5", "blc6", "blc6.DLT" })
@EmbeddedKafka(kraft = false, partitions = 1, topics = { "blc1", "blc2", "blc3", "blc4", "blc5", "blc6", "blc6-dlt" })
public class BatchListenerConversionTests {

private static final String DEFAULT_TEST_GROUP_ID = "blc";
Expand Down Expand Up @@ -378,7 +378,7 @@ public void listen5(List<Foo> foos,
}
}

@KafkaListener(topics = "blc6.DLT", groupId = "blc6.DLT",
@KafkaListener(topics = "blc6-dlt", groupId = "blc6-dlt",
properties = ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG +
":org.apache.kafka.common.serialization.StringDeserializer")
public void listen5Dlt(String in) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,14 +96,14 @@ void testTxNoTx() {
DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template);
ConsumerRecord<String, String> record = new ConsumerRecord<>("foo", 0, 0L, "bar", "baz");
Consumer consumer = mock(Consumer.class);
given(consumer.partitionsFor("foo.DLT", Duration.ofSeconds(5)))
given(consumer.partitionsFor("foo-dlt", Duration.ofSeconds(5)))
.willReturn(Collections.singletonList(new PartitionInfo("foo", 0, null, null, null)));
recoverer.accept(record, consumer, new RuntimeException());
verify(template, never()).executeInTransaction(any());
ArgumentCaptor<ProducerRecord> captor = ArgumentCaptor.forClass(ProducerRecord.class);
verify(template).send(captor.capture());
assertThat(captor.getValue().partition()).isEqualTo(0);
verify(consumer).partitionsFor("foo.DLT", Duration.ofSeconds(5));
verify(consumer).partitionsFor("foo-dlt", Duration.ofSeconds(5));

record = new ConsumerRecord<>("foo", 1, 0L, "bar", "baz");
recoverer.accept(record, consumer, new RuntimeException());
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2020-2022 the original author or authors.
* Copyright 2020-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -61,11 +61,11 @@ public class DefaultErrorHandlerBatchIntegrationTests {

public static final String topic1 = "dehTopic1";

public static final String topic1DLT = "dehTopic1.DLT";
public static final String topic1DLT = "dehTopic1-dlt";

public static final String topic2 = "dehTopic2";

public static final String topic2DLT = "dehTopic2.DLT";
public static final String topic2DLT = "dehTopic2-dlt";

private static EmbeddedKafkaBroker embeddedKafka;

Expand Down Expand Up @@ -133,7 +133,7 @@ public void recoveryAndDlt() throws Exception {
"baz", "qux", "fiz", "buz",
"qux", "fiz", "buz");

props.put(ConsumerConfig.GROUP_ID_CONFIG, "recoverBatch.dlt");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "recoverBatch-dlt");
DefaultKafkaConsumerFactory<Integer, String> dltcf = new DefaultKafkaConsumerFactory<>(props);
Consumer<Integer, String> consumer = dltcf.createConsumer();
embeddedKafka.consumeFromAnEmbeddedTopic(consumer, topic1DLT);
Expand Down Expand Up @@ -215,7 +215,7 @@ public void accept(ConsumerRecord<?, ?> record, Consumer<?, ?> consumer, Excepti
"baz", "qux", "fiz", "buz",
"qux", "fiz", "buz");

props.put(ConsumerConfig.GROUP_ID_CONFIG, "recoverBatch2.dlt");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "recoverBatch2-dlt");
DefaultKafkaConsumerFactory<Integer, String> dltcf = new DefaultKafkaConsumerFactory<>(props);
Consumer<Integer, String> consumer = dltcf.createConsumer();
embeddedKafka.consumeFromAnEmbeddedTopic(consumer, topic2DLT);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2020-2022 the original author or authors.
* Copyright 2020-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -63,11 +63,11 @@ public class FallbackBatchErrorHandlerIntegrationTests {

public static final String topic1 = "retryTopic1";

public static final String topic1DLT = "retryTopic1.DLT";
public static final String topic1DLT = "retryTopic1-dlt";

public static final String topic2 = "retryTopic2";

public static final String topic2DLT = "retryTopic2.DLT";
public static final String topic2DLT = "retryTopic2-dlt";

private static EmbeddedKafkaBroker embeddedKafka;

Expand Down Expand Up @@ -141,7 +141,7 @@ public void publishEvent(Object event) {
assertThat(recoverLatch.await(10, TimeUnit.SECONDS)).isTrue();
assertThat(failedGroupId.get()).isEqualTo("retryBatch");

props.put(ConsumerConfig.GROUP_ID_CONFIG, "retryBatch.dlt");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "retryBatch-dlt");
DefaultKafkaConsumerFactory<Integer, String> dltcf = new DefaultKafkaConsumerFactory<>(props);
Consumer<Integer, String> consumer = dltcf.createConsumer();
embeddedKafka.consumeFromAnEmbeddedTopic(consumer, topic1DLT);
Expand Down Expand Up @@ -219,7 +219,7 @@ public void accept(ConsumerRecord<?, ?> record, Exception exception) {
assertThat(recoverLatch.await(10, TimeUnit.SECONDS)).isTrue();
assertThat(failedGroupId.get()).isEqualTo("retryBatch2");

props.put(ConsumerConfig.GROUP_ID_CONFIG, "retryBatch2.dlt");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "retryBatch2-dlt");
DefaultKafkaConsumerFactory<Integer, String> dltcf = new DefaultKafkaConsumerFactory<>(props);
Consumer<Integer, String> consumer = dltcf.createConsumer();
embeddedKafka.consumeFromAnEmbeddedTopic(consumer, topic2DLT);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2018-2021 the original author or authors.
* Copyright 2018-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -153,7 +153,7 @@ public void accept(ConsumerRecord<?, ?> record, @Nullable Consumer<?, ?> consume
assertThat(recoverLatch.await(10, TimeUnit.SECONDS)).isTrue();
assertThat(failedGroupId.get()).isEqualTo("seekTestMaxFailures");

props.put(ConsumerConfig.GROUP_ID_CONFIG, "seekTestMaxFailures.dlt");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "seekTestMaxFailures-dlt");
DefaultKafkaConsumerFactory<Integer, String> dltcf = new DefaultKafkaConsumerFactory<>(props);
Consumer<Integer, String> consumer = dltcf.createConsumer();
embeddedKafka.consumeFromAnEmbeddedTopic(consumer, topic1DLT);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ public class TransactionalContainerTests {

public static final String topic3 = "txTopic3";

public static final String topic3DLT = "txTopic3.DLT";
public static final String topic3DLT = "txTopic3-dlt";

public static final String topic4 = "txTopic4";

Expand All @@ -134,7 +134,7 @@ public class TransactionalContainerTests {

public static final String topic8 = "txTopic8";

public static final String topic8DLT = "txTopic8.DLT";
public static final String topic8DLT = "txTopic8-dlt";

public static final String topic9 = "txTopic9";

Expand Down