Skip to content

Commit

Permalink
[MO] - restrict KRaft for Kafka 2.8.1 (#25)
Browse files Browse the repository at this point in the history
* [MO] - restrict KRaft for Kafka 2.8.1

Signed-off-by: morsak <[email protected]>

* boxed exception

Signed-off-by: morsak <[email protected]>
  • Loading branch information
see-quick authored and morsak committed Jan 26, 2022
1 parent 4d06ee9 commit 401eeec
Show file tree
Hide file tree
Showing 3 changed files with 70 additions and 3 deletions.
13 changes: 11 additions & 2 deletions src/main/java/io/strimzi/test/container/StrimziKafkaContainer.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.function.Function;

/**
Expand Down Expand Up @@ -92,8 +93,16 @@ private StrimziKafkaContainer(CompletableFuture<String> imageName) {

@Override
protected void doStart() {
if (!imageNameProvider.isDone()) {
imageNameProvider.complete(KafkaVersionService.strimziTestContainerImageName(kafkaVersion));
if (!this.imageNameProvider.isDone()) {
this.imageNameProvider.complete(KafkaVersionService.strimziTestContainerImageName(this.kafkaVersion));
}
try {
if (this.useKraft && ((this.kafkaVersion != null && this.kafkaVersion.equals("2.8.1")) || this.imageNameProvider.get().contains("2.8.1"))) {
throw new UnsupportedKraftKafkaVersionException("Specified Kafka version " + this.kafkaVersion + " is not supported in KRaft mode.");
}
} catch (InterruptedException | ExecutionException e) {
LOGGER.error("Error occurred during retrieving of image name provider", e);
throw new RuntimeException(e);
}
// we need it for the startZookeeper(); and startKafka(); to run container before...
super.setCommand("sh", "-c", "while [ ! -f " + STARTER_SCRIPT + " ]; do sleep 0.1; done; " + STARTER_SCRIPT);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Copyright Strimzi authors.
* License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html).
*/
package io.strimzi.test.container;

/**
* Provides a handler, when user specifies unsupported Kafka version with Kraft mode enabled
* {@link StrimziKafkaContainer#withKraft()}.
*/
public class UnsupportedKraftKafkaVersionException extends RuntimeException {

/**
* {@link UnsupportedKraftKafkaVersionException} used for handling situation, when
* user specifies unsupported Kafka version with Kraft mode enabled.
*
* @param message specific message to throw
*/
public UnsupportedKraftKafkaVersionException(String message) {
super(message);
}

/**
* {@link UnsupportedKraftKafkaVersionException} used for handling situation, when
* user specifies unsupported Kafka version with Kraft mode enabled.
*
* @param cause specific cause to throw
*/
public UnsupportedKraftKafkaVersionException(Throwable cause) {
super(cause);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;
import org.testcontainers.shaded.com.google.common.collect.ImmutableMap;
Expand All @@ -32,6 +33,7 @@
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertThrows;

@SuppressWarnings("ClassDataAbstractionCoupling")
public class StrimziKafkaKraftContainerIT extends AbstractIT {
Expand Down Expand Up @@ -100,6 +102,31 @@ void testStartContainerWithSomeConfiguration(final String imageName) throws Exec
}
}

@Test
void testUnsupportedKRaftUsingKafkaVersion() {
assumeDocker();

systemUnderTest = new StrimziKafkaContainer()
.withKafkaVersion("2.8.1")
.withBrokerId(1)
.withKraft()
.waitForRunning();

assertThrows(UnsupportedKraftKafkaVersionException.class, () -> systemUnderTest.start());
}

@Test
void testUnsupportedKRaftUsingImageName() {
assumeDocker();

systemUnderTest = new StrimziKafkaContainer("quay.io/strimzi-test-container/test-container:latest-kafka-2.8.1")
.withBrokerId(1)
.withKraft()
.waitForRunning();

assertThrows(UnsupportedKraftKafkaVersionException.class, () -> systemUnderTest.start());
}

private void verify() throws InterruptedException, ExecutionException, TimeoutException {
final String topicName = "topic";

Expand Down Expand Up @@ -129,6 +156,5 @@ private void verify() throws InterruptedException, ExecutionException, TimeoutEx
assertThat(records.records(topic).get(1).value(), equalTo("2"));
assertThat(records.records(topic).get(2).value(), equalTo("3"));
}

}
}

0 comments on commit 401eeec

Please sign in to comment.