feat: Middleware capability introduced
marcosschroh committed Jan 8, 2024
1 parent cb7e6f4 commit fc875e6
Showing 25 changed files with 1,789 additions and 98 deletions.
210 changes: 210 additions & 0 deletions docs/middleware.md
# Middleware

Kstreams allows you to include middlewares for adding behavior to streams.

A *middleware* is a `callable` that runs on every `ConsumerRecord` (CR) *before* it is processed by a specific `stream`, and it can also run code *after* the `CR` has been handled. Middlewares also have access to the `stream` and the `send` function.

- It takes each `CR` that arrives at a topic.
- It can then do something to the `CR` or run any needed code.
- Then it passes the `CR` on to be processed by the specific stream.
- Once the `CR` is processed by the stream, the chain is "completed".
- If there is code after `self.next_call(cr)`, it will then be executed.

Kstreams middlewares implement the following protocol:

::: kstreams.middleware.middleware.Middleware
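For readers viewing the raw docs, the protocol can be sketched roughly as follows. The rendered API reference above is authoritative; `Any` stands in for `ConsumerRecord` only so the sketch is self-contained:

```python
from typing import Any, Protocol, runtime_checkable


@runtime_checkable
class Middleware(Protocol):
    # In practice `cr` is a kstreams.ConsumerRecord; it is typed as Any
    # here only to keep this sketch standalone.
    async def __call__(self, cr: Any) -> None:
        ...
```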

!!! warning
    Middlewares only work with the new [Dependency Injection approach](https://kpn.github.io/kstreams/stream/#dependency-injection-and-typing)

## Creating a middleware

To create a middleware you have to create a class that inherits from `BaseMiddleware` and defines the `async def __call__` method. Let's say we want to save each `CR` to `elastic` before it is processed:

```python
from kstreams import ConsumerRecord, middleware

async def save_to_elastic(cr: ConsumerRecord) -> None:
...


class ElasticMiddleware(middleware.BaseMiddleware):
async def __call__(self, cr: ConsumerRecord) -> None:
# save to elastic before calling the next
await save_to_elastic(cr)

# the next call could be another middleware
await self.next_call(cr)
```

Then, we have to include the middleware. It can be included per `stream` or at the application level (`stream_engine`):

=== "Include middleware in stream"
    ```python
    from kstreams import ConsumerRecord, middleware

    from .engine import stream_engine


    # ElasticMiddleware is the class defined above
    middlewares = [middleware.MiddlewareFactory(ElasticMiddleware)]

    @stream_engine.stream("kstreams-topic", middlewares=middlewares)
    async def processor(cr: ConsumerRecord):
        ...
    ```
=== "Include middleware at application level"
    ```python
    from kstreams import ConsumerRecord, middleware, create_engine

    # ElasticMiddleware is the class defined above
    middlewares = [middleware.MiddlewareFactory(ElasticMiddleware)]

    stream_engine = create_engine(
        title="my-stream-engine",
        middlewares=middlewares
    )
    ```

!!! note
    A middleware like the one above makes sense at the application level, while others, such as a Dead Letter Queue (DLQ), are better added at the stream level.

## Adding options to the middleware

If you want to provide configuration options to the middleware class, you should override the `__init__` method, ensuring that it accepts the keyword arguments `next_call`, `send` and `stream`; any remaining parameters are your own optional keyword arguments.

Let's say we want to send an event to a specific topic whenever a `ValueError` is raised inside a `stream` (a Dead Letter Queue):

```python
from kstreams import ConsumerRecord, Stream, middleware, types

from .engine import stream_engine


class DLQMiddleware:
def __init__(
self,
*,
next_call: types.NextMiddlewareCall,
send: types.Send,
stream: Stream,
topic: str,
) -> None:
self.next_call = next_call
self.send = send
self.stream = stream
self.topic = topic

async def __call__(self, cr: ConsumerRecord) -> None:
try:
await self.next_call(cr)
except ValueError:
await self.send(self.topic, key=cr.key, value=cr.value)


# Create the middlewares
middlewares = [
middleware.MiddlewareFactory(
DLQMiddleware, topic="kstreams-dlq-topic"
)
]

@stream_engine.stream("kstreams-topic", middlewares=middlewares)
async def processor(cr: ConsumerRecord):
if cr.value == b"joker":
raise ValueError("Joker received...")
```

## Middleware by default

Kstreams includes one middleware by default, `ExceptionMiddleware`. It adds exception handlers for particular types of expected exceptions, for example when the `Consumer` stops (Kafka disconnects), the user presses `CTRL+C`, or any other exception that could cause the `stream` to crash.

::: kstreams.middleware.middleware.ExceptionMiddleware
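The general idea can be sketched as follows. This is a simplified illustration, not the actual kstreams implementation; the class name and the handled exceptions are hypothetical:

```python
import asyncio


class ExceptionHandlingSketch:
    """Sketch of the *idea* only: an outermost middleware that catches
    exceptions which would otherwise crash the stream and handles them
    in one place."""

    def __init__(self, *, next_call):
        self.next_call = next_call

    async def __call__(self, cr) -> None:
        try:
            await self.next_call(cr)
        except KeyboardInterrupt:
            # e.g. the user pressed CTRL+C: stop the stream cleanly
            print("stopping stream gracefully")
        except Exception as exc:
            # anything else that would crash the stream surfaces here
            print(f"stream crashed: {exc!r}")
            raise
```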

## Middleware chain

It is possible to add as many middlewares as you want in order to split and reuse business logic. The downside is extra complexity, and the code might become slower. There are also some points to take into account:

- The order in which middlewares are added matters.
- If middlewares are added both to a `stream` and to the `stream_engine`, the middleware stack is built with the `stream` middlewares first and then the `stream_engine` middlewares. This means the `stream` middlewares are evaluated first.
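The resulting evaluation order can be illustrated with a standalone sketch that composes a chain the same way (no kstreams involved; the middleware names are hypothetical):

```python
import asyncio


def build_chain(middleware_names, handler, log):
    """Compose named stages into a chain; the first name in the list
    ends up outermost and therefore runs first."""
    next_call = handler
    for name in reversed(middleware_names):
        def make(name, next_call):
            async def call(cr):
                log.append(name)  # runs before the next stage
                await next_call(cr)
            return call
        next_call = make(name, next_call)
    return next_call


log = []


async def processor(cr):
    log.append("processor")


# stream middlewares are placed before engine middlewares in the stack,
# mirroring the build order described above
stream_middlewares = ["DLQMiddleware", "ElasticMiddleware"]
engine_middlewares = ["MetricsMiddleware"]
chain = build_chain(stream_middlewares + engine_middlewares, processor, log)

asyncio.run(chain(b"event"))
print(log)
# ['DLQMiddleware', 'ElasticMiddleware', 'MetricsMiddleware', 'processor']
```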


In the following example we add three middlewares in this order: `DLQMiddleware`, `ElasticMiddleware`, and `S3Middleware`. The execution chain will be:

`topic event` --> `ExceptionMiddleware` --> `DLQMiddleware` --> `ElasticMiddleware` --> `S3Middleware` --> `processor`

```python title="Multiple middlewares example"
from kstreams import ConsumerRecord, Stream, middleware

from .engine import stream_engine

# dlq, save_to_elastic, backup_to_s3, save_to_db and event_2 are
# application-specific helpers defined elsewhere


class DLQMiddleware(middleware.BaseMiddleware):
async def __call__(self, cr: ConsumerRecord) -> None:
try:
await self.next_call(cr)
except ValueError:
await dlq(cr.value)


class ElasticMiddleware(middleware.BaseMiddleware):
async def __call__(self, cr: ConsumerRecord) -> None:
await save_to_elastic(cr.value)
await self.next_call(cr)


class S3Middleware(middleware.BaseMiddleware):
async def __call__(self, cr: ConsumerRecord) -> None:
await backup_to_s3(cr.value)
await self.next_call(cr)


middlewares = [
middleware.MiddlewareFactory(DLQMiddleware),
middleware.MiddlewareFactory(ElasticMiddleware),
middleware.MiddlewareFactory(S3Middleware),
]


@stream_engine.stream("kstreams-topic", middlewares=middlewares)
async def processor(cr: ConsumerRecord):
if cr.value == event_2:
raise ValueError("Error from stream...")
await save_to_db(cr.value)
```

!!! note
    In this example the `cr` is always saved to `elastic` and `s3`, regardless of whether an error occurred.

## Executing Code after the CR was processed

As mentioned in the introduction, it is possible to execute code after the `CR` is handled. To do this, we need to place code after `next_call` is called:

```python title="Execute code after CR is handled"
from kstreams import ConsumerRecord, Stream, middleware

from .engine import stream_engine

# dlq, save_to_elastic, save_to_db and event_2 are application-specific
# helpers defined elsewhere


class DLQMiddleware(middleware.BaseMiddleware):
async def __call__(self, cr: ConsumerRecord) -> None:
try:
await self.next_call(cr)
except ValueError:
await dlq(cr.value)


class ElasticMiddleware(middleware.BaseMiddleware):
async def __call__(self, cr: ConsumerRecord) -> None:
await self.next_call(cr)
# This will be called after the whole chain has finished
await save_to_elastic(cr.value)


middlewares = [
middleware.MiddlewareFactory(DLQMiddleware),
middleware.MiddlewareFactory(ElasticMiddleware),
]


@stream_engine.stream("kstreams-topic", middlewares=middlewares)
async def processor(cr: ConsumerRecord):
if cr.value == event_2:
raise ValueError("Error from stream...")
await save_to_db(cr.value)
```

!!! note
    In this example the event is saved to `elastic` only if no error occurred.
46 changes: 46 additions & 0 deletions examples/dlq-middleware/README.md
# Dead Letter Queue (DLQ) Middleware Example

## Requirements

Python 3.8+, Poetry, docker-compose

## Installation

```bash
poetry install
```

## Explanation

This example shows how the `Middleware` concept can be applied to implement a `DLQ`. Every time a `ValueError` is raised inside the `stream`, meaning that the event was not processed, the middleware sends an event to the topic `kstreams--dlq-topic`.

## Usage

1. Start the kafka cluster: From `kstreams` project root execute `./scripts/cluster/start`
2. Inside this folder execute `poetry run app`
3. From the `kstreams` project root, use `./scripts/cluster/events/send` to send events to the kafka cluster. A prompt will open; enter the messages to send:
   ```bash
   ./scripts/cluster/events/send "local--hello-world"
   ```

Then, on the consumer side, you should see something similar to the following logs:

```bash
❯ me@me-pc middleware-example % poetry run app

Group Coordinator Request failed: [Error 15] CoordinatorNotAvailableError
Group Coordinator Request failed: [Error 15] CoordinatorNotAvailableError
Group Coordinator Request failed: [Error 15] CoordinatorNotAvailableError
Group Coordinator Request failed: [Error 15] CoordinatorNotAvailableError
Group Coordinator Request failed: [Error 15] CoordinatorNotAvailableError
Consumer started
Event consumed: headers: (), payload: ConsumerRecord(topic='local--hello-world', partition=0, offset=0, timestamp=1660733921761, timestamp_type=0, key=None, value=b'boo', checksum=None, serialized_key_size=-1, serialized_value_size=3, headers=())
```

4. Consume from the topic `kstreams--dlq-topic`: `./scripts/cluster/events/read kstreams--dlq-topic`
5. Produce the event `joker` using the terminal opened in step `3`. Then check the terminal opened in the previous step; the event `joker` should appear.

## Note

If you plan on using this example, pay attention to the `pyproject.toml` dependencies, where
`kstreams` points to the parent folder. You will have to set the latest version instead.
52 changes: 52 additions & 0 deletions examples/dlq-middleware/dlq_middleware/app.py
import aiorun

from kstreams import ConsumerRecord, Stream, create_engine, middleware, types


class DLQMiddleware:
def __init__(
self,
*,
next_call: types.NextMiddlewareCall,
send: types.Send,
stream: Stream,
topic: str,
) -> None:
self.next_call = next_call
self.send = send
self.stream = stream
self.topic = topic

async def __call__(self, cr: ConsumerRecord) -> None:
try:
await self.next_call(cr)
except ValueError:
print(f"\n Producing event {cr.value} to DLQ topic {self.topic} \n")
await self.send(self.topic, key=cr.key, value=cr.value)


middlewares = [
middleware.MiddlewareFactory(DLQMiddleware, topic="kstreams--dlq-topic"),
]
stream_engine = create_engine(title="my-stream-engine")


@stream_engine.stream(topics=["local--hello-world"], middlewares=middlewares)
async def consume(cr: ConsumerRecord):
print(f"Event consumed: headers: {cr.headers}, payload: {cr}")
if cr.value == b"joker":
raise ValueError("Stream crashed 🤡 🤡 🤡 🤡 🤡 🤡 🤡 🤡")
    print("event has been processed")


async def start():
await stream_engine.start()


async def shutdown(loop):
await stream_engine.stop()


def main():
aiorun.run(start(), stop_on_unhandled_errors=True, shutdown_callback=shutdown)