Skip to content

Commit

Permalink
Stabilise Kafka Channel Definition Processor tests
Browse files Browse the repository at this point in the history
* Use a larger non blocking delay
* Assert on the consumer record values directly (the error message is slightly better)
  • Loading branch information
filiphr committed Jun 28, 2023
1 parent fe85a72 commit 530b28d
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
Expand Down Expand Up @@ -1257,17 +1258,18 @@ void exponentialBackOffRetry() throws Exception {
ConsumerRecords<Object, Object> records = consumer.poll(Duration.ofSeconds(2));

assertThat(records.records("exponential-backoff-retry-topic-0"))
.extracting(ConsumerRecord::value)
.satisfiesExactly(
record -> {
assertThatJson(record.value())
assertThatJson(record)
.isEqualTo("{"
+ " eventKey: 'test',"
+ " customer: 'kermit',"
+ " name: 'Kermit the Frog'"
+ "}");
},
record -> {
assertThatJson(record.value())
assertThatJson(record)
.isEqualTo("{"
+ " eventKey: 'test',"
+ " customer: 'fozzie',"
Expand All @@ -1277,11 +1279,12 @@ record -> {
);

assertThat(records.records("exponential-backoff-retry-topic-1"))
.extracting(ConsumerRecord::value)
.hasSize(1)
.first()
.isNotNull()
.satisfies(record -> {
assertThatJson(record.value())
assertThatJson(record)
.isEqualTo("{"
+ " eventKey: 'test',"
+ " customer: 'kermit',"
Expand All @@ -1290,11 +1293,12 @@ record -> {
});

assertThat(records.records("exponential-backoff-dlt-topic"))
.extracting(ConsumerRecord::value)
.hasSize(1)
.first()
.isNotNull()
.satisfies(record -> {
assertThatJson(record.value())
assertThatJson(record)
.isEqualTo("{"
+ " eventKey: 'test',"
+ " customer: 'kermit',"
Expand Down Expand Up @@ -1384,17 +1388,18 @@ void exponentialBackOffRetryWithDelaySuffixing() throws Exception {
ConsumerRecords<Object, Object> records = consumer.poll(Duration.ofSeconds(2));

assertThat(records.records("exponential-backoff-delay-retry-topic-100"))
.extracting(ConsumerRecord::value)
.satisfiesExactly(
record -> {
assertThatJson(record.value())
assertThatJson(record)
.isEqualTo("{"
+ " eventKey: 'test',"
+ " customer: 'kermit',"
+ " name: 'Kermit the Frog'"
+ "}");
},
record -> {
assertThatJson(record.value())
assertThatJson(record)
.isEqualTo("{"
+ " eventKey: 'test',"
+ " customer: 'fozzie',"
Expand All @@ -1404,11 +1409,12 @@ record -> {
);

assertThat(records.records("exponential-backoff-delay-retry-topic-200"))
.extracting(ConsumerRecord::value)
.hasSize(1)
.first()
.isNotNull()
.satisfies(record -> {
assertThatJson(record.value())
assertThatJson(record)
.isEqualTo("{"
+ " eventKey: 'test',"
+ " customer: 'kermit',"
Expand All @@ -1417,11 +1423,12 @@ record -> {
});

assertThat(records.records("exponential-backoff-delay-dlt-topic"))
.extracting(ConsumerRecord::value)
.hasSize(1)
.first()
.isNotNull()
.satisfies(record -> {
assertThatJson(record.value())
assertThatJson(record)
.isEqualTo("{"
+ " eventKey: 'test',"
+ " customer: 'kermit',"
Expand Down Expand Up @@ -1510,17 +1517,18 @@ void exponentialRandomBackOffRetry() throws Exception {
ConsumerRecords<Object, Object> records = consumer.poll(Duration.ofSeconds(2));

assertThat(records.records("random-exponential-backoff-retry-0"))
.extracting(ConsumerRecord::value)
.satisfiesExactly(
record -> {
assertThatJson(record.value())
assertThatJson(record)
.isEqualTo("{"
+ " eventKey: 'test',"
+ " customer: 'kermit',"
+ " name: 'Kermit the Frog'"
+ "}");
},
record -> {
assertThatJson(record.value())
assertThatJson(record)
.isEqualTo("{"
+ " eventKey: 'test',"
+ " customer: 'fozzie',"
Expand All @@ -1530,11 +1538,12 @@ record -> {
);

assertThat(records.records("random-exponential-backoff-retry-1"))
.extracting(ConsumerRecord::value)
.hasSize(1)
.first()
.isNotNull()
.satisfies(record -> {
assertThatJson(record.value())
assertThatJson(record)
.isEqualTo("{"
+ " eventKey: 'test',"
+ " customer: 'kermit',"
Expand All @@ -1543,11 +1552,12 @@ record -> {
});

assertThat(records.records("random-exponential-backoff-retry-2"))
.extracting(ConsumerRecord::value)
.hasSize(1)
.first()
.isNotNull()
.satisfies(record -> {
assertThatJson(record.value())
assertThatJson(record)
.isEqualTo("{"
+ " eventKey: 'test',"
+ " customer: 'kermit',"
Expand Down Expand Up @@ -1649,17 +1659,18 @@ void uniformRandomBackOffRetry() throws Exception {
ConsumerRecords<Object, Object> records = consumer.poll(Duration.ofSeconds(2));

assertThat(records.records("uniform-random-backoff-retry-0"))
.extracting(ConsumerRecord::value)
.satisfiesExactly(
record -> {
assertThatJson(record.value())
assertThatJson(record)
.isEqualTo("{"
+ " eventKey: 'test',"
+ " customer: 'kermit',"
+ " name: 'Kermit the Frog'"
+ "}");
},
record -> {
assertThatJson(record.value())
assertThatJson(record)
.isEqualTo("{"
+ " eventKey: 'test',"
+ " customer: 'fozzie',"
Expand All @@ -1669,11 +1680,12 @@ record -> {
);

assertThat(records.records("uniform-random-backoff-retry-1"))
.extracting(ConsumerRecord::value)
.hasSize(1)
.first()
.isNotNull()
.satisfies(record -> {
assertThatJson(record.value())
assertThatJson(record)
.isEqualTo("{"
+ " eventKey: 'test',"
+ " customer: 'kermit',"
Expand All @@ -1682,11 +1694,12 @@ record -> {
});

assertThat(records.records("uniform-random-backoff-retry-2"))
.extracting(ConsumerRecord::value)
.hasSize(1)
.first()
.isNotNull()
.satisfies(record -> {
assertThatJson(record.value())
assertThatJson(record)
.isEqualTo("{"
+ " eventKey: 'test',"
+ " customer: 'kermit',"
Expand All @@ -1695,11 +1708,12 @@ record -> {
});

assertThat(records.records("uniform-random-backoff-dlt"))
.extracting(ConsumerRecord::value)
.hasSize(1)
.first()
.isNotNull()
.satisfies(record -> {
assertThatJson(record.value())
assertThatJson(record)
.isEqualTo("{"
+ " eventKey: 'test',"
+ " customer: 'kermit',"
Expand Down Expand Up @@ -1787,17 +1801,18 @@ void fixedBackOffMultiTopicRetry() throws Exception {
ConsumerRecords<Object, Object> records = consumer.poll(Duration.ofSeconds(2));

assertThat(records.records("fixed-backoff-multi-retry-topic-0"))
.extracting(ConsumerRecord::value)
.satisfiesExactly(
record -> {
assertThatJson(record.value())
assertThatJson(record)
.isEqualTo("{"
+ " eventKey: 'test',"
+ " customer: 'kermit',"
+ " name: 'Kermit the Frog'"
+ "}");
},
record -> {
assertThatJson(record.value())
assertThatJson(record)
.isEqualTo("{"
+ " eventKey: 'test',"
+ " customer: 'fozzie',"
Expand All @@ -1807,11 +1822,12 @@ record -> {
);

assertThat(records.records("fixed-backoff-multi-retry-topic-1"))
.extracting(ConsumerRecord::value)
.hasSize(1)
.first()
.isNotNull()
.satisfies(record -> {
assertThatJson(record.value())
assertThatJson(record)
.isEqualTo("{"
+ " eventKey: 'test',"
+ " customer: 'kermit',"
Expand All @@ -1820,11 +1836,12 @@ record -> {
});

assertThat(records.records("fixed-backoff-multi-dlt-topic"))
.extracting(ConsumerRecord::value)
.hasSize(1)
.first()
.isNotNull()
.satisfies(record -> {
assertThatJson(record.value())
assertThatJson(record)
.isEqualTo("{"
+ " eventKey: 'test',"
+ " customer: 'kermit',"
Expand Down Expand Up @@ -1929,25 +1946,26 @@ void fixedBackOffRetry() throws Exception {
ConsumerRecords<Object, Object> records = consumer.poll(Duration.ofSeconds(2));

assertThat(records.records("fixed-backoff-retry-topic"))
.extracting(ConsumerRecord::value)
.satisfiesExactly(
record -> {
assertThatJson(record.value())
assertThatJson(record)
.isEqualTo("{"
+ " eventKey: 'test',"
+ " customer: 'kermit',"
+ " name: 'Kermit the Frog'"
+ "}");
},
record -> {
assertThatJson(record.value())
assertThatJson(record)
.isEqualTo("{"
+ " eventKey: 'test',"
+ " customer: 'fozzie',"
+ " name: 'Fozzie the Bear'"
+ "}");
},
record -> {
assertThatJson(record.value())
assertThatJson(record)
.isEqualTo("{"
+ " eventKey: 'test',"
+ " customer: 'kermit',"
Expand All @@ -1957,11 +1975,12 @@ record -> {
);

assertThat(records.records("fixed-backoff-dlt-topic"))
.extracting(ConsumerRecord::value)
.hasSize(1)
.first()
.isNotNull()
.satisfies(record -> {
assertThatJson(record.value())
assertThatJson(record)
.isEqualTo("{"
+ " eventKey: 'test',"
+ " customer: 'kermit',"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
"retryTopicSuffix": "-retry-topic",
"dltTopicSuffix": "-dlt-topic",
"nonBlockingBackOff": {
"delay": "100"
"delay": "1000"
}

}
Expand Down

0 comments on commit 530b28d

Please sign in to comment.