Skip to content

Commit

Permalink
Stabilise Kafka Channel Definition Process tests
Browse files Browse the repository at this point in the history
* Use a larger non blocking delay
* Assert on the consumer record values directly (the error message is slightly better)
  • Loading branch information
filiphr committed Jun 28, 2023
1 parent f21e9bc commit fc35ac2
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
Expand All @@ -60,8 +61,6 @@
import org.flowable.eventregistry.impl.runtime.EventPayloadInstanceImpl;
import org.flowable.eventregistry.model.ChannelModel;
import org.flowable.eventregistry.model.EventPayload;
import org.flowable.eventregistry.spring.kafka.KafkaMessageKeyProvider;
import org.flowable.eventregistry.spring.kafka.KafkaPartitionProvider;
import org.flowable.eventregistry.spring.test.TestEventConsumer;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
Expand Down Expand Up @@ -1259,17 +1258,18 @@ void exponentialBackOffRetry() throws Exception {
ConsumerRecords<Object, Object> records = consumer.poll(Duration.ofSeconds(2));

assertThat(records.records("exponential-backoff-retry-topic-0"))
.extracting(ConsumerRecord::value)
.satisfiesExactly(
record -> {
assertThatJson(record.value())
assertThatJson(record)
.isEqualTo("{"
+ " eventKey: 'test',"
+ " customer: 'kermit',"
+ " name: 'Kermit the Frog'"
+ "}");
},
record -> {
assertThatJson(record.value())
assertThatJson(record)
.isEqualTo("{"
+ " eventKey: 'test',"
+ " customer: 'fozzie',"
Expand All @@ -1279,11 +1279,12 @@ record -> {
);

assertThat(records.records("exponential-backoff-retry-topic-1"))
.extracting(ConsumerRecord::value)
.hasSize(1)
.first()
.isNotNull()
.satisfies(record -> {
assertThatJson(record.value())
assertThatJson(record)
.isEqualTo("{"
+ " eventKey: 'test',"
+ " customer: 'kermit',"
Expand All @@ -1292,11 +1293,12 @@ record -> {
});

assertThat(records.records("exponential-backoff-dlt-topic"))
.extracting(ConsumerRecord::value)
.hasSize(1)
.first()
.isNotNull()
.satisfies(record -> {
assertThatJson(record.value())
assertThatJson(record)
.isEqualTo("{"
+ " eventKey: 'test',"
+ " customer: 'kermit',"
Expand Down Expand Up @@ -1386,17 +1388,18 @@ void exponentialBackOffRetryWithDelaySuffixing() throws Exception {
ConsumerRecords<Object, Object> records = consumer.poll(Duration.ofSeconds(2));

assertThat(records.records("exponential-backoff-delay-retry-topic-100"))
.extracting(ConsumerRecord::value)
.satisfiesExactly(
record -> {
assertThatJson(record.value())
assertThatJson(record)
.isEqualTo("{"
+ " eventKey: 'test',"
+ " customer: 'kermit',"
+ " name: 'Kermit the Frog'"
+ "}");
},
record -> {
assertThatJson(record.value())
assertThatJson(record)
.isEqualTo("{"
+ " eventKey: 'test',"
+ " customer: 'fozzie',"
Expand All @@ -1406,11 +1409,12 @@ record -> {
);

assertThat(records.records("exponential-backoff-delay-retry-topic-200"))
.extracting(ConsumerRecord::value)
.hasSize(1)
.first()
.isNotNull()
.satisfies(record -> {
assertThatJson(record.value())
assertThatJson(record)
.isEqualTo("{"
+ " eventKey: 'test',"
+ " customer: 'kermit',"
Expand All @@ -1419,11 +1423,12 @@ record -> {
});

assertThat(records.records("exponential-backoff-delay-dlt-topic"))
.extracting(ConsumerRecord::value)
.hasSize(1)
.first()
.isNotNull()
.satisfies(record -> {
assertThatJson(record.value())
assertThatJson(record)
.isEqualTo("{"
+ " eventKey: 'test',"
+ " customer: 'kermit',"
Expand Down Expand Up @@ -1512,17 +1517,18 @@ void exponentialRandomBackOffRetry() throws Exception {
ConsumerRecords<Object, Object> records = consumer.poll(Duration.ofSeconds(2));

assertThat(records.records("random-exponential-backoff-retry-0"))
.extracting(ConsumerRecord::value)
.satisfiesExactly(
record -> {
assertThatJson(record.value())
assertThatJson(record)
.isEqualTo("{"
+ " eventKey: 'test',"
+ " customer: 'kermit',"
+ " name: 'Kermit the Frog'"
+ "}");
},
record -> {
assertThatJson(record.value())
assertThatJson(record)
.isEqualTo("{"
+ " eventKey: 'test',"
+ " customer: 'fozzie',"
Expand All @@ -1532,11 +1538,12 @@ record -> {
);

assertThat(records.records("random-exponential-backoff-retry-1"))
.extracting(ConsumerRecord::value)
.hasSize(1)
.first()
.isNotNull()
.satisfies(record -> {
assertThatJson(record.value())
assertThatJson(record)
.isEqualTo("{"
+ " eventKey: 'test',"
+ " customer: 'kermit',"
Expand All @@ -1545,11 +1552,12 @@ record -> {
});

assertThat(records.records("random-exponential-backoff-retry-2"))
.extracting(ConsumerRecord::value)
.hasSize(1)
.first()
.isNotNull()
.satisfies(record -> {
assertThatJson(record.value())
assertThatJson(record)
.isEqualTo("{"
+ " eventKey: 'test',"
+ " customer: 'kermit',"
Expand Down Expand Up @@ -1651,17 +1659,18 @@ void uniformRandomBackOffRetry() throws Exception {
ConsumerRecords<Object, Object> records = consumer.poll(Duration.ofSeconds(2));

assertThat(records.records("uniform-random-backoff-retry-0"))
.extracting(ConsumerRecord::value)
.satisfiesExactly(
record -> {
assertThatJson(record.value())
assertThatJson(record)
.isEqualTo("{"
+ " eventKey: 'test',"
+ " customer: 'kermit',"
+ " name: 'Kermit the Frog'"
+ "}");
},
record -> {
assertThatJson(record.value())
assertThatJson(record)
.isEqualTo("{"
+ " eventKey: 'test',"
+ " customer: 'fozzie',"
Expand All @@ -1671,11 +1680,12 @@ record -> {
);

assertThat(records.records("uniform-random-backoff-retry-1"))
.extracting(ConsumerRecord::value)
.hasSize(1)
.first()
.isNotNull()
.satisfies(record -> {
assertThatJson(record.value())
assertThatJson(record)
.isEqualTo("{"
+ " eventKey: 'test',"
+ " customer: 'kermit',"
Expand All @@ -1684,11 +1694,12 @@ record -> {
});

assertThat(records.records("uniform-random-backoff-retry-2"))
.extracting(ConsumerRecord::value)
.hasSize(1)
.first()
.isNotNull()
.satisfies(record -> {
assertThatJson(record.value())
assertThatJson(record)
.isEqualTo("{"
+ " eventKey: 'test',"
+ " customer: 'kermit',"
Expand All @@ -1697,11 +1708,12 @@ record -> {
});

assertThat(records.records("uniform-random-backoff-dlt"))
.extracting(ConsumerRecord::value)
.hasSize(1)
.first()
.isNotNull()
.satisfies(record -> {
assertThatJson(record.value())
assertThatJson(record)
.isEqualTo("{"
+ " eventKey: 'test',"
+ " customer: 'kermit',"
Expand Down Expand Up @@ -1789,17 +1801,18 @@ void fixedBackOffMultiTopicRetry() throws Exception {
ConsumerRecords<Object, Object> records = consumer.poll(Duration.ofSeconds(2));

assertThat(records.records("fixed-backoff-multi-retry-topic-0"))
.extracting(ConsumerRecord::value)
.satisfiesExactly(
record -> {
assertThatJson(record.value())
assertThatJson(record)
.isEqualTo("{"
+ " eventKey: 'test',"
+ " customer: 'kermit',"
+ " name: 'Kermit the Frog'"
+ "}");
},
record -> {
assertThatJson(record.value())
assertThatJson(record)
.isEqualTo("{"
+ " eventKey: 'test',"
+ " customer: 'fozzie',"
Expand All @@ -1809,11 +1822,12 @@ record -> {
);

assertThat(records.records("fixed-backoff-multi-retry-topic-1"))
.extracting(ConsumerRecord::value)
.hasSize(1)
.first()
.isNotNull()
.satisfies(record -> {
assertThatJson(record.value())
assertThatJson(record)
.isEqualTo("{"
+ " eventKey: 'test',"
+ " customer: 'kermit',"
Expand All @@ -1822,11 +1836,12 @@ record -> {
});

assertThat(records.records("fixed-backoff-multi-dlt-topic"))
.extracting(ConsumerRecord::value)
.hasSize(1)
.first()
.isNotNull()
.satisfies(record -> {
assertThatJson(record.value())
assertThatJson(record)
.isEqualTo("{"
+ " eventKey: 'test',"
+ " customer: 'kermit',"
Expand Down Expand Up @@ -1931,25 +1946,26 @@ void fixedBackOffRetry() throws Exception {
ConsumerRecords<Object, Object> records = consumer.poll(Duration.ofSeconds(2));

assertThat(records.records("fixed-backoff-retry-topic"))
.extracting(ConsumerRecord::value)
.satisfiesExactly(
record -> {
assertThatJson(record.value())
assertThatJson(record)
.isEqualTo("{"
+ " eventKey: 'test',"
+ " customer: 'kermit',"
+ " name: 'Kermit the Frog'"
+ "}");
},
record -> {
assertThatJson(record.value())
assertThatJson(record)
.isEqualTo("{"
+ " eventKey: 'test',"
+ " customer: 'fozzie',"
+ " name: 'Fozzie the Bear'"
+ "}");
},
record -> {
assertThatJson(record.value())
assertThatJson(record)
.isEqualTo("{"
+ " eventKey: 'test',"
+ " customer: 'kermit',"
Expand All @@ -1959,11 +1975,12 @@ record -> {
);

assertThat(records.records("fixed-backoff-dlt-topic"))
.extracting(ConsumerRecord::value)
.hasSize(1)
.first()
.isNotNull()
.satisfies(record -> {
assertThatJson(record.value())
assertThatJson(record)
.isEqualTo("{"
+ " eventKey: 'test',"
+ " customer: 'kermit',"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
"retryTopicSuffix": "-retry-topic",
"dltTopicSuffix": "-dlt-topic",
"nonBlockingBackOff": {
"delay": "100"
"delay": "1000"
}

}
Expand Down

0 comments on commit fc35ac2

Please sign in to comment.