Commit
feat: subscribe topics by pattern (#199)
marcosschroh authored Jul 16, 2024
1 parent ccc9904 commit b425407
Showing 18 changed files with 822 additions and 61 deletions.
1 change: 1 addition & 0 deletions README.md
@@ -63,6 +63,7 @@ if __name__ == "__main__":

- [x] Produce events
- [x] Consumer events with `Streams`
- [x] Subscribe to topics by `pattern`
- [x] `Prometheus` metrics and custom monitoring
- [x] TestClient
- [x] Custom Serialization and Deserialization
15 changes: 1 addition & 14 deletions docs/stream.md
@@ -8,7 +8,7 @@ Consuming can be done using `kstreams.Stream`. You only need to decorate a `coro
options:
show_root_heading: true
docstring_section_style: table
show_signature_annotations: false
show_source: false
members:
-

@@ -187,19 +187,6 @@ Traceback (most recent call last):
AttributeError: 'ConsumerRecord' object has no attribute 'payload'
```

## Consuming from multiple topics

Consuming from multiple topics using one `stream` is possible. A `List[str]` of topics must be provided.

```python title="Consume from multiple topics"
stream_engine = create_engine(title="my-stream-engine")


@stream_engine.stream(["local--kstreams", "local--hello-world"], group_id="example-group")
async def consume(cr: ConsumerRecord) -> None:
print(f"Event consumed from topic {cr.topic}: headers: {cr.headers}, payload: {cr.value}")
```

## Changing consumer behavior

Most of the time you will only set the `topic` and the `group_id` for the `consumer`, but sometimes you might want more control over it, for example changing the `policy for resetting offsets on OffsetOutOfRange errors` or the `session timeout`. To do this, you have to use the same `kwargs` as the [aiokafka consumer](https://aiokafka.readthedocs.io/en/stable/api.html#aiokafkaconsumer-class) API.
55 changes: 55 additions & 0 deletions docs/test_client.md
@@ -353,6 +353,61 @@ async with client:
assert event.key == key
```

## Topics subscribed by pattern

When a `Stream` uses a `pattern` subscription it is not possible to know beforehand how many topics the `Stream` will consume from.
To solve this problem, the `topics` must be predefined using the `extra topics` feature of the `TestClient`:

In the following example we have a `Stream` that will consume from topics that match the regular expression `^dev--customer-.*$`, for example `dev--customer-invoice` and `dev--customer-profile`.

```python
# app.py
from kstreams import ConsumerRecord, create_engine

customer_invoice_topic = "dev--customer-invoice"
customer_profile_topic = "dev--customer-profile"
invoice_event = b"invoice-1"
profile_event = b"profile-1"

stream_engine = create_engine(title="my-stream-engine")


@stream_engine.stream(topics="^dev--customer-.*$", subscribe_by_pattern=True)
async def stream(cr: ConsumerRecord):
    if cr.topic == customer_invoice_topic:
        assert cr.value == invoice_event
    elif cr.topic == customer_profile_topic:
        assert cr.value == profile_event
    else:
        raise ValueError(f"Invalid topic {cr.topic}")
```

Then to test our `Stream`, we need to predefine the topics:

```python
# test_stream.py
import asyncio

import pytest
from kstreams.test_utils import TestStreamClient, TopicManager

from app import stream_engine


@pytest.mark.asyncio
async def test_consume_events_topics_by_pattern():
"""
This test shows the possibility to subscribe to multiple topics using a pattern
"""
customer_invoice_topic = "dev--customer-invoice"
customer_profile_topic = "dev--customer-profile"

client = TestStreamClient(
stream_engine, topics=[customer_invoice_topic, customer_profile_topic]
)

async with client:
await client.send(customer_invoice_topic, value=b"invoice-1", key="1")
await client.send(customer_profile_topic, value=b"profile-1", key="1")

# give some time to consume all the events
await asyncio.sleep(0.1)
assert TopicManager.all_messages_consumed()
```

## Disabling monitoring during testing

Monitoring streams and producers is vital for streaming applications, but it requires extra effort. Sometimes during testing,
47 changes: 47 additions & 0 deletions examples/subscribe-topics-by-pattern/README.md
@@ -0,0 +1,47 @@
# Subscribe topics by pattern

In the following example we have a `Stream` that will consume from topics that match the regular expression `^local--customer-.*$`, for example
`local--customer-invoice` and `local--customer-profile`.
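The regular expression itself can be checked with plain Python `re` (a quick sketch; the non-matching topic name below is made up for illustration):

```python
import re

# the pattern used by the example's stream subscription
pattern = re.compile(r"^local--customer-.*$")

topics = [
    "local--customer-invoice",
    "local--customer-profile",
    "local--orders",  # does not match: no "customer-" segment
]

matching = [topic for topic in topics if pattern.match(topic)]
print(matching)  # → ['local--customer-invoice', 'local--customer-profile']
```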

## Requirements

Python 3.8+, Poetry, docker-compose

### Installation

```bash
poetry install
```

## Usage

1. Start the kafka cluster: From `kstreams` project root execute `./scripts/cluster/start`
2. Inside this folder execute `poetry run app`

The app publishes events to the topics `local--customer-invoice` and `local--customer-profile`, then the events are consumed by the `stream` that has subscribed to them using the pattern `^local--customer-.*$`.

You should see something similar to the following logs:

```bash
❯ poetry run app

INFO:aiokafka.consumer.consumer:Subscribed to topic pattern: re.compile('^local--customer-.*$')
INFO:kstreams.prometheus.monitor:Starting Prometheus Monitoring started...
INFO:aiokafka.consumer.subscription_state:Updating subscribed topics to: frozenset({'local--customer-profile', 'local--customer-invoice'})
INFO:aiokafka.consumer.group_coordinator:Discovered coordinator 1 for group topics-by-pattern-group
INFO:aiokafka.consumer.group_coordinator:Revoking previously assigned partitions set() for group topics-by-pattern-group
INFO:aiokafka.consumer.group_coordinator:(Re-)joining group topics-by-pattern-group
INFO:aiokafka.consumer.group_coordinator:Joined group 'topics-by-pattern-group' (generation 7) with member_id aiokafka-0.11.0-d4e8d901-666d-4286-8c6c-621a12b7216f
INFO:aiokafka.consumer.group_coordinator:Elected group leader -- performing partition assignments using roundrobin
INFO:aiokafka.consumer.group_coordinator:Successfully synced group topics-by-pattern-group with generation 7
INFO:aiokafka.consumer.group_coordinator:Setting newly assigned partitions {TopicPartition(topic='local--customer-profile', partition=0), TopicPartition(topic='local--customer-invoice', partition=0)} for group topics-by-pattern-group
INFO:subscribe_topics_by_pattern.app:Event b'profile-1' consumed from topic local--customer-profile
INFO:subscribe_topics_by_pattern.app:Event b'profile-1' consumed from topic local--customer-profile
INFO:subscribe_topics_by_pattern.app:Event b'invoice-1' consumed from topic local--customer-invoice
INFO:subscribe_topics_by_pattern.app:Event b'invoice-1' consumed from topic local--customer-invoice
```

## Note

If you plan on using this example, pay attention to the `pyproject.toml` dependencies, where
`kstreams` points to the parent folder. You will have to replace it with the latest released version.