Losing Messages Between Producer and Consumer #1031

Open

hakan458 opened this issue Jul 16, 2024 · 6 comments

hakan458 commented Jul 16, 2024

Describe the bug
Somehow messages are being lost between the Producer and Consumer.

I have a FastAPI app with a submit_batch endpoint that uses an aiokafka producer to send messages to a given topic. It looks like this:

import json

from aiokafka import AIOKafkaProducer
from aiokafka.errors import UnknownTopicOrPartitionError
from fastapi import HTTPException

# BatchData, BatchSubmitted, Response, settings and get_input_topic_name
# are app-specific and defined elsewhere.

async def submit_batch(batch: BatchData):
    topic = get_input_topic_name(batch.job_id)
    producer = AIOKafkaProducer(
        bootstrap_servers=settings.kafka_bootstrap_servers,
        value_serializer=lambda v: json.dumps(v).encode("utf-8"),
    )
    await producer.start()

    try:
        for record in batch.data:
            await producer.send_and_wait(topic, value=record)
    except UnknownTopicOrPartitionError:
        raise HTTPException(
            status_code=500, detail=f"{topic=} for job {batch.job_id} not found"
        )
    finally:
        # also runs on the HTTPException path above, so stop() is not repeated there
        await producer.stop()

    return Response[BatchSubmitted](data=BatchSubmitted(job_id=batch.job_id))

On the other end is an aiokafka consumer that reads messages until a timeout is reached.

while True:
    try:
        data_batch = await self.consumer.getmany(
            timeout_ms=self.timeout_ms, max_records=batch_size
        )
        for topic_partition, messages in data_batch.items():
            batch_data = [msg.value for msg in messages]

        logger.info(
            f"Received a batch of {len(batch_data)} records from Kafka topic {self.kafka_input_topic}"
        )
        if not data_batch:
            print_text("No more data in the environment. Exiting.")
            break
    except Exception as e:
        # TODO: environment should raise a specific exception + log error
        print_error(f"Error getting data batch from environment: {e}")
        break

When I send 10000 tasks through the submit_batch endpoint, on the consumer side I usually only end up with somewhere in the 7000-8000 range. Occasionally, maybe 1 in 10 runs, I get all 10000. I have confirmed that all 10000 are being sent to the endpoint, and keeping a counter there yields 10000. Maybe they are not being sent successfully? But I figured send_and_wait would raise an exception if not. Is there a more deterministic way to check?
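
For reference, one more deterministic check is sketched below (not code from this issue): send_and_wait returns a RecordMetadata once the broker has acknowledged the record, so the endpoint can count acknowledgements and log the last partition offset. The acks="all" and enable_idempotence=True settings are assumptions for stronger delivery guarantees, not settings from the original code; topic, batch, settings and logger are reused from the snippet above.

# Sketch only: count broker-acknowledged records inside submit_batch.
producer = AIOKafkaProducer(
    bootstrap_servers=settings.kafka_bootstrap_servers,
    value_serializer=lambda v: json.dumps(v).encode("utf-8"),
    acks="all",               # wait for all in-sync replicas (assumption)
    enable_idempotence=True,  # retries cannot duplicate records (assumption)
)
await producer.start()
try:
    acked = 0
    last_offset = None
    for record in batch.data:
        metadata = await producer.send_and_wait(topic, value=record)
        acked += 1                      # send_and_wait returns only after the broker ack
        last_offset = metadata.offset   # offset assigned by the broker
    logger.info(f"Broker acknowledged {acked} records, last offset: {last_offset}")
finally:
    await producer.stop()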

Expected behaviour
I expect all 10000 messages sent by the Producer to be received by the Consumer

Environment (please complete the following information):

  • aiokafka version 0.10.0
  • Kafka Broker version 3.7.0

Reproducible example
docker-compose.yml

# docker-compose.yml
services:
  kafka:
    restart: always
    image: bitnami/kafka
    ports:
      - "9093:9093"
    volumes:
      - "./server/kafka-data:/bitnami"
    environment:
      - KAFKA_ENABLE_KRAFT=yes
      - KAFKA_CFG_PROCESS_ROLES=broker,controller
      - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
      - KAFKA_CFG_LISTENERS=PLAINTEXT://:9093,CONTROLLER://:2181
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://127.0.0.1:9093
      - KAFKA_BROKER_ID=1
      - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=1@127.0.0.1:2181
      - ALLOW_PLAINTEXT_LISTENER=yes
      - KAFKA_CFG_NODE_ID=1
      - KAFKA_KRAFT_CLUSTER_ID=MkU3OEVBNTcwNTJENDM2Qk
      - KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=false

I have not yet broken the problem out into its own script.

hakan458 (Author) commented:

What's strange is that if I put a small time.sleep(0.1) in the loop where we send records, I get all 10000 messages in the consumer.

        for record in batch.data:
            await producer.send_and_wait(topic, value=record)
            time.sleep(0.1)

I wouldn't expect that sending messages quickly would be an issue. I tried this because in the same app we process all messages and send them on through another Kafka topic, and we never lose a single message in that case. Since it takes some time for records to be processed, the rate of sending messages was the only difference I could think of, so I added the time.sleep(0.1), which seems to solve the issue, but why?
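
As an aside, a sketch of a non-blocking variant of the same workaround (not from this thread): inside an async endpoint, time.sleep(0.1) blocks the whole event loop between sends, whereas asyncio.sleep yields control instead.

import asyncio

for record in batch.data:
    await producer.send_and_wait(topic, value=record)
    await asyncio.sleep(0.1)  # yields to the event loop instead of blocking it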

hakan458 (Author) commented:

Some more info: it does seem that all 10k are being sent, if the offset is to be trusted.

{"message": "<AIOKafkaConnection host=127.0.0.1 port=9093> Response 1001: ProduceResponse_v7(topics=[(topic='adala-input-ab599429-c094-4eae-a71c-85de414aaf98', partitions=[(partition=0, error_code=0, offset=9999, timestamp=-1, log_start_offset=0)])], throttle_time_ms=0)"}

hakan458 (Author) commented:

Another note: in the first queue, where we somehow lose messages, we consume at a much slower rate than in the second queue, where we lose none. Not sure if that makes any difference either; just trying to think of anything at this point.

hakan458 (Author) commented:

One more thing to note: if I send the 10k records to submit_batch in batches of 10, which means we hit the endpoint 1000 times for 10k messages, we again lose many messages. Sending all 10k to submit_batch in one call plus the time.sleep(0.1) is the only combination that seems to work so far. Creating a producer on every call is probably not the best idea.
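
For reference, a minimal sketch of a single long-lived producer shared across requests (an assumption, not code from this issue), started and stopped with the FastAPI application lifespan; settings is reused from the snippet above.

import json
from contextlib import asynccontextmanager

from aiokafka import AIOKafkaProducer
from fastapi import FastAPI

producer: AIOKafkaProducer | None = None

@asynccontextmanager
async def lifespan(app: FastAPI):
    # Start one producer for the whole application and reuse it in every request.
    global producer
    producer = AIOKafkaProducer(
        bootstrap_servers=settings.kafka_bootstrap_servers,
        value_serializer=lambda v: json.dumps(v).encode("utf-8"),
    )
    await producer.start()
    yield
    await producer.stop()

app = FastAPI(lifespan=lifespan)

Endpoints such as submit_batch would then use this shared producer instead of constructing and starting a new one per request.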

shravan20 commented:

@hakan458: Was this issue resolved?

hakan458 (Author) commented:

@shravan20 No, it was not. We worked around it but never solved the underlying issue.
