Certain topic partitions are not sending fetch requests since initial start #629

Open
1 of 2 tasks
Syntaf opened this issue Jun 24, 2024 · 0 comments

Checklist

  • I have included information about relevant versions
  • I have verified that the issue persists when using the master branch of Faust.

FWIW I have already dug through #166 and feel this issue is different enough in its origins that I'm creating a new ticket to discuss and dig into this error.

My interpretation of this error message is that it's indicating that a particular TopicPartition (topic <-> partition pair) has not executed a fetch request since the workers have initially started.

We've been getting a large number of these errors any time a rebalance occurs on our Faust pods, and so far we have confirmed that there's been no negative impact on our ability to consume events or process streams. I've spent more than a week trying to track down why this might be happening, but I'm at a loss at this point.

From what I understand, this message normally shouldn't occur because of a background task on the Fetcher service: this task populates the fetcher's internal fetch queue with calls to the broker for each topic & partition in the current assignment.

Once the fetch queue is consumed, a TopicPartition's timestamp field is updated to represent the last time a fetch request was performed for the topic & partition.

The source of this error, however, is a different background task on the Recovery service. This task executes on startup or rebalance to build tables out of a topic's changelog and calls verify_event_path along the way. This method eventually checks whether each partition has performed a fetch since its initial start, and if one hasn't, this error message is logged.
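To make my mental model of this concrete, here is a minimal toy sketch of the bookkeeping as I understand it. It is not Faust's actual code: `TP`, `FetchTracker`, `record_fetch`, `never_fetched`, and `verify` are all hypothetical names I made up for illustration, and the message text only imitates the real log line.

```python
import time
from typing import Dict, List, NamedTuple, Optional, Set


class TP(NamedTuple):
    """Hypothetical stand-in for a topic <-> partition pair."""
    topic: str
    partition: int


class FetchTracker:
    """Toy model of the per-partition fetch timestamps (not Faust's code)."""

    def __init__(self, now: Optional[float] = None) -> None:
        # Time the worker started; used to report "started N minutes ago".
        self.started_at = time.monotonic() if now is None else now
        # Last time a fetch request was performed for each partition.
        self.last_fetch: Dict[TP, float] = {}

    def record_fetch(self, tp: TP, now: Optional[float] = None) -> None:
        # Called once a fetch for this partition has been performed.
        self.last_fetch[tp] = time.monotonic() if now is None else now

    def never_fetched(self, assignment: Set[TP]) -> List[TP]:
        # Partitions in the current assignment with no recorded fetch at all.
        return sorted(tp for tp in assignment if tp not in self.last_fetch)

    def verify(self, assignment: Set[TP], now: Optional[float] = None) -> List[str]:
        # Mirrors the check described above: one message per partition
        # that has not fetched since the worker started.
        now = time.monotonic() if now is None else now
        minutes = (now - self.started_at) / 60
        return [
            f"no fetch request for [{tp.topic!r}, {tp.partition}] "
            f"since start (started {minutes:.2f} minutes ago)"
            for tp in self.never_fetched(assignment)
        ]
```

In this model, a partition only shows up in `verify` if the fetch side never recorded a timestamp for it, which is why I suspect either a race between the two tasks or a partition that is assigned but never actually queued for fetching.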

Expected behavior

Each topic-partition in the current assignment will eventually be fetched by the Fetcher's request routine.

Actual behavior

Occasionally Faust reports that certain topic-partitions have not triggered a fetch request since their initial start:

[^---AIOKafkaConsumerThread]: Aiokafka has not sent fetch request for ["<redacted>",0] since start (started 7.12 minutes ago)

Unknowns

I'm stumped as to how our Faust consumers are seemingly processing everything as usual, yet certain topic partitions never end up performing a fetch after the initial start of the workers. Is this the result of a race condition, or of some unique detail in how our configuration is set up?

    app = faust.App(
        id=settings.KAFKA_CONFIG["KAFKA_CONSUMER_GROUP"],
        autodiscover=True,
        broker=settings.KAFKA_CONFIG["KAFKA_BROKERS"],
        broker_credentials=broker_credentials,
        key_serializer="raw",
        value_serializer="raw",
        worker_redirect_stdouts=False,
        processing_guarantee="at_least_once",
        web_enabled=False,
        # creates a monitoring topic if not set, we can't make topics in our cluster!
        topic_disable_leader=True,
        topic_partitions=6,
        monitor=get_monitor_for_environment(settings.ENVIRONMENT),
    )

Any thoughts or recommendations here are appreciated! We're seeing tens of thousands of these error messages every week, and they are needlessly eating into our Sentry quota because they are reported as errors.
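As a stopgap for the quota problem (not a fix for the underlying behavior), one option is a standard-library logging filter that drops these records on the logger that emits them. This is only a sketch: `DropFetchWarnings` and `install` are hypothetical helpers, the logger name `faust.transport.drivers.aiokafka` is an assumption on my part, and the matched fragment is taken from the log line quoted above.

```python
import logging

# Fragment of the log line quoted in "Actual behavior".
FETCH_WARNING_FRAGMENT = "has not sent fetch request for"


class DropFetchWarnings(logging.Filter):
    """Drop records whose formatted message contains the fetch warning."""

    def filter(self, record: logging.LogRecord) -> bool:
        try:
            message = record.getMessage()
        except Exception:
            # Fall back to the raw format string if formatting fails.
            message = str(record.msg)
        # Returning False discards the record before any handler sees it.
        return FETCH_WARNING_FRAGMENT not in message


def install(logger_name: str = "faust.transport.drivers.aiokafka") -> DropFetchWarnings:
    # ASSUMPTION: this is the logger that emits the message. A filter on a
    # logger only applies to records logged directly on that logger, not to
    # records propagated up from child loggers.
    flt = DropFetchWarnings()
    logging.getLogger(logger_name).addFilter(flt)
    return flt
```

Calling `install()` once at worker startup should be enough if the logger name is right. The obvious downside is that it also hides the message in cases where a partition really is stuck, so I would rather understand the root cause.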

Versions

  • Python version 3.11
  • Faust version 0.10.21
  • Operating system
  • Kafka version 3.6.x
  • RocksDB version (if applicable) N/A