Skip to content

Commit

Permalink
chore: migrate consume multiple topics example to a proper project st…
Browse files Browse the repository at this point in the history
…ructure (#145)
  • Loading branch information
alxdrcirilo committed Jan 3, 2024
1 parent cbc3c75 commit 80a15f4
Show file tree
Hide file tree
Showing 5 changed files with 114 additions and 8 deletions.
71 changes: 71 additions & 0 deletions examples/consume-multiple-topics-example/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
# Consume Multiple Topics Example

Consume multiple topics example with `kstreams`

## Requirements

python 3.8+, poetry, docker-compose

### Installation

```bash
poetry install
```

## Usage

1. Start the kafka cluster: From `kstreams` project root execute `./scripts/cluster/start`
2. Inside this folder execute `poetry run app`

The app should publish 5 events with the same payload (i.e. `'{"message": "Hello world from topic {topic}!"}'`) to two topics: `local--kstreams-2` and `local--hello-world`.
The `ConsumerRecord` will then be consumed and printed by the consumer.

You should see something similar to the following logs:

```bash
❯ me@me-pc simple-example % poetry run app
Topic local--kstreams-2 is not available during auto-create initialization
Group Coordinator Request failed: [Error 15] GroupCoordinatorNotAvailableError
Topic local--kstreams-2 is not available during auto-create initialization
Topic local--kstreams-2 is not available during auto-create initialization
Group Coordinator Request failed: [Error 15] GroupCoordinatorNotAvailableError
Group Coordinator Request failed: [Error 15] GroupCoordinatorNotAvailableError
Group Coordinator Request failed: [Error 15] GroupCoordinatorNotAvailableError
Group Coordinator Request failed: [Error 15] GroupCoordinatorNotAvailableError
Group Coordinator Request failed: [Error 15] GroupCoordinatorNotAvailableError
Group Coordinator Request failed: [Error 15] GroupCoordinatorNotAvailableError
Group Coordinator Request failed: [Error 15] GroupCoordinatorNotAvailableError
Message sent: RecordMetadata(topic='local--kstreams-2', partition=0, topic_partition=TopicPartition(topic='local--kstreams-2', partition=0), offset=0, timestamp=1703694932136, timestamp_type=0, log_start_offset=0)
Group Coordinator Request failed: [Error 15] GroupCoordinatorNotAvailableError
Group Coordinator Request failed: [Error 15] GroupCoordinatorNotAvailableError
Group Coordinator Request failed: [Error 15] GroupCoordinatorNotAvailableError
Group Coordinator Request failed: [Error 15] GroupCoordinatorNotAvailableError
Group Coordinator Request failed: [Error 15] GroupCoordinatorNotAvailableError
Group Coordinator Request failed: [Error 15] GroupCoordinatorNotAvailableError
Group Coordinator Request failed: [Error 15] GroupCoordinatorNotAvailableError
Group Coordinator Request failed: [Error 15] GroupCoordinatorNotAvailableError
Group Coordinator Request failed: [Error 15] GroupCoordinatorNotAvailableError
Message sent: RecordMetadata(topic='local--hello-world', partition=0, topic_partition=TopicPartition(topic='local--hello-world', partition=0), offset=0, timestamp=1703694934271, timestamp_type=0, log_start_offset=0)
Group Coordinator Request failed: [Error 15] GroupCoordinatorNotAvailableError
Message sent: RecordMetadata(topic='local--kstreams-2', partition=0, topic_partition=TopicPartition(topic='local--kstreams-2', partition=0), offset=1, timestamp=1703694935284, timestamp_type=0, log_start_offset=0)
Event consumed from topic: local--kstreams-2, headers: (), payload: b'{"message": "Hello world from topic local--kstreams-2!"}'
Message sent: RecordMetadata(topic='local--hello-world', partition=0, topic_partition=TopicPartition(topic='local--hello-world', partition=0), offset=1, timestamp=1703694936320, timestamp_type=0, log_start_offset=0)
Event consumed from topic: local--hello-world, headers: (), payload: b'{"message": "Hello world from topic local--hello-world!"}'
Message sent: RecordMetadata(topic='local--kstreams-2', partition=0, topic_partition=TopicPartition(topic='local--kstreams-2', partition=0), offset=2, timestamp=1703694937336, timestamp_type=0, log_start_offset=0)
Event consumed from topic: local--kstreams-2, headers: (), payload: b'{"message": "Hello world from topic local--kstreams-2!"}'
Message sent: RecordMetadata(topic='local--hello-world', partition=0, topic_partition=TopicPartition(topic='local--hello-world', partition=0), offset=2, timestamp=1703694938349, timestamp_type=0, log_start_offset=0)
Event consumed from topic: local--hello-world, headers: (), payload: b'{"message": "Hello world from topic local--hello-world!"}'
Message sent: RecordMetadata(topic='local--kstreams-2', partition=0, topic_partition=TopicPartition(topic='local--kstreams-2', partition=0), offset=3, timestamp=1703694939361, timestamp_type=0, log_start_offset=0)
Event consumed from topic: local--kstreams-2, headers: (), payload: b'{"message": "Hello world from topic local--kstreams-2!"}'
Message sent: RecordMetadata(topic='local--hello-world', partition=0, topic_partition=TopicPartition(topic='local--hello-world', partition=0), offset=3, timestamp=1703694940376, timestamp_type=0, log_start_offset=0)
Event consumed from topic: local--hello-world, headers: (), payload: b'{"message": "Hello world from topic local--hello-world!"}'
Message sent: RecordMetadata(topic='local--kstreams-2', partition=0, topic_partition=TopicPartition(topic='local--kstreams-2', partition=0), offset=4, timestamp=1703694941386, timestamp_type=0, log_start_offset=0)
Event consumed from topic: local--kstreams-2, headers: (), payload: b'{"message": "Hello world from topic local--kstreams-2!"}'
Message sent: RecordMetadata(topic='local--hello-world', partition=0, topic_partition=TopicPartition(topic='local--hello-world', partition=0), offset=4, timestamp=1703694942404, timestamp_type=0, log_start_offset=0)
Event consumed from topic: local--hello-world, headers: (), payload: b'{"message": "Hello world from topic local--hello-world!"}
```
## Note
If you plan on using this example, pay attention to the `pyproject.toml` dependencies, where
`kstreams` is pointing to the parent folder. You will have to set the latest version.
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
__version__ = "0.1.0"
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,20 @@ async def produce(events_per_topic: int = 5, delay_seconds: int = 1) -> None:
await asyncio.sleep(delay_seconds)


async def main():
async def start():
await stream_engine.start()
await produce()


async def shutdown():
await stream_engine.stop()


if __name__ == "__main__":
asyncio.run(main())
def main():
loop = asyncio.get_event_loop()
try:
loop.run_until_complete(start())
loop.run_forever()
finally:
loop.run_until_complete(shutdown())
loop.close()
20 changes: 20 additions & 0 deletions examples/consume-multiple-topics-example/pyproject.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
[tool.poetry]
name = "consume-multiple-topics-example"
version = "0.1.0"
description = ""
authors = ["Marcos Schroh <[email protected]>, Santiago Fraire <[email protected]>, Alexandre Cirilo <[email protected]>"]

[tool.poetry.dependencies]
python = "^3.8"
aiorun = "^2022.4.1"
kstreams = { path = "../../.", develop = true }

[tool.poetry.dev-dependencies]

[build-system]
requires = ["poetry-core>=1.0.0"]
build-backend = "poetry.core.masonry.api"


[tool.poetry.scripts]
app = "consume_multiple_topics_example.app:main"
15 changes: 10 additions & 5 deletions tests/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -433,16 +433,21 @@ async def test_e2e_example():
),
)
async def test_e2e_consume_multiple_topics(monitoring_enabled):
from examples.consume_multiple_topics import produce, stream_engine, topics
consume_multiple_topics_example = importlib.import_module(
"examples.consume-multiple-topics-example.consume_multiple_topics_example.app"
)

total_events = 2
client = TestStreamClient(stream_engine, monitoring_enabled=monitoring_enabled)
client = TestStreamClient(
consume_multiple_topics_example.stream_engine,
monitoring_enabled=monitoring_enabled,
)

async with client:
await produce(total_events)
await consume_multiple_topics_example.produce(total_events)

topic_1 = TopicManager.get(topics[0])
topic_2 = TopicManager.get(topics[1])
topic_1 = TopicManager.get(consume_multiple_topics_example.topics[0])
topic_2 = TopicManager.get(consume_multiple_topics_example.topics[1])

assert topic_1.total_events == total_events
assert topic_2.total_events == total_events
Expand Down

0 comments on commit 80a15f4

Please sign in to comment.