
Graceful Shutdown #162

Closed · JeroennC opened this issue Jan 24, 2024 · 4 comments

@JeroennC (Contributor)

Is your feature request related to a problem? Please describe.
In our codebase we've put some effort into building our own shutdown mechanism, which upon further investigation still does not accomplish exactly what we need.

The current way to stop the consumers is stream_engine.stop(). This, however, has the downside that coroutines that are still running are not run to completion.
For example:

import asyncio

async def my_stream(cr: ConsumerRecord):
    print("Starting")
    await asyncio.sleep(10)
    print("Complete")

Will never print "Complete" if stream_engine.stop() is called during the sleep.

This means that if autocommit is enabled, the event is committed on Kafka but not actually processed. (Note: this could be overcome with manual commits, but graceful shutdown would still avoid unnecessary double processing.)
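The behavior can be reproduced with plain asyncio (a minimal sketch; `handler` and `log` are illustrative names, not kstreams API). Cancelling the task mid-sleep means the handler never reaches its final line:

```python
import asyncio

log = []

async def handler():
    log.append("Starting")
    await asyncio.sleep(10)
    log.append("Complete")  # never reached when the task is cancelled

async def main():
    task = asyncio.create_task(handler())
    await asyncio.sleep(0.1)  # let the handler start its "processing"
    task.cancel()             # roughly what an abrupt stop does to the coroutine
    try:
        await task
    except asyncio.CancelledError:
        pass

asyncio.run(main())
print(log)  # only 'Starting' was logged
```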

Describe the solution you'd like
There are multiple reasons for shutdown, but the two primary ones we see are:

  1. Application termination. A SIGTERM on the process terminates the application in a standard manner.
  2. Application error. Specifically with kstreams, if one of my streams/consumers crashes, it does not automatically recover, so I would like my application to exit so Kubernetes can spin up a new container. (Note: an alternative for this use case is to have streams recover after a crash.)

Ideally, when stream_engine.stop is awaited, I would like it to:

  1. Stop all consumers (so no new events are consumed with the getone method)
  2. Wait for all consumer_tasks to finish (this requires the consumer_task to be checking whether a stop signal was given at the end of processing a ConsumerRecord)
  3. Optionally timeout if consumer_tasks take too long to finish
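The three steps above can be sketched in plain asyncio; `worker`, `graceful_stop`, and `stop_event` are hypothetical names for illustration, not kstreams API:

```python
import asyncio

async def worker(stop_event: asyncio.Event, log: list):
    # Process records until a stop is requested, then drain and exit.
    while not stop_event.is_set():
        log.append("processed")  # stand-in for handling one ConsumerRecord
        await asyncio.sleep(0.01)
    log.append("drained")

async def graceful_stop(stop_event: asyncio.Event, tasks, timeout: float = 1.0):
    stop_event.set()                                            # 1. stop consuming new events
    done, pending = await asyncio.wait(tasks, timeout=timeout)  # 2. wait for in-flight handlers
    for task in pending:                                        # 3. give up on stragglers
        task.cancel()
    return done, pending

async def main():
    log: list = []
    stop = asyncio.Event()
    tasks = [asyncio.create_task(worker(stop, log))]
    await asyncio.sleep(0.05)  # let some records flow
    done, pending = await graceful_stop(stop, tasks)
    return log, pending

log, pending = asyncio.run(main())
```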

Beyond that, there is currently no way to trigger a callback on stream crash. A mechanism to trigger an application stop after a stream/consumer has crashed is also required to accomplish a shutdown on crash.
Currently we do this by wrapping our stream functions with an exception handler, which then triggers an application shutdown.
(Note: this might be worth a separate issue; I leave that up to you.)
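Our wrapping approach looks roughly like this; `crash_to_shutdown` and the shutdown event are our own hypothetical helpers, not kstreams API:

```python
import asyncio
import functools

def crash_to_shutdown(shutdown_event: asyncio.Event):
    # Decorator: any unhandled exception in a stream function
    # requests an application-wide shutdown, then re-raises.
    def decorator(stream_func):
        @functools.wraps(stream_func)
        async def wrapper(*args, **kwargs):
            try:
                return await stream_func(*args, **kwargs)
            except Exception:
                shutdown_event.set()  # signal the app to stop
                raise
        return wrapper
    return decorator

shutdown = asyncio.Event()

@crash_to_shutdown(shutdown)
async def failing_stream(record):
    raise RuntimeError("boom")

async def main():
    try:
        await failing_stream(None)
    except RuntimeError:
        pass
    return shutdown.is_set()

crashed = asyncio.run(main())
```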

I'm curious to hear your insights on this challenge and what kstreams could provide to assist us.

@woile (Member) commented Jan 24, 2024

I'll dig a bit deeper later; 2 things I can think of right now:

  1. Take a look at aiorun's smart shield: https://github.com/cjrh/aiorun?tab=readme-ov-file#smart-shield-for-shutdown (maybe you can get an idea from their code)

  2. Regarding:

     > A mechanism to trigger an application stop after a stream/consumer has crashed is also required to accomplish a shutdown on crash.

     I think the upcoming Middlewares #155 could help with that.

Is your app running by itself on a worker, or as part of fastapi?

@JeroennC (Contributor, Author)

Our app is running as a worker, but through FastAPI, using uvicorn plus Starlette's lifespan to start and stop the stream engine.
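A minimal sketch of that lifespan wiring, with a `FakeEngine` standing in for the real StreamEngine so the example stays self-contained:

```python
import asyncio
from contextlib import asynccontextmanager

class FakeEngine:
    # Stand-in for kstreams' StreamEngine; only start/stop are sketched.
    def __init__(self):
        self.state = "stopped"
    async def start(self):
        self.state = "running"
    async def stop(self):
        self.state = "stopped"

engine = FakeEngine()

@asynccontextmanager
async def lifespan(app=None):
    # What Starlette runs around the application's lifetime:
    # start consumers on boot, stop them on shutdown.
    await engine.start()
    try:
        yield
    finally:
        await engine.stop()

async def main():
    states = []
    async with lifespan():
        states.append(engine.state)  # while the app serves requests
    states.append(engine.state)      # after shutdown
    return states

states = asyncio.run(main())
```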

@JeroennC (Contributor, Author)

The middleware is a nice improvement for keeping track of streams, including when they shut down. There is one exception that I am not able to catch, which is thrown on this line: https://github.com/kpn/kstreams/blob/0.17.1/kstreams/streams.py#L240

I want to close the consumers and let our "Stream Manager" know that a stream was closed (so we know we can gracefully shut down). However, when closing the consumer, it raises a ConsumerStoppedError from the .getone() call in func_wrapper_with_typing.
This exception can't be caught, so for now I'm thinking about checking whether the _consumer_task is done to determine that a stream exited correctly.
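For illustration, what we want amounts to something like this sketch; `ConsumerStoppedError` is redefined locally as a stand-in for the real exception (which aiokafka raises), and `stream_loop`/`on_exit` are hypothetical names:

```python
import asyncio

class ConsumerStoppedError(Exception):
    # Stand-in for the real ConsumerStoppedError raised by the consumer.
    pass

async def stream_loop(getone, on_exit, handle):
    # Consume until the consumer is closed, then report a clean exit.
    try:
        while True:
            record = await getone()
            await handle(record)
    except ConsumerStoppedError:
        pass  # raised by getone() once the consumer is closed
    finally:
        on_exit()  # tell the "Stream Manager" this stream has ended

async def main():
    calls = []

    async def getone():
        if calls:  # second call: the consumer has been closed
            raise ConsumerStoppedError()
        return "record-1"

    async def handle(record):
        calls.append(record)

    exited = []
    await stream_loop(getone, lambda: exited.append(True), handle)
    return calls, exited

calls, exited = asyncio.run(main())
```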

@marcosschroh marcosschroh self-assigned this Feb 6, 2024
marcosschroh added a commit to marcosschroh/kstreams that referenced this issue Feb 7, 2024
…rocessed before Streams are stopped. Related to kpn#162
marcosschroh added a commit to marcosschroh/kstreams that referenced this issue Feb 8, 2024
…rocessed before Streams are stopped. Related to kpn#162
marcosschroh added a commit that referenced this issue Feb 8, 2024
…rocessed before Streams are stopped. Related to #162 (#171)
marcosschroh added a commit to marcosschroh/kstreams that referenced this issue Jun 18, 2024
marcosschroh added a commit to marcosschroh/kstreams that referenced this issue Jul 1, 2024
…urrency paradigm. Now StreamEngine will stop on a Stream crash (only when running with aiorun). Closes kpn#60 due to the use of flag `stop_on_unhandled_errors` with aiorun. Related to kpn#162

anyio has been introduced as the way to run programs in worker mode
marcosschroh added a commit that referenced this issue Jul 1, 2024
…urrency paradigm. Now StreamEngine will stop on a Stream crash (only when running with aiorun). Closes #60 due to the use of flag `stop_on_unhandled_errors` with aiorun. Related to #162 (#190)

anyio has been introduced as the way to run programs in worker mode
@marcosschroh (Collaborator)

You can now establish error_policies per Stream. For FastAPI use cases we recommend the STOP_APPLICATION error policy. You can find an example here
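A sketch of what that looks like, assuming the current kstreams API (names like `StreamErrorPolicy.STOP_APPLICATION`, `create_engine`, and the topic name are taken as assumptions; verify against your installed version and the kstreams docs):

```python
from kstreams import ConsumerRecord, StreamErrorPolicy, create_engine

stream_engine = create_engine(title="my-engine")

# If this stream crashes, stop the whole application so the
# orchestrator (e.g. Kubernetes) can restart the container.
@stream_engine.stream(
    "local--hello-world",
    error_policy=StreamErrorPolicy.STOP_APPLICATION,
)
async def my_stream(cr: ConsumerRecord):
    print(cr.value)
```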

3 participants