diff --git a/docs/large_project_structure.md b/docs/large_project_structure.md
new file mode 100644
index 0000000..bc4a379
--- /dev/null
+++ b/docs/large_project_structure.md
@@ -0,0 +1,187 @@
+If you have a large project, perhaps with multiple `streams`, we recommend the following project structure:
+
+```
+├── my-project
+│   ├── my_project
+│   │   ├── __init__.py
+│   │   ├── app.py
+│   │   ├── resources.py
+│   │   ├── streams.py
+│   │   └── streams_roster.py
+│   ├── tests
+│   │   ├── __init__.py
+│   │   └── conftest.py
+│   ├── pyproject.toml
+│   └── README.md
+```
+
+- The file `my_project/resources.py` contains the creation of the `StreamEngine`
+- The file `my_project/app.py` contains the entrypoint of your program
+- The file `my_project/streams.py` contains all the `Streams`
+- The file `my_project/streams_roster.py` contains the `coroutines` that handle the events
+
+A full, ready-to-use example project can be found [here](https://github.com/kpn/kstreams/tree/master/examples/recommended-worker-app)
+
+!!! note
+    This is just a recommendation; there are many ways to structure your project
+
+## Resources
+
+This Python module contains any global resources that will be used later in the application, for example `DB connections` or the `StreamEngine`. Typically we will have the following:
+
+```python
+from kstreams import backends, create_engine
+
+backend = backends.Kafka(
+    bootstrap_servers=["localhost:9092"],
+    security_protocol=backends.kafka.SecurityProtocol.PLAINTEXT,
+)
+
+stream_engine = create_engine(
+    title="my-stream-engine",
+    backend=backend,
+)
+```
+
+Later, `stream_engine` can be reused to start the application.
+
+## Streams
+
+When starting your project you can have any number of `Streams`, each with its own `handler`, typically in the `streams.py` module. All of the `Streams` run next to each other, and because they live in the same project it is easy to share common code. However, this comes with a `scalability` downside: it is not possible to take advantage of `kafka` and scale up `Streams` individually. In future versions the `StreamEngine` will be able to select which `Stream/s` should run, mitigating this issue. Typically, your `streams.py` will look like:
+
+```python
+from kstreams import Stream
+
+from .streams_roster import stream_roster, stream_two_roster
+
+
+my_stream = Stream(
+    "local--hello-world",
+    func=stream_roster,
+    config={
+        "group_id": "example-group",
+    },
+    ...
+)
+
+my_second_stream = Stream(
+    "local--hello-world-2",
+    func=stream_two_roster,
+    config={
+        "group_id": "example-group-2",
+    },
+    ...
+)
+
+...
+```
+
+and `streams_roster.py` contains all the `coroutines` that are executed when an event arrives:
+
+```python
+import logging
+
+from kstreams import ConsumerRecord, Send, Stream
+
+logger = logging.getLogger(__name__)
+
+
+async def stream_roster(cr: ConsumerRecord, send: Send) -> None:
+    logger.info(f"showing bytes: {cr.value}")
+    value = f"Event confirmed. {cr.value}"
+
+    await send(
+        "another-topic-to-wink",
+        value=value.encode(),
+        key="1",
+    )
+
+
+async def stream_two_roster(cr: ConsumerRecord, send: Send, stream: Stream) -> None:
+    ...
+```
+
+Three things are worth noting:
+
+- We separate each `Stream` from its `coroutine` so the `business logic` can be tested easily (see the sketch after this list)
+- If you need to produce events inside a `Stream`, add the `send` coroutine using [dependency-injection](https://kpn.github.io/kstreams/stream/#dependency-injection)
+- We do not use the `StreamEngine` at all, to avoid `circular import` errors
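+
+Because the `coroutines` in `streams_roster.py` are plain functions, the `business logic` can be unit tested without a broker at all. The sketch below is only illustrative: `FakeRecord` and `fake_send` are hypothetical test doubles written for this example, not part of `kstreams`.
+
+```python
+import pytest
+
+from my_project.streams_roster import stream_roster
+
+
+class FakeRecord:
+    """Minimal stand-in for a ConsumerRecord; only `value` is used here."""
+
+    value = b"hello"
+
+
+@pytest.mark.asyncio
+async def test_stream_roster_produces_confirmation():
+    sent = []
+
+    async def fake_send(topic, value=None, key=None):
+        # capture what the coroutine tried to produce
+        sent.append((topic, value, key))
+
+    await stream_roster(FakeRecord(), fake_send)
+
+    topic, value, key = sent[0]
+    assert topic == "another-topic-to-wink"
+    assert b"Event confirmed." in value
+    assert key == "1"
+```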
+
+## Application
+
+The `entrypoint` is usually in `app.py`. The module imports the `stream_engine`, defines its `hooks`, and adds the `streams` to the `engine`:
+
+```python
+import asyncio
+import logging
+
+import aiorun
+
+from kstreams.stream_utils import StreamErrorPolicy
+
+from .resources import stream_engine
+from .streams import my_stream, my_second_stream
+
+logger = logging.getLogger(__name__)
+
+
+# hooks
+@stream_engine.after_startup
+async def init_events():
+    await stream_engine.send("local--hello-world", value=b"Hi Kstreams!")
+
+
+# add the streams to the stream_engine
+stream_engine.add_stream(my_stream, error_policy=StreamErrorPolicy.RESTART)
+stream_engine.add_stream(my_second_stream, error_policy=StreamErrorPolicy.STOP_ENGINE)
+
+
+async def start():
+    await stream_engine.start()
+
+
+async def stop(loop: asyncio.AbstractEventLoop):
+    await stream_engine.stop()
+
+
+def main():
+    logging.basicConfig(level=logging.INFO)
+    logger.info("Starting application...")
+    aiorun.run(start(), stop_on_unhandled_errors=True, shutdown_callback=stop)
+```
+
+To run it we recommend [aiorun](https://github.com/cjrh/aiorun). It could also be run with `asyncio` directly, but `aiorun` takes care of the boilerplate for us.
+
+## Tests
+
+Here you test your application using the `TestStreamClient`, usually provided as a `pytest` `fixture`. The package `pytest-asyncio` is also needed to test `async` code.
+
+```python
+# conftest.py
+import pytest
+
+from kstreams.test_utils import TestStreamClient
+
+from my_project.resources import stream_engine
+
+
+@pytest.fixture
+def stream_client():
+    return TestStreamClient(stream_engine=stream_engine)
+```
+
+Then you can test your streams:
+
+```python
+# test_app.py
+import pytest
+
+
+@pytest.mark.asyncio
+async def test_my_stream(stream_client):
+    topic = "local--hello-world"  # use the same topic as the stream
+    event = b'{"message": "Hello world!"}'
+
+    async with stream_client:
+        metadata = await stream_client.send(topic, value=event, key="1")
+        assert metadata.topic == topic
+```
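+
+If you also want to assert what a stream produced, the `TestStreamClient` can read events back; see the [Testing](https://kpn.github.io/kstreams/test_client/) page. The sketch below assumes the `get_event` helper shown there is available in your installed `kstreams` version:
+
+```python
+# test_app.py
+import pytest
+
+
+@pytest.mark.asyncio
+async def test_stream_roster_confirmation(stream_client):
+    topic = "local--hello-world"
+    event = b'{"message": "Hello world!"}'
+
+    async with stream_client:
+        await stream_client.send(topic, value=event, key="1")
+
+        # `stream_roster` forwards a confirmation event to this topic
+        confirmation = await stream_client.get_event(topic_name="another-topic-to-wink")
+        assert b"Event confirmed." in confirmation.value
+```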
diff --git a/examples/recommended-worker-app/recommended_worker_app/app.py b/examples/recommended-worker-app/recommended_worker_app/app.py
index 16fa5a6..a58cd18 100644
--- a/examples/recommended-worker-app/recommended_worker_app/app.py
+++ b/examples/recommended-worker-app/recommended_worker_app/app.py
@@ -4,11 +4,11 @@ import aiorun
 
 from .resources import stream_engine
-from .streams import consume
+from .streams import my_stream
 
 logger = logging.getLogger(__name__)
 
-stream_engine.add_stream(consume)
+stream_engine.add_stream(my_stream)
 
 
 async def start():
     await stream_engine.start()
diff --git a/examples/recommended-worker-app/recommended_worker_app/resources.py b/examples/recommended-worker-app/recommended_worker_app/resources.py
index 7844c8b..f5648fc 100644
--- a/examples/recommended-worker-app/recommended_worker_app/resources.py
+++ b/examples/recommended-worker-app/recommended_worker_app/resources.py
@@ -1,3 +1,11 @@
-import kstreams
+from kstreams import backends, create_engine
 
-stream_engine = kstreams.create_engine(title="my-stream-engine")
+backend = backends.Kafka(
+    bootstrap_servers=["localhost:9092"],
+    security_protocol=backends.kafka.SecurityProtocol.PLAINTEXT,
+)
+
+stream_engine = create_engine(
+    title="my-stream-engine",
+    backend=backend,
+)
diff --git a/examples/recommended-worker-app/recommended_worker_app/streams.py b/examples/recommended-worker-app/recommended_worker_app/streams.py
index 4d8cfb9..be06d46 100644
--- a/examples/recommended-worker-app/recommended_worker_app/streams.py
+++ b/examples/recommended-worker-app/recommended_worker_app/streams.py
@@ -1,17 +1,11 @@
-import logging
+from kstreams import Stream
 
-from kstreams import ConsumerRecord, Send, stream
+from .streams_roster import stream_roster
 
-logger = logging.getLogger(__name__)
-
-
-@stream("local--hello-world", group_id="example-group")
-async def consume(cr: ConsumerRecord, send: Send) -> None:
-    logger.info(f"showing bytes: {cr.value}")
-    value = f"Event confirmed. {cr.value}"
-
-    await send(
-        "local--kstreams",
-        value=value.encode(),
-        key="1",
-    )
+my_stream = Stream(
+    "local--hello-world",
+    func=stream_roster,
+    config={
+        "group_id": "example-group",
+    },
+)
diff --git a/examples/recommended-worker-app/recommended_worker_app/streams_roster.py b/examples/recommended-worker-app/recommended_worker_app/streams_roster.py
new file mode 100644
index 0000000..09ffc6b
--- /dev/null
+++ b/examples/recommended-worker-app/recommended_worker_app/streams_roster.py
@@ -0,0 +1,16 @@
+import logging
+
+from kstreams import ConsumerRecord, Send
+
+logger = logging.getLogger(__name__)
+
+
+async def stream_roster(cr: ConsumerRecord, send: Send) -> None:
+    logger.info(f"showing bytes: {cr.value}")
+    value = f"Event confirmed. {cr.value}"
+
+    await send(
+        "local--kstreams",
+        value=value.encode(),
+        key="1",
+    )
diff --git a/mkdocs.yml b/mkdocs.yml
index 0925568..1f863e7 100644
--- a/mkdocs.yml
+++ b/mkdocs.yml
@@ -36,6 +36,7 @@ nav:
   - Testing: 'test_client.md'
   - Middleware: 'middleware.md'
   - Utils: 'utils.md'
+  - Large Projects: 'large_project_structure.md'
 
 markdown_extensions:
   - pymdownx.highlight