diff --git a/modules/flowable-event-registry-spring/src/main/java/org/flowable/eventregistry/spring/kafka/KafkaChannelDefinitionProcessor.java b/modules/flowable-event-registry-spring/src/main/java/org/flowable/eventregistry/spring/kafka/KafkaChannelDefinitionProcessor.java index 1f34580a78f..450ceab01f7 100644 --- a/modules/flowable-event-registry-spring/src/main/java/org/flowable/eventregistry/spring/kafka/KafkaChannelDefinitionProcessor.java +++ b/modules/flowable-event-registry-spring/src/main/java/org/flowable/eventregistry/spring/kafka/KafkaChannelDefinitionProcessor.java @@ -56,6 +56,8 @@ import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationContextAware; import org.springframework.context.ApplicationListener; +import org.springframework.context.ConfigurableApplicationContext; +import org.springframework.context.event.ContextClosedEvent; import org.springframework.context.event.ContextRefreshedEvent; import org.springframework.context.expression.StandardBeanExpressionResolver; import org.springframework.kafka.annotation.KafkaListenerAnnotationBeanPostProcessor; @@ -116,7 +118,7 @@ * * @author Filip Hrisafov */ -public class KafkaChannelDefinitionProcessor implements BeanFactoryAware, ApplicationContextAware, ApplicationListener, DisposableBean, ChannelModelProcessor { +public class KafkaChannelDefinitionProcessor implements BeanFactoryAware, ApplicationContextAware, ApplicationListener, ChannelModelProcessor { public static final String CHANNEL_ID_PREFIX = "org.flowable.eventregistry.kafka.ChannelKafkaListenerEndpointContainer#"; @@ -132,8 +134,7 @@ public class KafkaChannelDefinitionProcessor implements BeanFactoryAware, Applic protected String containerFactoryBeanName = KafkaListenerAnnotationBeanPostProcessor.DEFAULT_KAFKA_LISTENER_CONTAINER_FACTORY_BEAN_NAME; protected KafkaListenerContainerFactory containerFactory; - protected TaskScheduler retryTopicTaskScheduler; - protected ThreadPoolTaskScheduler retryTopicThreadPoolTaskScheduler; + protected KafkaConsumerBackoffManager kafkaConsumerBackoffManager; protected BeanFactory beanFactory; protected ApplicationContext applicationContext; @@ -366,10 +367,7 @@ protected ListenerContainerFactoryConfigurer createListenerContainerFactoryConfi DefaultDestinationTopicResolver topicResolver) { DeadLetterPublishingRecovererFactory recovererFactory = new DeadLetterPublishingRecovererFactory(topicResolver); - ContainerPartitionPausingBackOffManagerFactory managerFactory = new ContainerPartitionPausingBackOffManagerFactory(endpointRegistry, applicationContext); - configurePartitionPausingFactory(managerFactory); - - KafkaConsumerBackoffManager manager = managerFactory.create(); + KafkaConsumerBackoffManager manager = getOrCreateKafkaConsumerBackoffManager(); ListenerContainerFactoryConfigurer factoryConfigurer = new ListenerContainerFactoryConfigurer(manager, recovererFactory, Clock.systemUTC()); if (retryConfiguration.hasNoRetryTopic()) { // If we do not have a retry topic, then the retries have to be blocking @@ -398,19 +396,24 @@ protected ListenerContainerFactoryConfigurer createListenerContainerFactoryConfi return factoryConfigurer; } - protected void configurePartitionPausingFactory(ContainerPartitionPausingBackOffManagerFactory factory) { - TaskScheduler scheduler = getOrCreateRetryTopicTaskScheduler(); - Assert.notNull(scheduler, "Either a RetryTopicSchedulerWrapper or TaskScheduler bean is required"); - factory.setBackOffHandler(new ContainerPausingBackOffHandler( + protected KafkaConsumerBackoffManager getOrCreateKafkaConsumerBackoffManager() { + if (this.kafkaConsumerBackoffManager != null) { + return this.kafkaConsumerBackoffManager; + } + + ContainerPartitionPausingBackOffManagerFactory containerBackOffManagerFactory = + new ContainerPartitionPausingBackOffManagerFactory(endpointRegistry, applicationContext); + containerBackOffManagerFactory.setBackOffHandler(new ContainerPausingBackOffHandler( new ListenerContainerPauseService(endpointRegistry, getOrCreateRetryTopicTaskScheduler()))); + this.kafkaConsumerBackoffManager = containerBackOffManagerFactory.create(); + return this.kafkaConsumerBackoffManager; + } protected TaskScheduler getOrCreateRetryTopicTaskScheduler() { - if (retryTopicTaskScheduler != null) { - return retryTopicTaskScheduler; - } ObjectProvider retryTopicSchedulerWrapperProvider = applicationContext.getBeanProvider(RetryTopicSchedulerWrapper.class); RetryTopicSchedulerWrapper schedulerWrapper = retryTopicSchedulerWrapperProvider.getIfAvailable(); + TaskScheduler retryTopicTaskScheduler; if (schedulerWrapper != null) { retryTopicTaskScheduler = schedulerWrapper.getScheduler(); } else { @@ -418,9 +421,12 @@ protected TaskScheduler getOrCreateRetryTopicTaskScheduler() { } if (retryTopicTaskScheduler == null) { - retryTopicThreadPoolTaskScheduler = new ThreadPoolTaskScheduler(); - retryTopicThreadPoolTaskScheduler.afterPropertiesSet(); - retryTopicTaskScheduler = retryTopicThreadPoolTaskScheduler; + ThreadPoolTaskScheduler threadPoolTaskScheduler = new ThreadPoolTaskScheduler(); + threadPoolTaskScheduler.setThreadNamePrefix("flowable-kafka-retry-scheduling-"); + threadPoolTaskScheduler.afterPropertiesSet(); + ((ConfigurableApplicationContext) applicationContext).addApplicationListener( + (ApplicationListener) event -> threadPoolTaskScheduler.destroy()); + retryTopicTaskScheduler = threadPoolTaskScheduler; } return retryTopicTaskScheduler; @@ -894,13 +900,6 @@ public void onApplicationEvent(ContextRefreshedEvent event) { } } - @Override - public void destroy() throws Exception { - if (retryTopicThreadPoolTaskScheduler != null) { - retryTopicThreadPoolTaskScheduler.destroy(); - } - } - public KafkaOperations getKafkaOperations() { return kafkaOperations; } @@ -941,12 +940,12 @@ public void setContainerFactory(KafkaListenerContainerFactory containerFactor this.containerFactory = containerFactory; } - public TaskScheduler getRetryTopicTaskScheduler() { - return retryTopicTaskScheduler; + public KafkaConsumerBackoffManager getKafkaConsumerBackoffManager() { + return kafkaConsumerBackoffManager; } - public void setRetryTopicTaskScheduler(TaskScheduler retryTopicTaskScheduler) { - this.retryTopicTaskScheduler = retryTopicTaskScheduler; + public void setKafkaConsumerBackoffManager(KafkaConsumerBackoffManager kafkaConsumerBackoffManager) { + this.kafkaConsumerBackoffManager = kafkaConsumerBackoffManager; } protected static class RetryTopicContainerFactoryDecorator implements KafkaListenerContainerFactory { diff --git a/modules/flowable-spring-boot/flowable-spring-boot-starters/flowable-spring-boot-autoconfigure/src/main/java/org/flowable/spring/boot/eventregistry/EventRegistryAutoConfiguration.java b/modules/flowable-spring-boot/flowable-spring-boot-starters/flowable-spring-boot-autoconfigure/src/main/java/org/flowable/spring/boot/eventregistry/EventRegistryAutoConfiguration.java index ba84415bfd4..1b42391c379 100644 --- a/modules/flowable-spring-boot/flowable-spring-boot-starters/flowable-spring-boot-autoconfigure/src/main/java/org/flowable/spring/boot/eventregistry/EventRegistryAutoConfiguration.java +++ b/modules/flowable-spring-boot/flowable-spring-boot-starters/flowable-spring-boot-autoconfigure/src/main/java/org/flowable/spring/boot/eventregistry/EventRegistryAutoConfiguration.java @@ -66,6 +66,7 @@ import org.springframework.kafka.config.KafkaListenerEndpointRegistry; import org.springframework.kafka.core.KafkaAdminOperations; import org.springframework.kafka.core.KafkaOperations; +import org.springframework.kafka.listener.KafkaConsumerBackoffManager; import org.springframework.scheduling.TaskScheduler; import org.springframework.transaction.PlatformTransactionManager; @@ -257,11 +258,13 @@ public static class EventRegistryKafkaConfiguration { @ConditionalOnMissingBean(name = "kafkaChannelDefinitionProcessor") public KafkaChannelDefinitionProcessor kafkaChannelDefinitionProcessor(KafkaListenerEndpointRegistry endpointRegistry, KafkaOperations kafkaOperations, ObjectMapper objectMapper, + ObjectProvider kafkaConsumerBackoffManager, ObjectProvider kafkaAdminOperations) { KafkaChannelDefinitionProcessor kafkaChannelDefinitionProcessor = new KafkaChannelDefinitionProcessor(objectMapper); kafkaChannelDefinitionProcessor.setEndpointRegistry(endpointRegistry); kafkaChannelDefinitionProcessor.setKafkaOperations(kafkaOperations); + kafkaChannelDefinitionProcessor.setKafkaConsumerBackoffManager(kafkaConsumerBackoffManager.getIfAvailable()); kafkaChannelDefinitionProcessor.setKafkaAdminOperations(kafkaAdminOperations.getIfAvailable()); return kafkaChannelDefinitionProcessor;