diff --git a/kstreams/test_utils/topics.py b/kstreams/test_utils/topics.py index fbfb10d..156daa7 100644 --- a/kstreams/test_utils/topics.py +++ b/kstreams/test_utils/topics.py @@ -18,7 +18,7 @@ class Topic: ) total_events: int = 0 # for now we assumed that 1 streams is connected to 1 topic - consumer: Optional["test_clients.Consumer"] = None + consumer: Optional[test_clients.Consumer] = None async def put(self, event: ConsumerRecord) -> None: await self.queue.put(event) @@ -59,9 +59,13 @@ def offset(self, *, partition: int) -> int: @property def consumed(self) -> bool: """ - We need to check if the Topic has a Consumer and if there are messages in it + Check whetner Topic is empty or a Consumer is running """ - return self.consumer is None or self.is_empty() + return self.is_empty() or not self.is_consumer_running + + @property + def is_consumer_running(self) -> bool: + return self.consumer is not None and not self.consumer._closed def __str__(self) -> str: return self.name diff --git a/tests/test_client.py b/tests/test_client.py index c79636a..0ec46f3 100644 --- a/tests/test_client.py +++ b/tests/test_client.py @@ -328,6 +328,40 @@ async def test_clean_up_events(stream_engine: StreamEngine): assert not TopicManager.topics +@pytest.mark.asyncio +async def test_exit_context_after_stream_crashing(stream_engine: StreamEngine): + """ + Check that when a Stream crashes after consuming 1 event it stops, + then the TestStreamClient should be able to leave the context even though + more events were send to the topic. + """ + topic = "kstreams--local" + value = b'{"message": "Hello world!"}' + client = TestStreamClient(stream_engine=stream_engine) + + @stream_engine.stream(topic) + async def stream(cr: ConsumerRecord): + raise ValueError(f"Invalid topic {cr.topic}") + + async with client: + topic_instance = TopicManager.get(topic) + await client.send(topic, value=value) + + # Allow the event loop to switch context + await asyncio.sleep(1e-10) + assert topic_instance.size() == 0 + + # send another event to the topic, but the + # stream is already daed + await client.send(topic, value=value) + assert topic_instance.size() == 1 + + # From TopicManager point of view all events were consumed + # because there are not active Streams for the current topic, + # even though the topic `kstreams--local` is not empty + assert TopicManager.all_messages_consumed() + + @pytest.mark.asyncio async def test_partitions_for_topic(stream_engine: StreamEngine): topic_name = "local--kstreams"