Skip to content

Commit

Permalink
Fix Reactive Producer Tests (Fencing)
Browse files Browse the repository at this point in the history
Use a different `transactional.id` for each test.
  • Loading branch information
garyrussell committed Jul 17, 2023
1 parent 86a91b1 commit 3c55ea2
Showing 1 changed file with 9 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscription;

Expand Down Expand Up @@ -104,17 +105,18 @@ public static void setUpBeforeClass() {
}

@BeforeEach
public void setUp() {
reactiveKafkaProducerTemplate = new ReactiveKafkaProducerTemplate<>(setupSenderOptionsWithDefaultTopic(),
public void setUp(TestInfo info) {
reactiveKafkaProducerTemplate = new ReactiveKafkaProducerTemplate<>(setupSenderOptionsWithDefaultTopic(info),
new MessagingMessageConverter());
}

private SenderOptions<Integer, String> setupSenderOptionsWithDefaultTopic() {
private SenderOptions<Integer, String> setupSenderOptionsWithDefaultTopic(TestInfo info) {
Map<String, Object> senderProps =
KafkaTestUtils.producerProps(EmbeddedKafkaCondition.getBroker().getBrokersAsString());
SenderOptions<Integer, String> senderOptions = SenderOptions.create(senderProps);
senderOptions = senderOptions
.producerProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "reactive.transaction")
.producerProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG,
"reactive.transaction." + info.getDisplayName().replaceAll("\\(\\)", ""))
.producerProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
return senderOptions;
}
Expand Down Expand Up @@ -279,7 +281,9 @@ public void shouldSendOneRecordTransactionallyViaTemplateAsSenderRecordAndReceiv
.abort()
.then(Mono.error(error))))
.expectErrorMatches(throwable -> throwable instanceof IllegalStateException &&
throwable.getMessage().equals("TransactionalId reactive.transaction: Invalid transition " +
throwable.getMessage().equals("TransactionalId reactive.transaction."
+ "shouldSendOneRecordTransactionallyViaTemplateAsSenderRecord"
+ "AndReceiveItExactlyOnceWithException: Invalid transition " +
"attempted from state READY to state ABORTING_TRANSACTION"))
.verify(DEFAULT_VERIFY_TIMEOUT);

Expand Down

0 comments on commit 3c55ea2

Please sign in to comment.