diff --git a/examples/consume-multiple-topics-example/README.md b/examples/consume-multiple-topics-example/README.md new file mode 100644 index 0000000..56b0ea3 --- /dev/null +++ b/examples/consume-multiple-topics-example/README.md @@ -0,0 +1,71 @@ +# Consume Multiple Topics Example + +Consume multiple topics example with `kstreams` + +## Requirements + +python 3.8+, poetry, docker-compose + +### Installation + +```bash +poetry install +``` + +## Usage + +1. Start the kafka cluster: From `kstreams` project root execute `./scripts/cluster/start` +2. Inside this folder execute `poetry run app` + +The app should publish 5 events with the same payload (i.e. `'{"message": "Hello world from topic {topic}!"}'`) to two topics: `local--kstreams-2` and `local--hello-world`. +The `ConsumerRecord` will then be consumed and printed by the consumer. + +You should see something similar to the following logs: + +```bash +❯ me@me-pc simple-example % poetry run app +Topic local--kstreams-2 is not available during auto-create initialization +Group Coordinator Request failed: [Error 15] GroupCoordinatorNotAvailableError +Topic local--kstreams-2 is not available during auto-create initialization +Topic local--kstreams-2 is not available during auto-create initialization +Group Coordinator Request failed: [Error 15] GroupCoordinatorNotAvailableError +Group Coordinator Request failed: [Error 15] GroupCoordinatorNotAvailableError +Group Coordinator Request failed: [Error 15] GroupCoordinatorNotAvailableError +Group Coordinator Request failed: [Error 15] GroupCoordinatorNotAvailableError +Group Coordinator Request failed: [Error 15] GroupCoordinatorNotAvailableError +Group Coordinator Request failed: [Error 15] GroupCoordinatorNotAvailableError +Group Coordinator Request failed: [Error 15] GroupCoordinatorNotAvailableError +Message sent: RecordMetadata(topic='local--kstreams-2', partition=0, topic_partition=TopicPartition(topic='local--kstreams-2', partition=0), offset=0, timestamp=1703694932136, timestamp_type=0, log_start_offset=0) +Group Coordinator Request failed: [Error 15] GroupCoordinatorNotAvailableError +Group Coordinator Request failed: [Error 15] GroupCoordinatorNotAvailableError +Group Coordinator Request failed: [Error 15] GroupCoordinatorNotAvailableError +Group Coordinator Request failed: [Error 15] GroupCoordinatorNotAvailableError +Group Coordinator Request failed: [Error 15] GroupCoordinatorNotAvailableError +Group Coordinator Request failed: [Error 15] GroupCoordinatorNotAvailableError +Group Coordinator Request failed: [Error 15] GroupCoordinatorNotAvailableError +Group Coordinator Request failed: [Error 15] GroupCoordinatorNotAvailableError +Group Coordinator Request failed: [Error 15] GroupCoordinatorNotAvailableError +Message sent: RecordMetadata(topic='local--hello-world', partition=0, topic_partition=TopicPartition(topic='local--hello-world', partition=0), offset=0, timestamp=1703694934271, timestamp_type=0, log_start_offset=0) +Group Coordinator Request failed: [Error 15] GroupCoordinatorNotAvailableError +Message sent: RecordMetadata(topic='local--kstreams-2', partition=0, topic_partition=TopicPartition(topic='local--kstreams-2', partition=0), offset=1, timestamp=1703694935284, timestamp_type=0, log_start_offset=0) +Event consumed from topic: local--kstreams-2, headers: (), payload: b'{"message": "Hello world from topic local--kstreams-2!"}' +Message sent: RecordMetadata(topic='local--hello-world', partition=0, topic_partition=TopicPartition(topic='local--hello-world', partition=0), offset=1, timestamp=1703694936320, timestamp_type=0, log_start_offset=0) +Event consumed from topic: local--hello-world, headers: (), payload: b'{"message": "Hello world from topic local--hello-world!"}' +Message sent: RecordMetadata(topic='local--kstreams-2', partition=0, topic_partition=TopicPartition(topic='local--kstreams-2', partition=0), offset=2, timestamp=1703694937336, timestamp_type=0, log_start_offset=0) +Event consumed from topic: local--kstreams-2, headers: (), payload: b'{"message": "Hello world from topic local--kstreams-2!"}' +Message sent: RecordMetadata(topic='local--hello-world', partition=0, topic_partition=TopicPartition(topic='local--hello-world', partition=0), offset=2, timestamp=1703694938349, timestamp_type=0, log_start_offset=0) +Event consumed from topic: local--hello-world, headers: (), payload: b'{"message": "Hello world from topic local--hello-world!"}' +Message sent: RecordMetadata(topic='local--kstreams-2', partition=0, topic_partition=TopicPartition(topic='local--kstreams-2', partition=0), offset=3, timestamp=1703694939361, timestamp_type=0, log_start_offset=0) +Event consumed from topic: local--kstreams-2, headers: (), payload: b'{"message": "Hello world from topic local--kstreams-2!"}' +Message sent: RecordMetadata(topic='local--hello-world', partition=0, topic_partition=TopicPartition(topic='local--hello-world', partition=0), offset=3, timestamp=1703694940376, timestamp_type=0, log_start_offset=0) +Event consumed from topic: local--hello-world, headers: (), payload: b'{"message": "Hello world from topic local--hello-world!"}' +Message sent: RecordMetadata(topic='local--kstreams-2', partition=0, topic_partition=TopicPartition(topic='local--kstreams-2', partition=0), offset=4, timestamp=1703694941386, timestamp_type=0, log_start_offset=0) +Event consumed from topic: local--kstreams-2, headers: (), payload: b'{"message": "Hello world from topic local--kstreams-2!"}' +Message sent: RecordMetadata(topic='local--hello-world', partition=0, topic_partition=TopicPartition(topic='local--hello-world', partition=0), offset=4, timestamp=1703694942404, timestamp_type=0, log_start_offset=0) +Event consumed from topic: local--hello-world, headers: (), payload: b'{"message": "Hello world from topic local--hello-world!"} +``` + +## Note + +If you plan on using this example, pay attention to the `pyproject.toml` dependencies, where +`kstreams` is pointing to the parent folder. You will have to set the latest version. diff --git a/examples/consume-multiple-topics-example/consume_multiple_topics_example/__init__.py b/examples/consume-multiple-topics-example/consume_multiple_topics_example/__init__.py new file mode 100644 index 0000000..3dc1f76 --- /dev/null +++ b/examples/consume-multiple-topics-example/consume_multiple_topics_example/__init__.py @@ -0,0 +1 @@ +__version__ = "0.1.0" diff --git a/examples/consume_multiple_topics.py b/examples/consume-multiple-topics-example/consume_multiple_topics_example/app.py similarity index 78% rename from examples/consume_multiple_topics.py rename to examples/consume-multiple-topics-example/consume_multiple_topics_example/app.py index 2749708..69222cf 100644 --- a/examples/consume_multiple_topics.py +++ b/examples/consume-multiple-topics-example/consume_multiple_topics_example/app.py @@ -25,11 +25,20 @@ async def produce(events_per_topic: int = 5, delay_seconds: int = 1) -> None: await asyncio.sleep(delay_seconds) -async def main(): +async def start(): await stream_engine.start() await produce() + + +async def shutdown(): await stream_engine.stop() -if __name__ == "__main__": - asyncio.run(main()) +def main(): + loop = asyncio.get_event_loop() + try: + loop.run_until_complete(start()) + loop.run_forever() + finally: + loop.run_until_complete(shutdown()) + loop.close() diff --git a/examples/consume-multiple-topics-example/pyproject.toml b/examples/consume-multiple-topics-example/pyproject.toml new file mode 100644 index 0000000..84e5824 --- /dev/null +++ b/examples/consume-multiple-topics-example/pyproject.toml @@ -0,0 +1,20 @@ +[tool.poetry] +name = "consume-multiple-topics-example" +version = "0.1.0" +description = "" +authors = ["Marcos Schroh , Santiago Fraire , Alexandre Cirilo "] + +[tool.poetry.dependencies] +python = "^3.8" +aiorun = "^2022.4.1" +kstreams = { path = "../../.", develop = true } + +[tool.poetry.dev-dependencies] + +[build-system] +requires = ["poetry-core>=1.0.0"] +build-backend = "poetry.core.masonry.api" + + +[tool.poetry.scripts] +app = "consume_multiple_topics_example.app:main" \ No newline at end of file diff --git a/tests/test_client.py b/tests/test_client.py index 4f15ff1..c7be5b2 100644 --- a/tests/test_client.py +++ b/tests/test_client.py @@ -433,16 +433,21 @@ async def test_e2e_example(): ), ) async def test_e2e_consume_multiple_topics(monitoring_enabled): - from examples.consume_multiple_topics import produce, stream_engine, topics + consume_multiple_topics_example = importlib.import_module( + "examples.consume-multiple-topics-example.consume_multiple_topics_example.app" + ) total_events = 2 - client = TestStreamClient(stream_engine, monitoring_enabled=monitoring_enabled) + client = TestStreamClient( + consume_multiple_topics_example.stream_engine, + monitoring_enabled=monitoring_enabled, + ) async with client: - await produce(total_events) + await consume_multiple_topics_example.produce(total_events) - topic_1 = TopicManager.get(topics[0]) - topic_2 = TopicManager.get(topics[1]) + topic_1 = TopicManager.get(consume_multiple_topics_example.topics[0]) + topic_2 = TopicManager.get(consume_multiple_topics_example.topics[1]) assert topic_1.total_events == total_events assert topic_2.total_events == total_events