Producer.send_and_wait() hangs up forever if sender_task is cancelled #1039

Open
homm opened this issue Aug 3, 2024 · 0 comments

aiokafka relies on the ioloop.create_task() method, which runs a coroutine in the background and makes it cancelable. The problem is that the task list in asyncio is public, so external code can cancel sender_task. After that, the Producer is left in a state where any message is successfully added to message_accumulator but is never actually sent by the Sender.
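
A minimal sketch of the mechanism, using plain asyncio and no aiokafka. The names `background_worker` and `main` are made up for illustration; `background_worker` only stands in for a long-lived coroutine such as a sender loop. It shows that any code running on the same loop can find a task through `asyncio.all_tasks()` and cancel it.

```python
import asyncio


async def background_worker():
    # Stand-in for a long-lived background coroutine (hypothetical).
    while True:
        await asyncio.sleep(3600)


async def main():
    # create_task() schedules the coroutine and returns a cancelable Task.
    worker = asyncio.get_running_loop().create_task(background_worker())
    await asyncio.sleep(0)  # give the worker a chance to start

    # External code does not need a reference to the task:
    # all_tasks() lists every unfinished task on the running loop.
    current = asyncio.current_task()
    for task in asyncio.all_tasks():
        if task is not current:
            task.cancel()

    # Wait until the cancellation has been processed.
    await asyncio.wait([worker])
    return worker.cancelled()


if __name__ == "__main__":
    print(asyncio.run(main()))
```

The owner of `worker` gets no notification here unless it awaits the task or attaches a done callback to it.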

I originally ran into this problem when using AsyncTestCase from Tornado. After each test, Tornado cancels all running background tasks. Whether this behavior is legitimate is debatable, but the fact is that Tornado uses only public APIs, and no errors or exceptions are logged on the aiokafka side. It just hangs forever on the second and all subsequent tests.
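
As a caller-side workaround (not a fix for the underlying problem), the awaited send can be wrapped in `asyncio.wait_for()` so that a hang surfaces as a timeout, for example `await asyncio.wait_for(producer.send_and_wait('topic', b'body'), timeout=10)`. The sketch below demonstrates the idea without a broker; `send_with_timeout`, `never_delivered` and `demo` are hypothetical names, and `never_delivered` merely imitates a send whose delivery future is never resolved.

```python
import asyncio


async def send_with_timeout(send_coro, timeout):
    # wait_for() cancels the inner awaitable and raises TimeoutError
    # once the timeout expires.
    return await asyncio.wait_for(send_coro, timeout)


async def never_delivered():
    # Stand-in for a send that never completes (hypothetical):
    # this future is never resolved by anyone.
    await asyncio.get_running_loop().create_future()


async def demo():
    try:
        await send_with_timeout(never_delivered(), timeout=0.05)
    except asyncio.TimeoutError:
        return "timed out"
    return "delivered"


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

A timeout only tells the caller that delivery did not finish in time; it cannot tell a cancelled sender apart from a slow broker.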

I haven't dug into other circumstances in which background asyncio tasks could be canceled by third-party libraries, but if this happens there should be some sort of exception or error logging.

Expected behaviour
I expect either the Sender to recover after being stopped, or the Producer to raise an error saying that sender_task was canceled and the message will never be delivered.
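
One possible shape for the second option, sketched with plain asyncio. This is not aiokafka's code: `Worker`, `BackgroundTaskDied`, `submit` and `demo` are hypothetical names. The idea is to attach a done callback to the background task so that pending waiters fail with an error instead of hanging.

```python
import asyncio


class BackgroundTaskDied(RuntimeError):
    """Hypothetical error raised to waiters when the background task is gone."""


class Worker:
    """Hypothetical component that owns a background task."""

    def __init__(self):
        self._pending = set()
        self._task = None

    def start(self):
        self._task = asyncio.get_running_loop().create_task(self._run())
        # The callback fires on normal exit, on exception and on cancellation.
        self._task.add_done_callback(self._on_task_done)

    async def _run(self):
        while True:
            await asyncio.sleep(3600)

    def _on_task_done(self, task):
        if task.cancelled():
            reason = "background task was cancelled"
        elif task.exception() is not None:
            reason = f"background task failed: {task.exception()!r}"
        else:
            reason = "background task exited"
        # Fail every waiter instead of leaving it pending forever.
        for fut in self._pending:
            if not fut.done():
                fut.set_exception(BackgroundTaskDied(reason))
        self._pending.clear()

    async def submit(self):
        if self._task is None or self._task.done():
            raise BackgroundTaskDied("background task is not running")
        fut = asyncio.get_running_loop().create_future()
        self._pending.add(fut)
        return await fut


async def demo():
    worker = Worker()
    worker.start()
    waiter = asyncio.ensure_future(worker.submit())
    await asyncio.sleep(0)  # let submit() register its future
    worker._task.cancel()   # simulate cancellation by external code
    try:
        await waiter
    except BackgroundTaskDied as exc:
        return str(exc)


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

The same callback could restart the task instead of failing the waiters, which would correspond to the first option.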

Environment (please complete the following information):

  • aiokafka version: 0.11.0
  • Kafka Broker version: 3.7.0

Reproducible example

import asyncio

import aiokafka


def cancel_tasks(loop):
    # Cancel every unfinished task on the loop, as a test harness might do.
    tasks = [t for t in asyncio.all_tasks(loop) if not t.done()]
    for t in tasks:
        t.cancel()
    if tasks:
        # Let the loop process the cancellations.
        done, pending = loop.run_until_complete(asyncio.wait(tasks))
        assert not pending
    return tasks


async def prepare():
    global producer
    producer = aiokafka.AIOKafkaProducer(bootstrap_servers='kafka')
    await producer.start()


async def send():
    print('>>> before send')
    # Hangs here forever once sender_task has been cancelled.
    resp = await producer.send_and_wait('topic', b'body')
    print('>>>', resp)
    await producer.stop()


loop = asyncio.get_event_loop()
loop.run_until_complete(prepare())
print('>>>', cancel_tasks(loop))  # the cancelled tasks include sender_task
loop.run_until_complete(send())