Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support kafka parallel-consumer #2381

bojanv55 opened this issue Aug 24, 2022 · 21 comments

Support kafka parallel-consumer #2381

bojanv55 opened this issue Aug 24, 2022 · 21 comments


Copy link

Expected Behavior

Enable parallel processing of the messages in single consumer.

Current Behavior

Single-threaded processing of the messages.


Are there any plans to include ParallelStreamProcessor as an option for spring-kafka? It should handle automatically multiple threads, acking and other stuff (synchronization on same key if needed etc.).

Currently I can configure this manually, but would be easier I guess If I would only need to implement void poll(Consumer<PollContext<K, V>> usersVoidConsumptionFunction);, and the rest is taken care by the spring.

Copy link

No plans currently; but contributions are welcome!

Copy link

That said; it's easy enough to use it. from Spring - just set up a @KafkaListener as normal; set the auto startup to "false" and then invoke the listener from the parallel consumer:

public class Kgh2381Application {

	private static final Logger log = LoggerFactory.getLogger(Kgh2381Application.class);

	public static void main(String[] args) {, args);

	@KafkaListener(id = "kgh2381", topics = "kgh2381", autoStartup = "false")
	void listen(String in) {;

	public NewTopic topic() {

	ApplicationRunner runner(KafkaListenerEndpointRegistry registry, ConsumerFactory<String, String> cf,
			KafkaTemplate<String, String> template) {

		return args -> {
			MessageListener messageListener = (MessageListener) registry.getListenerContainer("kgh2381")
			Consumer<String, String> consumer = cf.createConsumer("group", "");
			var options = ParallelConsumerOptions.<String, String>builder()
			ParallelStreamProcessor<String, String> processor = ParallelStreamProcessor
			processor.poll(context -> messageListener.onMessage(context.getSingleConsumerRecord(), null, consumer));
			IntStream.range(0, 10).forEach(i -> template.send("kgh2381", "foo" + i));


Copy link

dixitsingla commented Jun 29, 2023

I am trying to implement the parallel consumer in spring boot and using the above mentioned way. I have main class in separate java file and rest of the methods(KafkaListener, ApplicationRunner) in another java file.

Also, have created a ConsumerProp class with the below content:

public class ConsumerProp {

    public ConsumerFactory<String, String> consumerFactory() {

        Map<String, Object> configs = new HashMap<>();
        configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");
        configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        configs.put(ConsumerConfig.GROUP_ID_CONFIG, "group_1");

        return new DefaultKafkaConsumerFactory<>(configs);

But the spring boot just exists without even any error. Could you please help or put a sample working example which I can leverage for further use.

Copy link

I assume you mean exits (not exists).

You must have done something wrong; the example above is a complete working example using Spring Boot. This is the output when running it...

  .   ____          _            __ _ _
 /\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
 \\/  ___)| |_)| | | | | || (_| |  ) ) ) )
  '  |____| .__|_| |_|_| |_\__, | / / / /
�[32m :: Spring Boot :: �[39m              �[2m (v2.7.3)�[0;39m

2023-06-29 14:28:45,673 [main] Starting Kgh2381Application using Java 11.0.11 on with PID 83586 (/Users/grussell/Development/stsws43/kgh2381/target/classes started by grussell in /Users/grussell/Development/stsws43/kgh2381)
2023-06-29 14:28:45,675 [main] No active profile set, falling back to 1 default profile: "default"
2023-06-29 14:28:46,176 [main] AdminClientConfig values: 
2023-06-29 14:28:51,533 [pc-broker-poll] pl.tlinkowski.unij.service.api.collect.UnmodifiableMapFactory service: selected pl.tlinkowski.unij.service.collect.jdk8.Jdk8UnmodifiableMapFactory (priority=40)
2023-06-29 14:28:53,520 [pc-broker-poll] [Consumer clientId=consumer-group-1, groupId=group] Successfully joined group with generation Generation{generationId=4, memberId='consumer-group-1-8ebc709a-c397-4585-89d2-51fc00808899', protocol='range'}
2023-06-29 14:28:53,522 [pc-broker-poll] [Consumer clientId=consumer-group-1, groupId=group] Finished assignment for group at generation 4: {consumer-group-1-8ebc709a-c397-4585-89d2-51fc00808899=Assignment(partitions=[kgh2381-0])}
2023-06-29 14:28:53,527 [pc-broker-poll] [Consumer clientId=consumer-group-1, groupId=group] Successfully synced group in generation Generation{generationId=4, memberId='consumer-group-1-8ebc709a-c397-4585-89d2-51fc00808899', protocol='range'}
2023-06-29 14:28:53,527 [pc-broker-poll] [Consumer clientId=consumer-group-1, groupId=group] Notifying assignor about the new Assignment(partitions=[kgh2381-0])
2023-06-29 14:28:53,529 [pc-broker-poll] [Consumer clientId=consumer-group-1, groupId=group] Adding newly assigned partitions: kgh2381-0
2023-06-29 14:28:53,529 [pc-broker-poll] Assigned 1 total (1 new) partition(s) [kgh2381-0]
2023-06-29 14:28:53,540 [pc-broker-poll] [Consumer clientId=consumer-group-1, groupId=group] Setting offset for partition kgh2381-0 to the committed offset FetchPosition{offset=30, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[localhost:9092 (id: 0 rack: null)], epoch=0}}
2023-06-29 14:28:53,559 [pc-pool-1-thread-1] foo0
2023-06-29 14:28:53,560 [pc-pool-1-thread-1] pl.tlinkowski.unij.service.api.collect.UnmodifiableListFactory service: selected pl.tlinkowski.unij.service.collect.jdk8.Jdk8UnmodifiableListFactory (priority=40)
2023-06-29 14:28:53,562 [pc-pool-1-thread-2] foo1
2023-06-29 14:28:53,564 [pc-pool-1-thread-3] foo2
2023-06-29 14:28:53,566 [pc-pool-1-thread-4] foo3
2023-06-29 14:28:53,567 [pc-pool-1-thread-5] foo4
2023-06-29 14:28:53,569 [pc-pool-1-thread-6] foo5
2023-06-29 14:28:53,571 [pc-pool-1-thread-7] foo6
2023-06-29 14:28:53,573 [pc-pool-1-thread-8] foo7
2023-06-29 14:28:53,574 [pc-pool-1-thread-9] foo8
2023-06-29 14:28:53,576 [pc-pool-1-thread-10] foo9

Copy link

This is its

... rest of the methods(KafkaListener, ApplicationRunner) in another java file.

That file must be annotated with @Configuration so that the beans are registered.

You don't need your own consumer factory bean, just use Boot's.

Copy link

dixitsingla commented Jun 30, 2023

Thank for the correcting the type(exits).

I was able to run the application by specifying @Autowired annotation for the ApplicationRunner runner( method. Although I have tried @Configuration annotation as well and it's also working.

I am using consumer factory bean as I need to provide few properties dynamically(some cases from environment variables). Like Bootstrap server etc..

If that's okay with you I really would like you to share the sample project you just ran above.

Thanks again for all the help.

Copy link

If that's okay with you I really would like you to share the sample project you just ran above.

There's nothing more to share; that is the entire app (together with the properties above).

Copy link

As you stated earlier 'You don't need your own consumer factory bean, just use Boot's.'

I have just observed that the runner method is using the spring boot's consumer factory only, the one I have created is not being used. Could you please help me what should I do use the consumer factory I have created.

Copy link

Since you define your own consumer factory bean, Boot will detect it and not declare its own; yours will be injected into the runner instead.

Copy link

garyrussell commented Jul 3, 2023


public DefaultKafkaConsumerFactory<?, ?> kafkaConsumerFactory(

Your @Configuration class must be in the same (or child) package as the @SpringBootApplication.

Copy link

dixitsingla commented Jul 3, 2023

As of now I am trying with the spring boot's consumer factory.

My application is stuck at
2023-07-03 22:19:38.423 INFO 40836 --- [ pc-broker-poll] o.a.k.c.c.internals.SubscriptionState : [Consumer clientId=consumer-group-1, groupId=group] Resetting offset for partition topic_1-0 to position FetchPosition{offset=6, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[ (id: 1 rack: null)], epoch=0}}.
I don't see any consumed messages. there are 6 messages in the topic. Seems like the listen method is not called at all.

@KafkaListener(id = "group_1", topics = "topic_1", autoStartup = "false")
    void listen(String record) {;

I am not very good at spring boot yet, Sorry for asking the small doubts.

Copy link

autoStartup = "false"

You have to start the container to consume.

Copy link

dixitsingla commented Jul 3, 2023

Sorry, did not get what do you mean by container.

I am running the application in Intellij idea and the Kafka is up and running.

Copy link

Do you mean by setting the autoStartup='true' - But in that case you have said earlier that auto setup should be set to false to invoke the parallel consumer.

Copy link

Could you please help me in understanding why listen method is not called, what needs to be done to make it working.

Ideally it should call the listen method as soon as there is a message to consume as defined below.
processor.poll(context -> messageListener.onMessage(context.getSingleConsumerRecord(), null, consumer));

Copy link

dixitsingla commented Jul 3, 2023

You have to start the container to consume.

After adding the below line I was able to consume the messages. but each message is being consumed 2 times,, Could you please help me with that.

processor.poll(context -> messageListener.onMessage(context.getSingleConsumerRecord(), null, consumer));

Could you please help me with what/where is the right way to start the container.


2023-07-03 23:47:42.584  INFO 1724 --- [  group_1-0-C-1] org.apache.kafka.clients.Metadata        : [Consumer clientId=consumer-group_1-2, groupId=group_1] Cluster ID: bfy4mk8aSlW6HhYc1lbXeQ
2023-07-03 23:47:42.584  INFO 1724 --- [ pc-broker-poll] org.apache.kafka.clients.Metadata        : [Consumer clientId=consumer-group-1, groupId=group] Cluster ID: bfy4mk8aSlW6HhYc1lbXeQ
2023-07-03 23:47:42.586  INFO 1724 --- [ pc-broker-poll] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-group-1, groupId=group] Discovered group coordinator (id: 2147483646 rack: null)
2023-07-03 23:47:42.586  INFO 1724 --- [  group_1-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-group_1-2, groupId=group_1] Discovered group coordinator (id: 2147483646 rack: null)
2023-07-03 23:47:42.590  INFO 1724 --- [ pc-broker-poll] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-group-1, groupId=group] (Re-)joining group
2023-07-03 23:47:42.590  INFO 1724 --- [  group_1-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-group_1-2, groupId=group_1] (Re-)joining group
2023-07-03 23:47:42.606  INFO 1724 --- [  group_1-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-group_1-2, groupId=group_1] Request joining group due to: need to re-join with the given member-id
2023-07-03 23:47:42.606  INFO 1724 --- [ pc-broker-poll] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-group-1, groupId=group] Request joining group due to: need to re-join with the given member-id
2023-07-03 23:47:42.606  INFO 1724 --- [ pc-broker-poll] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-group-1, groupId=group] (Re-)joining group
2023-07-03 23:47:42.606  INFO 1724 --- [  group_1-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-group_1-2, groupId=group_1] (Re-)joining group
2023-07-03 23:47:45.612  INFO 1724 --- [  group_1-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-group_1-2, groupId=group_1] Successfully joined group with generation Generation{generationId=11, memberId='consumer-group_1-2-b9e9bb1c-35c6-4367-8e69-cef757677d49', protocol='range'}
2023-07-03 23:47:45.617  INFO 1724 --- [  group_1-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-group_1-2, groupId=group_1] Finished assignment for group at generation 11: {consumer-group_1-2-b9e9bb1c-35c6-4367-8e69-cef757677d49=Assignment(partitions=[topic_1-0])}
2023-07-03 23:47:45.626  INFO 1724 --- [  group_1-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-group_1-2, groupId=group_1] Successfully synced group in generation Generation{generationId=11, memberId='consumer-group_1-2-b9e9bb1c-35c6-4367-8e69-cef757677d49', protocol='range'}
2023-07-03 23:47:45.627  INFO 1724 --- [  group_1-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-group_1-2, groupId=group_1] Notifying assignor about the new Assignment(partitions=[topic_1-0])
2023-07-03 23:47:45.631  INFO 1724 --- [  group_1-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-group_1-2, groupId=group_1] Adding newly assigned partitions: topic_1-0
2023-07-03 23:47:45.646  INFO 1724 --- [  group_1-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-group_1-2, groupId=group_1] Setting offset for partition topic_1-0 to the committed offset FetchPosition{offset=27, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[ (id: 1 rack: null)], epoch=0}}
2023-07-03 23:47:45.646  INFO 1724 --- [  group_1-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : group_1: partitions assigned: [topic_1-0]
2023-07-03 23:47:54.713  INFO 1724 --- [  group_1-0-C-1] icis.kafka.consumer.ParallelConumer      : kj
2023-07-03 23:48:02.901  INFO 1724 --- [  group_1-0-C-1] icis.kafka.consumer.ParallelConumer      : hello
2023-07-03 23:48:06.078  INFO 1724 --- [  group_1-0-C-1] icis.kafka.consumer.ParallelConumer      : dixit
2023-07-03 23:48:09.379  INFO 1724 --- [  group_1-0-C-1] icis.kafka.consumer.ParallelConumer      : singla
2023-07-03 23:48:12.273  INFO 1724 --- [  group_1-0-C-1] icis.kafka.consumer.ParallelConumer      : bye
2023-07-03 23:48:14.904  INFO 1724 --- [  group_1-0-C-1] icis.kafka.consumer.ParallelConumer      : bye
2023-07-03 23:48:16.157  INFO 1724 --- [ pc-broker-poll] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-group-1, groupId=group] Successfully joined group with generation Generation{generationId=9, memberId='consumer-group-1-c26fe7a4-a7a0-4c35-95b8-f14fa04ab499', protocol='range'}
2023-07-03 23:48:16.157  INFO 1724 --- [ pc-broker-poll] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-group-1, groupId=group] Finished assignment for group at generation 9: {consumer-group-1-c26fe7a4-a7a0-4c35-95b8-f14fa04ab499=Assignment(partitions=[topic_1-0])}
2023-07-03 23:48:16.162  INFO 1724 --- [ pc-broker-poll] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-group-1, groupId=group] Successfully synced group in generation Generation{generationId=9, memberId='consumer-group-1-c26fe7a4-a7a0-4c35-95b8-f14fa04ab499', protocol='range'}
2023-07-03 23:48:16.163  INFO 1724 --- [ pc-broker-poll] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-group-1, groupId=group] Notifying assignor about the new Assignment(partitions=[topic_1-0])
2023-07-03 23:48:16.165  INFO 1724 --- [ pc-broker-poll] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-group-1, groupId=group] Adding newly assigned partitions: topic_1-0
2023-07-03 23:48:16.165  INFO 1724 --- [ pc-broker-poll] c.p.i.AbstractParallelEoSStreamProcessor : Assigned 1 total (1 new) partition(s) [topic_1-0]
2023-07-03 23:48:16.188  INFO 1724 --- [ pc-broker-poll] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-group-1, groupId=group] Setting offset for partition topic_1-0 to the committed offset FetchPosition{offset=27, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[ (id: 1 rack: null)], epoch=0}}
2023-07-03 23:48:16.215  INFO 1724 --- [pool-1-thread-1] icis.kafka.consumer.ParallelConumer      : kj
2023-07-03 23:48:16.226  INFO 1724 --- [pool-1-thread-1] pl.tlinkowski.unij.api.UniJLoader        : pl.tlinkowski.unij.service.api.collect.UnmodifiableListFactory service: selected pl.tlinkowski.unij.service.collect.jdk8.Jdk8UnmodifiableListFactory (priority=40)
2023-07-03 23:48:16.228  INFO 1724 --- [pool-1-thread-2] icis.kafka.consumer.ParallelConumer      : hello
2023-07-03 23:48:16.233  INFO 1724 --- [pool-1-thread-3] icis.kafka.consumer.ParallelConumer      : dixit
2023-07-03 23:48:16.236  INFO 1724 --- [pool-1-thread-4] icis.kafka.consumer.ParallelConumer      : singla
2023-07-03 23:48:16.238  INFO 1724 --- [pool-1-thread-5] icis.kafka.consumer.ParallelConumer      : bye
2023-07-03 23:48:16.239  INFO 1724 --- [ pc-broker-poll] pl.tlinkowski.unij.api.UniJLoader        : pl.tlinkowski.unij.service.api.collect.UnmodifiableMapFactory service: selected pl.tlinkowski.unij.service.collect.jdk8.Jdk8UnmodifiableMapFactory (priority=40)
2023-07-03 23:48:16.239  INFO 1724 --- [pool-1-thread-6] icis.kafka.consumer.ParallelConumer      : bye
2023-07-03 23:48:31.663  INFO 1724 --- [  group_1-0-C-1] icis.kafka.consumer.ParallelConumer      : uy
2023-07-03 23:48:31.664  INFO 1724 --- [pool-1-thread-7] icis.kafka.consumer.ParallelConumer      : uy
2023-07-03 23:48:52.102  INFO 1724 --- [  group_1-0-C-1] icis.kafka.consumer.ParallelConumer      : 1
2023-07-03 23:48:52.103  INFO 1724 --- [pool-1-thread-8] icis.kafka.consumer.ParallelConumer      : 1
2023-07-03 23:48:53.929  INFO 1724 --- [  group_1-0-C-1] icis.kafka.consumer.ParallelConumer      : 2
2023-07-03 23:48:53.931  INFO 1724 --- [pool-1-thread-9] icis.kafka.consumer.ParallelConumer      : 2
2023-07-03 23:48:55.887  INFO 1724 --- [  group_1-0-C-1] icis.kafka.consumer.ParallelConumer      : 3
2023-07-03 23:48:55.888  INFO 1724 --- [ool-1-thread-10] icis.kafka.consumer.ParallelConumer      : 3
2023-07-03 23:48:57.709  INFO 1724 --- [  group_1-0-C-1] icis.kafka.consumer.ParallelConumer      : 4
2023-07-03 23:48:57.710  INFO 1724 --- [ool-1-thread-11] icis.kafka.consumer.ParallelConumer      : 4

If you don't mind could you please send your running project example to my email id '[email protected]' for reference. Or can upload on the github so that it could help others.

Copy link

Sorry, I forgot this was about the parallel consumer.

You should not start the container.

It looks like you are seeing data.

As I said, there is nothing else to share, just copy/paste the code into a class named Kgh2381Application and the properties into (in src/main/resources).

Copy link


Previous .tgz was missing the .mvn directory.

Copy link

dixitsingla commented Jul 4, 2023

Thanks a lot for sharing the project.

public DefaultKafkaConsumerFactory kafkaConsumerFactory(

And on the custom consumer factory (don't want to use the spring boot's config) what changes do I need to do in the ConsumerProp class.

This is the current ConsumerProp class. I just wanted to use the below DefaultKafkaConsumerFactory instead of the default spring boot's consumer factory.

public class ConsumerProp {

    public ConsumerFactory<String, String> consumerFactory() {

        Map<String, Object> configs = new HashMap<>();
        configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");
        configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        configs.put(ConsumerConfig.GROUP_ID_CONFIG, "group_1");

        return new DefaultKafkaConsumerFactory<>(configs);

Copy link

don't want to use the spring boot's config


Just add the bean and it will replace Boot's. If it's in a separate file (ConsumerProp); it must be in the same package hierarchy as the @SpringBootApplication.

Copy link

Hi, @garyrussell
I saw your comment No plans currently; but contributions are welcome!, this make me motivated!
For this feature, i create PR with skeleton code.
When you have free time, could you take a look please?
(PR : #3161)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
None yet

No branches or pull requests

4 participants