
feat: first steps to add dependency injection. Inspect udf coroutines in order to inject different args on it (#141)
marcosschroh authored Dec 6, 2023
1 parent b863948 commit b2461b6
Showing 34 changed files with 2,101 additions and 1,234 deletions.
7 changes: 3 additions & 4 deletions README.md
@@ -28,15 +28,14 @@ pip install aiorun

```python
import aiorun
-from kstreams import create_engine, Stream
+from kstreams import create_engine, ConsumerRecord


stream_engine = create_engine(title="my-stream-engine")

@stream_engine.stream("local--kstream")
-async def consume(stream: Stream):
-    async for cr in stream:
-        print(f"Event consumed: headers: {cr.headers}, payload: {cr.value}")
+async def consume(cr: ConsumerRecord):
+    print(f"Event consumed: headers: {cr.headers}, payload: {cr.value}")


async def produce():
21 changes: 9 additions & 12 deletions docs/getting_started.md
@@ -6,15 +6,14 @@ You can starting using `kstreams` with simple `producers` and `consumers` and/or

```python title="Simple use case"
import asyncio
-from kstreams import create_engine, Stream
+from kstreams import create_engine, ConsumerRecord

stream_engine = create_engine(title="my-stream-engine")


@stream_engine.stream("local--py-stream", group_id="de-my-partition")
-async def consume(stream: Stream):
-    async for cr in stream:
-        print(f"Event consumed: headers: {cr.headers}, payload: {value}")
+async def consume(cr: ConsumerRecord):
+    print(f"Event consumed: headers: {cr.headers}, payload: {cr.value}")


async def produce():
@@ -54,15 +53,14 @@ so you want have to worry about `set signal handlers`, `shutdown callbacks`, `gr

```python title="Usage with aiorun"
import aiorun
-from kstreams import create_engine, Stream
+from kstreams import create_engine, ConsumerRecord

stream_engine = create_engine(title="my-stream-engine")


@stream_engine.stream("local--py-stream", group_id="de-my-partition")
-async def consume(stream: Stream):
-    async for cr in stream:
-        print(f"Event consumed: headers: {cr.headers}, payload: {value}")
+async def consume(cr: ConsumerRecord):
+    print(f"Event consumed: headers: {cr.headers}, payload: {cr.value}")


async def produce():
@@ -109,13 +107,12 @@ Define the `streams`:
```python title="Application stream"
# streaming.streams.py
from .engine import stream_engine
-from kstreams import Stream
+from kstreams import ConsumerRecord


@stream_engine.stream("local--kstream")
-async def stream(stream: Stream):
-    async for cr in stream:
-        print(f"Event consumed: headers: {cr.headers}, payload: {cr.payload}")
+async def stream(cr: ConsumerRecord):
+    print(f"Event consumed: headers: {cr.headers}, payload: {cr.value}")
```

Create the `FastAPI`:
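The FastAPI wiring itself is collapsed in this diff. As a rough sketch, it usually amounts to starting and stopping the engine with the application lifecycle; the import path and hook usage below are assumptions, not taken from this commit:

```python
# app.py -- hypothetical sketch, not part of this diff
from fastapi import FastAPI

from streaming.engine import stream_engine  # path assumed from the layout above

app = FastAPI()


@app.on_event("startup")
async def startup():
    await stream_engine.start()


@app.on_event("shutdown")
async def shutdown():
    await stream_engine.stop()
```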
7 changes: 3 additions & 4 deletions docs/index.md
@@ -26,15 +26,14 @@ pip install aiorun

```python
import aiorun
-from kstreams import create_engine, Stream
+from kstreams import create_engine, ConsumerRecord


stream_engine = create_engine(title="my-stream-engine")

@stream_engine.stream("local--kstream")
-async def consume(stream: Stream):
-    async for cr in stream:
-        print(f"Event consumed: headers: {cr.headers}, payload: {cr.value}")
+async def consume(cr: ConsumerRecord):
+    print(f"Event consumed: headers: {cr.headers}, payload: {cr.value}")


async def produce():
13 changes: 6 additions & 7 deletions docs/monitoring.md
@@ -89,14 +89,13 @@ In our kstreams app, we can:
stream_engine = create_engine(title="my-engine", monitor=MyAppPrometheusMonitor())

@stream_engine.stream("my-special-orders")
-async def consume_orders_received(consumer):
-    for cr, value, _ in consumer:
-        if value.status == "NEW":
-            stream_engine.monitor.increase_received()
-        elif value.status == "SHIPPED":
-            stream_engine.monitor.increase_shipped()
+async def consume_orders_received(cr: ConsumerRecord):
+    if cr.value.status == "NEW":
+        stream_engine.monitor.increase_received()
+    elif cr.value.status == "SHIPPED":
+        stream_engine.monitor.increase_shipped()
```

Your app's Prometheus would display this data, which you can use to build a stylish ✨dashboard✨.
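For reference, the `increase_received` / `increase_shipped` methods used above could be implemented along these lines with the `prometheus_client` library. This is a minimal sketch; the metric names and the `PrometheusMonitor` subclassing details are assumptions:

```python
from prometheus_client import Counter

from kstreams import PrometheusMonitor


class MyAppPrometheusMonitor(PrometheusMonitor):
    def __init__(self):
        super().__init__()  # keep the default kstreams metrics
        self.orders_received = Counter("orders_received", "Total orders received")
        self.orders_shipped = Counter("orders_shipped", "Total orders shipped")

    def increase_received(self, amount: int = 1) -> None:
        self.orders_received.inc(amount)

    def increase_shipped(self, amount: int = 1) -> None:
        self.orders_shipped.inc(amount)
```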

For further details, see the [Prometheus python client](https://github.com/prometheus/client) documentation.
9 changes: 4 additions & 5 deletions docs/serialization.md
@@ -51,11 +51,10 @@ stream_engine = create_engine(

```python
@stream_engine.stream(topic, deserializer=JsonDeserializer())
-async def hello_stream(stream: Stream):
-    async for event in stream:
-        # remember event.value is now a dict
-        print(event.value["message"])
-        save_to_db(event)
+async def hello_stream(cr: ConsumerRecord):
+    # remember cr.value is now a dict
+    print(cr.value["message"])
+    save_to_db(cr)
```
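The `JsonDeserializer` itself is not shown in this hunk. A minimal sketch, assuming the deserializer simply replaces the raw bytes with the decoded value before the stream sees the record (the exact kstreams deserializer protocol may differ):

```python
import json

from kstreams import ConsumerRecord


class JsonDeserializer:
    async def deserialize(self, consumer_record: ConsumerRecord, **kwargs) -> ConsumerRecord:
        # Replace the raw bytes payload with the decoded dict,
        # so the stream receives cr.value as a dict.
        consumer_record.value = json.loads(consumer_record.value.decode())
        return consumer_record
```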

67 changes: 48 additions & 19 deletions docs/stream.md
@@ -10,6 +10,40 @@ Consuming can be done using `kstreams.Stream`. You only need to decorate a `coro
docstring_section_style: table
show_signature_annotations: false

## Dependency Injection and typing

The old way to iterate over a stream is the `async for _ in stream` loop. The iterable approach works, but in most cases end users are interested only in the `ConsumerRecord` and not in the `stream` itself. For this reason it is now possible to remove the loop: every time a new event arrives on the stream, the `coroutine` function defined by the end user is `awaited` with that event. If the `stream` is also needed, for example because `manual` commit is enabled, you can add the `stream` as an argument in the coroutine as well.

=== "Use only the ConsumerRecord"
```python
@stream_engine.stream(topic, name="my-stream")
async def my_stream(cr: ConsumerRecord):
save_to_db(cr.value)
```

=== "Use ConsumerRecord and Stream"
```python
@stream_engine.stream(topic, name="my-stream", enable_auto_commit=False)
async def my_stream(cr: ConsumerRecord, stream: Stream):
save_to_db(cr.value)
await stream.commit()
```

=== "Old fashion"
```python
@stream_engine.stream(topic, name="my-stream")
async def consume(stream): # you can specify the type but it will be the same result
async for cr in stream:
save_to_db(cr.value)
# you can do something with the stream as well!!
```

!!! note
    Proper typing is required in order to remove the `async for in` loop. The argument order is also important; this might change in the future.

!!! note
    It is still possible to use the `async for in` loop, but it might be removed in the future.
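Under the hood, this kind of injection can be done by inspecting the signature of the user-defined coroutine, which is what the commit title refers to. The following is a minimal illustrative sketch of the idea, not kstreams' actual implementation:

```python
import inspect

from kstreams import ConsumerRecord, Stream


def resolve_kwargs(udf, cr: ConsumerRecord, stream: Stream) -> dict:
    # Map each annotated parameter of the user-defined coroutine
    # to the object it asks for.
    kwargs = {}
    for param in inspect.signature(udf).parameters.values():
        if param.annotation is ConsumerRecord:
            kwargs[param.name] = cr
        elif param.annotation is Stream:
            kwargs[param.name] = stream
    return kwargs


# With a typed signature the engine can await the coroutine once per event:
#     await udf(**resolve_kwargs(udf, cr, stream))
# An untyped coroutine keeps the old behaviour: it is awaited once with the
# stream and iterates the events itself.
```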

## Creating a Stream instance

If for any reason you need to create `Stream` instances directly, you can do it without using the `stream_engine.stream` decorator.
@@ -27,9 +61,8 @@ class MyDeserializer:
return consumer_record.value.decode()


-async def stream(stream: Stream) -> None:
-    async for cr in stream:
-        print(f"Event consumed: headers: {cr.headers}, payload: {cr.value}")
+async def stream(cr: ConsumerRecord) -> None:
+    print(f"Event consumed: headers: {cr.headers}, payload: {cr.value}")


stream = Stream(
@@ -110,15 +143,14 @@ As an end user you are responsable of deciding what to do. In future version app

```python title="Crashing example"
import aiorun
-from kstreams import create_engine
+from kstreams import create_engine, ConsumerRecord

stream_engine = create_engine(title="my-stream-engine")


@stream_engine.stream("local--kstreams", group_id="de-my-partition")
-async def stream(stream: Stream) -> None:
-    async for cr in stream:
-        print(f"Event consumed. Payload {cr.payload}")
+async def stream(cr: ConsumerRecord) -> None:
+    print(f"Event consumed. Payload {cr.payload}")


async def produce():
@@ -162,9 +194,8 @@ stream_engine = create_engine(title="my-stream-engine")


@stream_engine.stream(["local--kstreams", "local--hello-world"], group_id="example-group")
-async def consume(stream: Stream) -> None:
-    async for cr in stream:
-        print(f"Event consumed from topic {cr.topic}: headers: {cr.headers}, payload: {cr.value}")
+async def consume(cr: ConsumerRecord) -> None:
+    print(f"Event consumed from topic {cr.topic}: headers: {cr.headers}, payload: {cr.value}")
```
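When a single coroutine serves several topics you can branch on `cr.topic`; a small illustrative variation (the handler functions are hypothetical):

```python
@stream_engine.stream(["local--kstreams", "local--hello-world"], group_id="example-group")
async def consume(cr: ConsumerRecord) -> None:
    # Route each event to a topic-specific handler (hypothetical helpers)
    if cr.topic == "local--kstreams":
        await handle_kstreams_event(cr.value)
    else:
        await handle_hello_world_event(cr.value)
```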

## Changing consumer behavior
@@ -176,9 +207,8 @@ Most of the time you will only set the `topic` and the `group_id` to the `consum
# On OffsetOutOfRange errors, the offset will move to the oldest available message (‘earliest’)

@stream_engine.stream("local--kstream", group_id="de-my-partition", session_timeout_ms=500, auto_offset_reset"earliest")
-async def stream(stream: Stream):
-    async for cr in stream:
-        print(f"Event consumed: headers: {cr.headers}, payload: {cr.value}")
+async def stream(cr: ConsumerRecord):
+    print(f"Event consumed: headers: {cr.headers}, payload: {cr.value}")
```
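Extra keyword arguments like these are passed through to the underlying `aiokafka` consumer, so other standard consumer options work the same way. For example (the settings shown are regular `aiokafka` options, not specific to this commit):

```python
@stream_engine.stream(
    "local--kstream",
    group_id="de-my-partition",
    auto_offset_reset="earliest",  # start from the oldest offset when none is committed
    max_poll_records=100,  # hand over at most 100 records per poll
)
async def stream(cr: ConsumerRecord):
    print(f"Event consumed: headers: {cr.headers}, payload: {cr.value}")
```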

## Manual commit
@@ -187,13 +217,12 @@ When processing more sensitive data and you want to be sure that the `kafka offe

```python title="Manual commit example"
@stream_engine.stream("local--kstream", group_id="de-my-partition", enable_auto_commit=False)
-async def stream(stream: Stream):
-    async for cr in stream:
-        print(f"Event consumed: headers: {cr.headers}, payload: {cr.value}")
+async def stream(cr: ConsumerRecord, stream: Stream):
+    print(f"Event consumed: headers: {cr.headers}, payload: {cr.value}")

-        # We need to make sure that the payload was stored before committing the kafka offset
-        await store_in_database(payload)
-        await stream.consumer.commit()  # You need to commit!!!
+    # We need to make sure that the payload was stored before committing the kafka offset
+    await store_in_database(cr.value)
+    await stream.commit()  # You need to commit!!!
```
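Committing after every record is safe but chatty. A common variation is to commit every N events; here is a sketch under that assumption (the batch size and the `store_in_database` helper are illustrative):

```python
processed = 0

@stream_engine.stream("local--kstream", group_id="de-my-partition", enable_auto_commit=False)
async def stream(cr: ConsumerRecord, stream: Stream):
    global processed
    await store_in_database(cr.value)
    processed += 1
    if processed % 10 == 0:  # commit once per 10 stored events
        await stream.commit()
```

Fewer commits mean less broker traffic, at the cost of re-processing up to 9 already-stored events if the consumer crashes between commits.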

!!! note
16 changes: 7 additions & 9 deletions docs/test_client.md
@@ -58,9 +58,8 @@ event_store = EventStore()


@stream_engine.stream(topic, group_id="example-group")
-async def consume(stream: Stream):
-    async for cr in stream:
-        event_store.add(cr)
+async def consume(cr: ConsumerRecord):
+    event_store.add(cr)


async def produce():
@@ -129,7 +128,7 @@ In some cases your stream will commit, in this situation checking the commited p
```python
import pytest
from kstreams.test_utils import TestStreamClient
-from kstreams import TopicPartition
+from kstreams import ConsumerRecord, Stream, TopicPartition

from .example import produce, stream_engine

@@ -145,10 +144,9 @@ tp = TopicPartition(
total_events = 10

@stream_engine.stream(topic_name, name=name)
-async def my_stream(stream: Stream):
-    async for cr in stream:
-        # commit every time that an event arrives
-        await stream.consumer.commit({tp: cr.offset})
+async def my_stream(cr: ConsumerRecord, stream: Stream):
+    # commit every time that an event arrives
+    await stream.commit({tp: cr.offset})


# test the code
@@ -162,7 +160,7 @@ async def test_consumer_commit(stream_engine: StreamEngine):

    # check that everything was committed
    stream = stream_engine.get_stream(name)
-    assert (await stream.consumer.committed(tp)) == total_events
+    assert (await stream.committed(tp)) == total_events
```

### E2E test
4 changes: 2 additions & 2 deletions examples/README.md
@@ -1,8 +1,8 @@
-## Welcome to `kstreams` examples.
+# Welcome to `kstreams` examples

In order to run the examples you need `docker-compose`. In the project root you will find the file `docker-compose.yaml` that contains a minimal setup to run `kafka` and `zookeeper`.

-### Steps:
+## Steps

1. Activate your `virtualenv`: `poetry shell`
2. Create the kafka cluster: `make kafka-cluster`
12 changes: 5 additions & 7 deletions examples/confluent-example/README.md
@@ -144,22 +144,20 @@ And there are two `streams` that will consume events from `local--deployment` an

```python
from .engine import stream_engine
-from kstreams import Stream
+from kstreams import ConsumerRecord

deployment_topic = "local--deployment"
country_topic = "local--country"


@stream_engine.stream(deployment_topic)
-async def deployment_stream(stream: Stream):
-    async for cr in stream:
-        print(f"Event consumed on topic {deployment_topic}. The user is {cr.value}")
+async def deployment_stream(cr: ConsumerRecord):
+    print(f"Event consumed on topic {deployment_topic}. The user is {cr.value}")


@stream_engine.stream(country_topic)
-async def country_stream(stream: Stream):
-    async for cr in stream:
-        print(f"Event consumed on topic {country_topic}. The Address is {cr.value}")
+async def country_stream(cr: ConsumerRecord):
+    print(f"Event consumed on topic {country_topic}. The Address is {cr.value}")
```

## Note
@@ -1,4 +1,4 @@
-from kstreams import Stream
+from kstreams import ConsumerRecord

from .engine import stream_engine

@@ -7,12 +7,10 @@


@stream_engine.stream(deployment_topic)
-async def deployment_stream(stream: Stream):
-    async for cr in stream:
-        print(f"Event consumed on topic {deployment_topic}. The user is {cr.value}")
+async def deployment_stream(cr: ConsumerRecord):
+    print(f"Event consumed on topic {deployment_topic}. The user is {cr.value}")


@stream_engine.stream(country_topic)
-async def country_stream(stream: Stream):
-    async for cr in stream:
-        print(f"Event consumed on topic {country_topic}. The Address is {cr.value}")
+async def country_stream(cr: ConsumerRecord):
+    print(f"Event consumed on topic {country_topic}. The Address is {cr.value}")
