diff --git a/src/main/java/io/strimzi/test/container/StrimziKafkaContainer.java b/src/main/java/io/strimzi/test/container/StrimziKafkaContainer.java index 03c8c7f..145249e 100644 --- a/src/main/java/io/strimzi/test/container/StrimziKafkaContainer.java +++ b/src/main/java/io/strimzi/test/container/StrimziKafkaContainer.java @@ -25,6 +25,7 @@ import java.util.Map; import java.util.UUID; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; import java.util.function.Function; /** @@ -92,8 +93,16 @@ private StrimziKafkaContainer(CompletableFuture imageName) { @Override protected void doStart() { - if (!imageNameProvider.isDone()) { - imageNameProvider.complete(KafkaVersionService.strimziTestContainerImageName(kafkaVersion)); + if (!this.imageNameProvider.isDone()) { + this.imageNameProvider.complete(KafkaVersionService.strimziTestContainerImageName(this.kafkaVersion)); + } + try { + if (this.useKraft && ((this.kafkaVersion != null && this.kafkaVersion.equals("2.8.1")) || this.imageNameProvider.get().contains("2.8.1"))) { + throw new UnsupportedKraftKafkaVersionException("Specified Kafka version " + this.kafkaVersion + " is not supported in KRaft mode."); + } + } catch (InterruptedException | ExecutionException e) { + LOGGER.error("Error occurred during retrieving of image name provider", e); + throw new RuntimeException(e); } // we need it for the startZookeeper(); and startKafka(); to run container before... super.setCommand("sh", "-c", "while [ ! -f " + STARTER_SCRIPT + " ]; do sleep 0.1; done; " + STARTER_SCRIPT); diff --git a/src/main/java/io/strimzi/test/container/UnsupportedKraftKafkaVersionException.java b/src/main/java/io/strimzi/test/container/UnsupportedKraftKafkaVersionException.java new file mode 100644 index 0000000..72f0a41 --- /dev/null +++ b/src/main/java/io/strimzi/test/container/UnsupportedKraftKafkaVersionException.java @@ -0,0 +1,32 @@ +/* + * Copyright Strimzi authors. + * License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html). + */ +package io.strimzi.test.container; + +/** + * Provides a handler, when user specifies unsupported Kafka version with Kraft mode enabled + * {@link StrimziKafkaContainer#withKraft()}. + */ +public class UnsupportedKraftKafkaVersionException extends RuntimeException { + + /** + * {@link UnsupportedKraftKafkaVersionException} used for handling situation, when + * user specifies unsupported Kafka version with Kraft mode enabled. + * + * @param message specific message to throw + */ + public UnsupportedKraftKafkaVersionException(String message) { + super(message); + } + + /** + * {@link UnsupportedKraftKafkaVersionException} used for handling situation, when + * user specifies unsupported Kafka version with Kraft mode enabled. + * + * @param cause specific cause to throw + */ + public UnsupportedKraftKafkaVersionException(Throwable cause) { + super(cause); + } +} diff --git a/src/test/java/io/strimzi/test/container/StrimziKafkaKraftContainerIT.java b/src/test/java/io/strimzi/test/container/StrimziKafkaKraftContainerIT.java index 25cb304..74e5bf7 100644 --- a/src/test/java/io/strimzi/test/container/StrimziKafkaKraftContainerIT.java +++ b/src/test/java/io/strimzi/test/container/StrimziKafkaKraftContainerIT.java @@ -14,6 +14,7 @@ import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringSerializer; +import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.MethodSource; import org.testcontainers.shaded.com.google.common.collect.ImmutableMap; @@ -32,6 +33,7 @@ import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.jupiter.api.Assertions.assertThrows; @SuppressWarnings("ClassDataAbstractionCoupling") public class StrimziKafkaKraftContainerIT extends AbstractIT { @@ -100,6 +102,31 @@ void testStartContainerWithSomeConfiguration(final String imageName) throws Exec } } + @Test + void testUnsupportedKRaftUsingKafkaVersion() { + assumeDocker(); + + systemUnderTest = new StrimziKafkaContainer() + .withKafkaVersion("2.8.1") + .withBrokerId(1) + .withKraft() + .waitForRunning(); + + assertThrows(UnsupportedKraftKafkaVersionException.class, () -> systemUnderTest.start()); + } + + @Test + void testUnsupportedKRaftUsingImageName() { + assumeDocker(); + + systemUnderTest = new StrimziKafkaContainer("quay.io/strimzi-test-container/test-container:latest-kafka-2.8.1") + .withBrokerId(1) + .withKraft() + .waitForRunning(); + + assertThrows(UnsupportedKraftKafkaVersionException.class, () -> systemUnderTest.start()); + } + private void verify() throws InterruptedException, ExecutionException, TimeoutException { final String topicName = "topic"; @@ -129,6 +156,5 @@ private void verify() throws InterruptedException, ExecutionException, TimeoutEx assertThat(records.records(topic).get(1).value(), equalTo("2")); assertThat(records.records(topic).get(2).value(), equalTo("3")); } - } }