Skip to content

Commit

Permalink
Use a singleton KafkaConsumerBackoffManager in the KafkaChannelDefini…
Browse files Browse the repository at this point in the history
…tionProcessor and reuse one from the application context
  • Loading branch information
filiphr committed Jun 27, 2023
1 parent 45f9678 commit 835d1e5
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.ApplicationListener;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.event.ContextClosedEvent;
import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.context.expression.StandardBeanExpressionResolver;
import org.springframework.kafka.annotation.KafkaListenerAnnotationBeanPostProcessor;
Expand Down Expand Up @@ -116,7 +118,7 @@
*
* @author Filip Hrisafov
*/
public class KafkaChannelDefinitionProcessor implements BeanFactoryAware, ApplicationContextAware, ApplicationListener<ContextRefreshedEvent>, DisposableBean, ChannelModelProcessor {
public class KafkaChannelDefinitionProcessor implements BeanFactoryAware, ApplicationContextAware, ApplicationListener<ContextRefreshedEvent>, ChannelModelProcessor {

public static final String CHANNEL_ID_PREFIX = "org.flowable.eventregistry.kafka.ChannelKafkaListenerEndpointContainer#";

Expand All @@ -132,8 +134,7 @@ public class KafkaChannelDefinitionProcessor implements BeanFactoryAware, Applic
protected String containerFactoryBeanName = KafkaListenerAnnotationBeanPostProcessor.DEFAULT_KAFKA_LISTENER_CONTAINER_FACTORY_BEAN_NAME;

protected KafkaListenerContainerFactory<?> containerFactory;
protected TaskScheduler retryTopicTaskScheduler;
protected ThreadPoolTaskScheduler retryTopicThreadPoolTaskScheduler;
protected KafkaConsumerBackoffManager kafkaConsumerBackoffManager;

protected BeanFactory beanFactory;
protected ApplicationContext applicationContext;
Expand Down Expand Up @@ -366,10 +367,7 @@ protected ListenerContainerFactoryConfigurer createListenerContainerFactoryConfi
DefaultDestinationTopicResolver topicResolver) {
DeadLetterPublishingRecovererFactory recovererFactory = new DeadLetterPublishingRecovererFactory(topicResolver);

ContainerPartitionPausingBackOffManagerFactory managerFactory = new ContainerPartitionPausingBackOffManagerFactory(endpointRegistry, applicationContext);
configurePartitionPausingFactory(managerFactory);

KafkaConsumerBackoffManager manager = managerFactory.create();
KafkaConsumerBackoffManager manager = getOrCreateKafkaConsumerBackoffManager();
ListenerContainerFactoryConfigurer factoryConfigurer = new ListenerContainerFactoryConfigurer(manager, recovererFactory, Clock.systemUTC());
if (retryConfiguration.hasNoRetryTopic()) {
// If we do not have a retry topic, then the retries have to be blocking
Expand Down Expand Up @@ -398,29 +396,37 @@ protected ListenerContainerFactoryConfigurer createListenerContainerFactoryConfi
return factoryConfigurer;
}

protected void configurePartitionPausingFactory(ContainerPartitionPausingBackOffManagerFactory factory) {
TaskScheduler scheduler = getOrCreateRetryTopicTaskScheduler();
Assert.notNull(scheduler, "Either a RetryTopicSchedulerWrapper or TaskScheduler bean is required");
factory.setBackOffHandler(new ContainerPausingBackOffHandler(
protected KafkaConsumerBackoffManager getOrCreateKafkaConsumerBackoffManager() {
if (this.kafkaConsumerBackoffManager != null) {
return this.kafkaConsumerBackoffManager;
}

ContainerPartitionPausingBackOffManagerFactory containerBackOffManagerFactory =
new ContainerPartitionPausingBackOffManagerFactory(endpointRegistry, applicationContext);
containerBackOffManagerFactory.setBackOffHandler(new ContainerPausingBackOffHandler(
new ListenerContainerPauseService(endpointRegistry, getOrCreateRetryTopicTaskScheduler())));
this.kafkaConsumerBackoffManager = containerBackOffManagerFactory.create();
return this.kafkaConsumerBackoffManager;

}

protected TaskScheduler getOrCreateRetryTopicTaskScheduler() {
if (retryTopicTaskScheduler != null) {
return retryTopicTaskScheduler;
}
ObjectProvider<RetryTopicSchedulerWrapper> retryTopicSchedulerWrapperProvider = applicationContext.getBeanProvider(RetryTopicSchedulerWrapper.class);
RetryTopicSchedulerWrapper schedulerWrapper = retryTopicSchedulerWrapperProvider.getIfAvailable();
TaskScheduler retryTopicTaskScheduler;
if (schedulerWrapper != null) {
retryTopicTaskScheduler = schedulerWrapper.getScheduler();
} else {
retryTopicTaskScheduler = applicationContext.getBeanProvider(TaskScheduler.class).getIfAvailable();
}

if (retryTopicTaskScheduler == null) {
retryTopicThreadPoolTaskScheduler = new ThreadPoolTaskScheduler();
retryTopicThreadPoolTaskScheduler.afterPropertiesSet();
retryTopicTaskScheduler = retryTopicThreadPoolTaskScheduler;
ThreadPoolTaskScheduler threadPoolTaskScheduler = new ThreadPoolTaskScheduler();
threadPoolTaskScheduler.setThreadNamePrefix("flowable-kafka-retry-scheduling-");
threadPoolTaskScheduler.afterPropertiesSet();
((ConfigurableApplicationContext) applicationContext).addApplicationListener(
(ApplicationListener<ContextClosedEvent>) event -> threadPoolTaskScheduler.destroy());
retryTopicTaskScheduler = threadPoolTaskScheduler;
}

return retryTopicTaskScheduler;
Expand Down Expand Up @@ -894,13 +900,6 @@ public void onApplicationEvent(ContextRefreshedEvent event) {
}
}

@Override
public void destroy() throws Exception {
if (retryTopicThreadPoolTaskScheduler != null) {
retryTopicThreadPoolTaskScheduler.destroy();
}
}

public KafkaOperations<Object, Object> getKafkaOperations() {
return kafkaOperations;
}
Expand Down Expand Up @@ -941,12 +940,12 @@ public void setContainerFactory(KafkaListenerContainerFactory<?> containerFactor
this.containerFactory = containerFactory;
}

public TaskScheduler getRetryTopicTaskScheduler() {
return retryTopicTaskScheduler;
public KafkaConsumerBackoffManager getKafkaConsumerBackoffManager() {
return kafkaConsumerBackoffManager;
}

public void setRetryTopicTaskScheduler(TaskScheduler retryTopicTaskScheduler) {
this.retryTopicTaskScheduler = retryTopicTaskScheduler;
public void setKafkaConsumerBackoffManager(KafkaConsumerBackoffManager kafkaConsumerBackoffManager) {
this.kafkaConsumerBackoffManager = kafkaConsumerBackoffManager;
}

protected static class RetryTopicContainerFactoryDecorator implements KafkaListenerContainerFactory<MessageListenerContainer> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.kafka.core.KafkaAdminOperations;
import org.springframework.kafka.core.KafkaOperations;
import org.springframework.kafka.listener.KafkaConsumerBackoffManager;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.transaction.PlatformTransactionManager;

Expand Down Expand Up @@ -257,11 +258,13 @@ public static class EventRegistryKafkaConfiguration {
@ConditionalOnMissingBean(name = "kafkaChannelDefinitionProcessor")
public KafkaChannelDefinitionProcessor kafkaChannelDefinitionProcessor(KafkaListenerEndpointRegistry endpointRegistry,
KafkaOperations<Object, Object> kafkaOperations, ObjectMapper objectMapper,
ObjectProvider<KafkaConsumerBackoffManager> kafkaConsumerBackoffManager,
ObjectProvider<KafkaAdminOperations> kafkaAdminOperations) {

KafkaChannelDefinitionProcessor kafkaChannelDefinitionProcessor = new KafkaChannelDefinitionProcessor(objectMapper);
kafkaChannelDefinitionProcessor.setEndpointRegistry(endpointRegistry);
kafkaChannelDefinitionProcessor.setKafkaOperations(kafkaOperations);
kafkaChannelDefinitionProcessor.setKafkaConsumerBackoffManager(kafkaConsumerBackoffManager.getIfAvailable());
kafkaChannelDefinitionProcessor.setKafkaAdminOperations(kafkaAdminOperations.getIfAvailable());

return kafkaChannelDefinitionProcessor;
Expand Down

0 comments on commit 835d1e5

Please sign in to comment.