Skip to content

Commit

Permalink
feat: StreamEngine hooks on_startup, on_stop, after_startup and after…
Browse files Browse the repository at this point in the history
…_stop added
  • Loading branch information
marcosschroh committed Jul 4, 2024
1 parent 063b2e6 commit 2d296b9
Show file tree
Hide file tree
Showing 7 changed files with 360 additions and 2 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ if __name__ == "__main__":
- [x] Yield events from streams
- [x] [Opentelemetry Instrumentation](https://github.com/kpn/opentelemetry-instrumentation-kstreams)
- [x] Middlewares
- [x] Hooks (on_startup, on_stop, after_startup, after_stop)
- [ ] Store (kafka streams pattern)
- [ ] Stream Join
- [ ] Windowing
Expand Down
5 changes: 5 additions & 0 deletions docs/engine.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,8 @@
show_root_heading: true
docstring_section_style: table
show_signature_annotations: false
members:
- on_startup
- after_startup
- on_stop
- after_stop
9 changes: 9 additions & 0 deletions kstreams/create.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from .engine import StreamEngine
from .prometheus.monitor import PrometheusMonitor
from .serializers import Deserializer, Serializer
from .types import EngineHooks


def create_engine(
Expand All @@ -15,6 +16,10 @@ def create_engine(
serializer: Optional[Serializer] = None,
deserializer: Optional[Deserializer] = None,
monitor: Optional[PrometheusMonitor] = None,
on_startup: Optional[EngineHooks] = None,
on_stop: Optional[EngineHooks] = None,
after_startup: Optional[EngineHooks] = None,
after_stop: Optional[EngineHooks] = None,
) -> StreamEngine:
if monitor is None:
monitor = PrometheusMonitor()
Expand All @@ -30,4 +35,8 @@ def create_engine(
serializer=serializer,
deserializer=deserializer,
monitor=monitor,
on_startup=on_startup,
on_stop=on_stop,
after_startup=after_startup,
after_stop=after_startup,
)
148 changes: 146 additions & 2 deletions kstreams/engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@
from .streams import Stream, StreamFunc
from .streams import stream as stream_func
from .streams_utils import UDFType
from .types import Headers, NextMiddlewareCall
from .utils import encode_headers
from .types import EngineHooks, Headers, NextMiddlewareCall
from .utils import encode_headers, execute_hooks

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -68,6 +68,10 @@ def __init__(
title: typing.Optional[str] = None,
deserializer: typing.Optional[Deserializer] = None,
serializer: typing.Optional[Serializer] = None,
on_startup: typing.Optional[EngineHooks] = None,
on_stop: typing.Optional[EngineHooks] = None,
after_startup: typing.Optional[EngineHooks] = None,
after_stop: typing.Optional[EngineHooks] = None,
) -> None:
self.title = title
self.backend = backend
Expand All @@ -78,6 +82,10 @@ def __init__(
self.monitor = monitor
self._producer: typing.Optional[typing.Type[Producer]] = None
self._streams: typing.List[Stream] = []
self._on_startup = [] if on_startup is None else list(on_startup)
self._on_stop = [] if on_stop is None else list(on_stop)
self._after_startup = [] if after_startup is None else list(after_startup)
self._after_stop = [] if after_stop is None else list(after_stop)

async def send(
self,
Expand Down Expand Up @@ -133,18 +141,154 @@ async def send(
return metadata

async def start(self) -> None:
# Execute on_startup hooks
await execute_hooks(self._on_startup)

# add the producer and streams to the Monitor
self.monitor.add_producer(self._producer)
self.monitor.add_streams(self._streams)

await self.start_producer()
await self.start_streams()

# Execute after_startup hooks
await execute_hooks(self._after_startup)

def on_startup(
self,
func: typing.Callable[[], typing.Any],
) -> typing.Callable[[], typing.Any]:
"""
A list of callables to run before the engine starts.
Handler are callables that do not take any arguments, and may be either
standard functions, or async functions.
Attributes:
func typing.Callable[[], typing.Any]: Func to callable before engine starts
!!! Example
```python title="Engine before startup"
import kstreams
stream_engine = kstreams.create_engine(
title="my-stream-engine"
)
@stream_engine.on_startup
async def init_db() -> None:
print("Initializing Database Connections")
await init_db()
@stream_engine.on_startup
async def start_background_task() -> None:
print("Some background task")
```
"""
self._on_startup.append(func)
return func

def on_stop(
self,
func: typing.Callable[[], typing.Any],
) -> typing.Callable[[], typing.Any]:
"""
A list of callables to run before the engine stops.
Handler are callables that do not take any arguments, and may be either
standard functions, or async functions.
Attributes:
func typing.Callable[[], typing.Any]: Func to callable before engine stops
!!! Example
```python title="Engine before stops"
import kstreams
stream_engine = kstreams.create_engine(
title="my-stream-engine"
)
@stream_engine.on_stop
async def close_db() -> None:
print("Closing Database Connections")
await db_close()
```
"""
self._on_stop.append(func)
return func

def after_startup(
self,
func: typing.Callable[[], typing.Any],
) -> typing.Callable[[], typing.Any]:
"""
A list of callables to run after the engine starts.
Handler are callables that do not take any arguments, and may be either
standard functions, or async functions.
Attributes:
func typing.Callable[[], typing.Any]: Func to callable after engine starts
!!! Example
```python title="Engine after startup"
import kstreams
stream_engine = kstreams.create_engine(
title="my-stream-engine"
)
@stream_engine.after_startup
async def after_startup() -> None:
print("Set pod as healthy")
await mark_healthy_pod()
```
"""
self._after_startup.append(func)
return func

def after_stop(
self,
func: typing.Callable[[], typing.Any],
) -> typing.Callable[[], typing.Any]:
"""
A list of callables to run after the engine stops.
Handler are callables that do not take any arguments, and may be either
standard functions, or async functions.
Attributes:
func typing.Callable[[], typing.Any]: Func to callable after engine stops
!!! Example
```python title="Engine after stops"
import kstreams
stream_engine = kstreams.create_engine(
title="my-stream-engine"
)
@stream_engine.after_stop
async def after_stop() -> None:
print("Finishing backgrpund tasks")
```
"""
self._after_stop.append(func)
return func

async def stop(self) -> None:
# Execute on_startup hooks
await execute_hooks(self._on_stop)

await self.monitor.stop()
await self.stop_producer()
await self.stop_streams()

# Execute after_startup hooks
await execute_hooks(self._after_stop)

async def stop_producer(self):
if self._producer is not None:
await self._producer.stop()
Expand Down
1 change: 1 addition & 0 deletions kstreams/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
EncodedHeaders = typing.Sequence[typing.Tuple[str, bytes]]
StreamFunc = typing.Callable
NextMiddlewareCall = typing.Callable[[ConsumerRecord], typing.Awaitable[None]]
EngineHooks = typing.Sequence[typing.Callable[[], typing.Any]]


class Send(typing.Protocol):
Expand Down
9 changes: 9 additions & 0 deletions kstreams/utils.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import contextlib
import inspect
import ssl
from tempfile import NamedTemporaryFile
from typing import Any, Optional, Union
Expand Down Expand Up @@ -92,4 +93,12 @@ def create_ssl_context(
)


async def execute_hooks(hooks: types.EngineHooks) -> None:
for hook in hooks:
if inspect.iscoroutinefunction(hook):
await hook()
else:
hook()

Check warning on line 101 in kstreams/utils.py

View check run for this annotation

Codecov / codecov/patch

kstreams/utils.py#L101

Added line #L101 was not covered by tests


__all__ = ["create_ssl_context", "create_ssl_context_from_mem", "encode_headers"]
Loading

0 comments on commit 2d296b9

Please sign in to comment.