[QUESTION] Getting commit errors after group rebalances #1054

Open
dhillonr opened this issue Oct 10, 2024 · 1 comment
dhillonr commented Oct 10, 2024

I have recently started using asyncio and aiokafka.
My requirement is to run n different Kafka consumers, each consuming from a different topic.
On consumption, each consumer pushes the data to a database and commits the offset once it's done.

I am using manual commits to achieve at-least-once delivery, but I get an error saying the commit is for a wrong offset because the group has already rebalanced.
Logs from the consumer are below:

Heartbeat failed for group .....-redis-cache-dev-dc-03 because it is rebalancing

Revoking previously assigned partitions frozenset...

(Re-)joining group .......-redis-cache-dev-dc-03

Joined group '.......-redis-cache-dev-dc-03' (generation 2123) with member_id aiokafka-0.8.0-a77413e1-a0ea-4f6c-80ea-44f8f665fd54

Elected group leader -- performing partition assignments using roundrobin

OffsetCommit failed for group .......-redis-cache-dev-dc-03 due to group error ([Error 22] IllegalGenerationError: .......-redis-cache-dev-dc-03), will rejoin

OffsetCommit failed for group .....-redis-cache-dev-dc-03 due to group error ([Error 22] IllegalGenerationError: .......-redis-cache-dev-dc-03), will rejoin

Caught exception: CommitFailedError: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max_poll_interval_ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the rebalance timeout with max_poll_interval_ms, or by reducing the maximum size of batches returned in poll() with max_poll_records.

Shutting down...
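
For reference, the exception points at two consumer settings. A minimal sketch of how they could be passed when creating an aiokafka consumer (the topic, servers, group id, and numeric values below are placeholders, not taken from this setup):

import asyncio
from aiokafka import AIOKafkaConsumer

async def main():
    # Placeholder configuration for illustration only.
    consumer = AIOKafkaConsumer(
        "my-topic",
        bootstrap_servers="localhost:9092",
        group_id="my-group",
        enable_auto_commit=False,      # manual commits, as in this setup
        max_poll_interval_ms=600_000,  # allow longer gaps between getmany() calls
        max_poll_records=100,          # cap the batch size returned per poll
    )
    await consumer.start()
    try:
        ...  # poll and process here
    finally:
        await consumer.stop()

asyncio.run(main())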
dhillonr (Author) commented
The code looks like this:

import asyncio

async def save_msg(redis_conn, cache_queue, message):
    await redis_conn.rpush(cache_queue, message.value)

async def handle_partn_batch(consumer, topic_partition, redis_conn,
                             partn_batch, cache_queue, logger, log_flag):
    # Push every message in the partition batch to Redis concurrently.
    tasks = []
    for message in partn_batch:
        # do processing of message
        task = asyncio.create_task(save_msg(redis_conn, cache_queue, message))
        tasks.append(task)

    await asyncio.gather(*tasks)

    # Commit the next offset to consume for this partition, passed as
    # {TopicPartition: offset} (aiokafka accepts a plain int offset here).
    await consumer.commit({topic_partition: partn_batch[-1].offset + 1})


async def consume(topic_name, consumer_name, connector, redis_conn, cache_queue,
                  batch_size, threshold, logger, log_flag):
    consumer = await connector.create_async_kafka_consumer(topic_name, consumer_name)
    while True:
        # Check space available in the cache before polling.
        current_queue_length = await redis_conn.llen(cache_queue)

        if current_queue_length + batch_size <= threshold:
            message_batch = await consumer.getmany(timeout_ms=1 * 1000)
            if logger and log_flag:
                logger.info(f"Consumed {sum(len(sublist) for sublist in message_batch.values())} for cache {cache_queue}")

            # Handle each partition's batch (and its commit) concurrently.
            batch_tasks = []
            for topic_partition, partition_batch in message_batch.items():
                if partition_batch:
                    task = asyncio.create_task(handle_partn_batch(consumer, topic_partition, redis_conn,
                                                                  partition_batch, cache_queue, logger,
                                                                  log_flag))
                    batch_tasks.append(task)
            await asyncio.gather(*batch_tasks)

        else:
            if logger and log_flag:
                logger.info(f"[[Redis Cache {cache_queue} is Full, Sleeping....]]")
            await asyncio.sleep(0.01)
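
A pattern sometimes used with manual commits is to commit from a rebalance listener before partitions are revoked, and to treat CommitFailedError as a signal that the group has moved on rather than a fatal error. A minimal sketch, assuming a subscribe-with-listener flow (the class name and topic are made up for illustration):

from aiokafka.abc import ConsumerRebalanceListener
from aiokafka.errors import CommitFailedError

class CommitOnRevoke(ConsumerRebalanceListener):
    # Hypothetical listener: flush offsets before this member loses its partitions.
    def __init__(self, consumer):
        self.consumer = consumer

    async def on_partitions_revoked(self, revoked):
        try:
            await self.consumer.commit()  # commit whatever has been consumed so far
        except CommitFailedError:
            pass  # group already rebalanced; the new owner will re-read the batch

    async def on_partitions_assigned(self, assigned):
        pass

# Usage (hypothetical): subscribe instead of passing the topic at construction.
# consumer.subscribe(["my-topic"], listener=CommitOnRevoke(consumer))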
