diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/AbstractMessageListenerContainer.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/AbstractMessageListenerContainer.java index 8667f34c5..8ef258155 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/AbstractMessageListenerContainer.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/AbstractMessageListenerContainer.java @@ -124,6 +124,8 @@ public abstract class AbstractMessageListenerContainer private volatile boolean running = false; + private volatile boolean fenced = false; + private volatile boolean paused; private volatile boolean stoppedNormally = true; @@ -275,6 +277,10 @@ public boolean isRunning() { return this.running; } + protected void setFenced(boolean fenced) { + this.fenced = fenced; + } + @Deprecated(since = "3.2", forRemoval = true) protected boolean isPaused() { return this.paused; @@ -509,6 +515,7 @@ public final void start() { if (!isRunning()) { Assert.state(this.containerProperties.getMessageListener() instanceof GenericMessageListener, () -> "A " + GenericMessageListener.class.getName() + " implementation must be provided"); + Assert.state(!this.fenced, "Container Fenced. It is not allowed to start."); doStart(); } } diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainer.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainer.java index b53909ff1..954ba093e 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainer.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainer.java @@ -355,6 +355,7 @@ protected void doStop(final Runnable callback, boolean normal) { } } for (KafkaMessageListenerContainer container : this.containers) { + container.setFenced(true); if (container.isRunning()) { if (normal) { container.stop(() -> { diff --git a/spring-kafka/src/test/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainerTests.java b/spring-kafka/src/test/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainerTests.java index 9a9969d68..106d67a88 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainerTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainerTests.java @@ -17,6 +17,7 @@ package org.springframework.kafka.listener; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatExceptionOfType; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.BDDMockito.given; @@ -207,6 +208,7 @@ protected Consumer createKafkaConsumer(String groupId, String c assertThat(container.metrics()).isNotNull(); Set> children = new HashSet<>(containers); assertThat(container.isInExpectedState()).isTrue(); + MessageListenerContainer childContainer = container.getContainers().get(0); container.getContainers().get(0).stopAbnormally(() -> { }); assertThat(container.isInExpectedState()).isFalse(); container.getContainers().get(0).start(); @@ -236,6 +238,10 @@ else if (e instanceof ConcurrentContainerStoppedEvent concurrentContainerStopped }); assertThat(overrides.get().getProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG)).isNull(); this.logger.info("Stop auto"); + assertThat(childContainer.isRunning()).isFalse(); + assertThat(container.isRunning()).isFalse(); + // Fenced container. Throws exception + assertThatExceptionOfType(IllegalStateException.class).isThrownBy(() -> childContainer.start()); } @Test