Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[QUESTION] Restarting AIOKafkaConsumer after AIOKafkaConsumer.stop() #1010

Open
dom-gunstone opened this issue May 22, 2024 · 4 comments
Open
Labels

Comments

@dom-gunstone
Copy link

dom-gunstone commented May 22, 2024

Hi,

I have a use case where I'm injecting a AIOKafkaConsumer instance into a class. The class has its own start and stop methods that, amongst other things, call the start and stop methods of the consumer class. The issue is that I'm failing the assert self._fetcher is None check in AIOKafkaConsumer.start() method.

Am I right in thinking this should work?

import asyncio

from aiokafka import AIOKafkaConsumer


async def main() -> None:
    consumer = AIOKafkaConsumer("test_topic", bootstrap_servers="localhost:9092")

    await consumer.start()
    await consumer.stop()
    await consumer.start()


if __name__ == "__main__":
    asyncio.run(main())

I suppose I could pass the class a function that creates the consumer instance, but it feels a bit less neat.

import asyncio

from aiokafka import AIOKafkaConsumer


async def main() -> None:
    def get_new_consumer() -> AIOKafkaConsumer:
        return AIOKafkaConsumer("test_topic", bootstrap_servers="localhost:9092")

    consumer = get_new_consumer()

    await consumer.start()
    await consumer.stop()

    consumer = get_new_consumer()

    await consumer.start()


if __name__ == "__main__":
    asyncio.run(main())
@ods
Copy link
Collaborator

ods commented May 22, 2024

No, consumer instance is not reusable, you have to recreate it.

@dom-gunstone
Copy link
Author

@ods Thanks for confirming.

Is this something that could be implemented, or is it not possible?

@ods
Copy link
Collaborator

ods commented May 23, 2024

It's possible to fix this, but I wouldn't expect somebody to invest their time in it.

@marcosschroh
Copy link

marcosschroh commented Aug 29, 2024

if is possible to fix it then it means that it should work as expected, unless that there is a clear decision of not to do it.

Can you explain the proper way of stopping a Consumer? I am asking because we have stop() and unsubscribe(). If the idea is to NOT reuse Consumers, then stop() should also call unsubscribe(), otherwise it does not make any sense.

If someone have the time to send a PR, would it be consider ?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

No branches or pull requests

3 participants