Skip to content

Commit

Permalink
fix: check whether Stream is running when TestStreamClient is leaving…
Browse files Browse the repository at this point in the history
… the context. This allows to send more events after a Stream has crashed and still the TestStreamClient will be able to leave the context without hanging (#204)
  • Loading branch information
marcosschroh authored Aug 28, 2024
1 parent 2b2a4ea commit 86c7829
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 3 deletions.
10 changes: 7 additions & 3 deletions kstreams/test_utils/topics.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ class Topic:
)
total_events: int = 0
# for now we assumed that 1 streams is connected to 1 topic
consumer: Optional["test_clients.Consumer"] = None
consumer: Optional[test_clients.Consumer] = None

async def put(self, event: ConsumerRecord) -> None:
await self.queue.put(event)
Expand Down Expand Up @@ -59,9 +59,13 @@ def offset(self, *, partition: int) -> int:
@property
def consumed(self) -> bool:
"""
We need to check if the Topic has a Consumer and if there are messages in it
Check whetner Topic is empty or a Consumer is running
"""
return self.consumer is None or self.is_empty()
return self.is_empty() or not self.is_consumer_running

@property
def is_consumer_running(self) -> bool:
return self.consumer is not None and not self.consumer._closed

def __str__(self) -> str:
return self.name
Expand Down
34 changes: 34 additions & 0 deletions tests/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -328,6 +328,40 @@ async def test_clean_up_events(stream_engine: StreamEngine):
assert not TopicManager.topics


@pytest.mark.asyncio
async def test_exit_context_after_stream_crashing(stream_engine: StreamEngine):
"""
Check that when a Stream crashes after consuming 1 event it stops,
then the TestStreamClient should be able to leave the context even though
more events were send to the topic.
"""
topic = "kstreams--local"
value = b'{"message": "Hello world!"}'
client = TestStreamClient(stream_engine=stream_engine)

@stream_engine.stream(topic)
async def stream(cr: ConsumerRecord):
raise ValueError(f"Invalid topic {cr.topic}")

async with client:
topic_instance = TopicManager.get(topic)
await client.send(topic, value=value)

# Allow the event loop to switch context
await asyncio.sleep(1e-10)
assert topic_instance.size() == 0

# send another event to the topic, but the
# stream is already daed
await client.send(topic, value=value)
assert topic_instance.size() == 1

# From TopicManager point of view all events were consumed
# because there are not active Streams for the current topic,
# even though the topic `kstreams--local` is not empty
assert TopicManager.all_messages_consumed()


@pytest.mark.asyncio
async def test_partitions_for_topic(stream_engine: StreamEngine):
topic_name = "local--kstreams"
Expand Down

0 comments on commit 86c7829

Please sign in to comment.