From 18dc4c87e32c0e103f725926ff318df2e568266d Mon Sep 17 00:00:00 2001 From: Marcos Schroh <2828842+marcosschroh@users.noreply.github.com> Date: Tue, 9 Jul 2024 13:59:36 +0200 Subject: [PATCH] test: StreamEngine and Stream tests refactored (#197) --- tests/test_stream_engine.py | 331 +++++++----------------------------- tests/test_streams.py | 222 ++++++++++++++++++++++++ 2 files changed, 281 insertions(+), 272 deletions(-) create mode 100644 tests/test_streams.py diff --git a/tests/test_stream_engine.py b/tests/test_stream_engine.py index 974e51d..0fc1257 100644 --- a/tests/test_stream_engine.py +++ b/tests/test_stream_engine.py @@ -1,113 +1,59 @@ import asyncio -from typing import Callable, Set +from typing import Callable from unittest import mock import pytest -from kstreams import ConsumerRecord, TopicPartition +from kstreams import ConsumerRecord from kstreams.clients import Consumer, Producer from kstreams.engine import Stream, StreamEngine from kstreams.exceptions import DuplicateStreamException, EngineNotStartedException -from kstreams.streams import stream -from kstreams.structs import TopicPartitionOffset @pytest.mark.asyncio -async def test_seek_to_initial_offsets_normal( - stream_engine: StreamEngine, consumer_record_factory -): - assignments: Set[TopicPartition] = set() - partition = 100 - offset = 10 - topic_name = "example_topic" - value = b"Hello world" - assignments.add(TopicPartition(topic=topic_name, partition=partition)) - seek_mock = mock.Mock() +async def test_add_streams(stream_engine: StreamEngine): + topic = "local--hello-kpn" - async def getone(_): - return consumer_record_factory(value=value) + @stream_engine.stream(topic, name="my-stream") + async def stream(_): + pass - with mock.patch.multiple( - Consumer, - assignment=lambda _: assignments, - seek=seek_mock, - start=mock.DEFAULT, - getone=getone, - ): - - @stream_engine.stream( - topic_name, - initial_offsets=[ - TopicPartitionOffset( - topic=topic_name, partition=partition, offset=offset - ) - ], - ) - async def stream(my_stream): - async for cr in my_stream: - assert cr.value == value - break - - await stream.start() - # simulate a partitions assigned rebalance - await stream.rebalance_listener.on_partitions_assigned(assigned=assignments) - - seek_mock.assert_called_once_with( - partition=TopicPartition(topic=topic_name, partition=partition), - offset=offset, - ) + stream_instance = stream_engine.get_stream("my-stream") + assert stream_instance == stream + assert stream_instance.topics == [topic] @pytest.mark.asyncio -async def test_seek_to_initial_offsets_ignores_wrong_input( - stream_engine: StreamEngine, consumer_record_factory -): - offset = 100 - partition = 100 - topic_name = "example_topic" - wrong_topic = "different_topic" - value = b"Hello world" - wrong_partition = 1 - assignments: Set[TopicPartition] = set() - assignments.add(TopicPartition(topic=topic_name, partition=partition)) - seek_mock = mock.Mock() +async def test_add_existing_streams(stream_engine: StreamEngine): + topic = "local--hello-kpn" - async def getone(_): - return consumer_record_factory(value=value) + @stream_engine.stream(topic, name="my-stream") + async def stream(_): + pass - with mock.patch.multiple( - Consumer, - assignment=lambda _: assignments, - seek=seek_mock, - start=mock.DEFAULT, - getone=getone, - ): - - @stream_engine.stream( - topic_name, - initial_offsets=[ - TopicPartitionOffset( - topic=wrong_topic, partition=partition, offset=offset - ), - TopicPartitionOffset( - topic=topic_name, partition=wrong_partition, offset=offset - ), - ], - ) - async def stream(my_stream): - async for cr in my_stream: - assert cr.value == value - break - - await stream.start() - # simulate a partitions assigned rebalance - await stream.rebalance_listener.on_partitions_assigned(assigned=assignments) - seek_mock.assert_not_called() + with pytest.raises(DuplicateStreamException): + + @stream_engine.stream(topic, name="my-stream") + async def my_stream(_): + pass @pytest.mark.asyncio -async def test_remove_existing_stream(stream_engine: StreamEngine): - topic = "local--hello-kpn" +async def test_add_stream_multiple_topics(stream_engine: StreamEngine): + topics = ["local--hello-kpn", "local--hello-kpn-2"] + + @stream_engine.stream(topics, name="my-stream") + async def stream(_): + pass + + stream_instance = stream_engine.get_stream("my-stream") + assert stream_instance == stream + assert stream_instance.topics == topics + + +@pytest.mark.asyncio +async def test_add_stream_as_instance(stream_engine: StreamEngine): + topics = ["local--hello-kpn", "local--hello-kpn-2"] class MyDeserializer: ... @@ -118,20 +64,27 @@ async def processor(stream: Stream): pass my_stream = Stream( - topic, + topics, name="my-stream", func=processor, deserializer=deserializer, ) + assert not stream_engine.get_stream("my-stream") + stream_engine.add_stream(my_stream) - assert len(stream_engine._streams) == 1 - await stream_engine.remove_stream(my_stream) - assert len(stream_engine._streams) == 0 + stream_instance = stream_engine.get_stream("my-stream") + assert stream_instance == my_stream + assert stream_instance.topics == topics + assert stream_instance.deserializer == deserializer + + # can not add a stream with the same name + with pytest.raises(DuplicateStreamException): + stream_engine.add_stream(Stream("a-topic", name="my-stream", func=processor)) @pytest.mark.asyncio -async def test_remove_missing_stream(stream_engine: StreamEngine): +async def test_remove_existing_stream(stream_engine: StreamEngine): topic = "local--hello-kpn" class MyDeserializer: @@ -149,12 +102,14 @@ async def processor(stream: Stream): deserializer=deserializer, ) - with pytest.raises(ValueError): - await stream_engine.remove_stream(my_stream) + stream_engine.add_stream(my_stream) + assert len(stream_engine._streams) == 1 + await stream_engine.remove_stream(my_stream) + assert len(stream_engine._streams) == 0 @pytest.mark.asyncio -async def test_remove_existing_stream_stops_stream(stream_engine: StreamEngine): +async def test_remove_missing_stream(stream_engine: StreamEngine): topic = "local--hello-kpn" class MyDeserializer: @@ -171,43 +126,15 @@ async def processor(stream: Stream): func=processor, deserializer=deserializer, ) - stream_engine.add_stream(my_stream) - with mock.patch.multiple(Stream, start=mock.DEFAULT, stop=mock.DEFAULT): + with pytest.raises(ValueError): await stream_engine.remove_stream(my_stream) - Stream.stop.assert_awaited() @pytest.mark.asyncio -async def test_add_streams(stream_engine: StreamEngine): +async def test_remove_existing_stream_stops_stream(stream_engine: StreamEngine): topic = "local--hello-kpn" - @stream_engine.stream(topic, name="my-stream") - async def stream(_): - pass - - stream_instance = stream_engine.get_stream("my-stream") - assert stream_instance == stream - assert stream_instance.topics == [topic] - - -@pytest.mark.asyncio -async def test_add_stream_multiple_topics(stream_engine: StreamEngine): - topics = ["local--hello-kpn", "local--hello-kpn-2"] - - @stream_engine.stream(topics, name="my-stream") - async def stream(_): - pass - - stream_instance = stream_engine.get_stream("my-stream") - assert stream_instance == stream - assert stream_instance.topics == topics - - -@pytest.mark.asyncio -async def test_add_stream_as_instance(stream_engine: StreamEngine): - topics = ["local--hello-kpn", "local--hello-kpn-2"] - class MyDeserializer: ... @@ -217,42 +144,20 @@ async def processor(stream: Stream): pass my_stream = Stream( - topics, + topic, name="my-stream", func=processor, deserializer=deserializer, ) - - assert not stream_engine.get_stream("my-stream") - stream_engine.add_stream(my_stream) - stream_instance = stream_engine.get_stream("my-stream") - assert stream_instance == my_stream - assert stream_instance.topics == topics - assert stream_instance.deserializer == deserializer - - # can not add a stream with the same name - with pytest.raises(DuplicateStreamException): - stream_engine.add_stream(Stream("a-topic", name="my-stream", func=processor)) - -@pytest.mark.asyncio -async def test_add_existing_streams(stream_engine: StreamEngine): - topic = "local--hello-kpn" - - @stream_engine.stream(topic, name="my-stream") - async def stream(_): - pass - - with pytest.raises(DuplicateStreamException): - - @stream_engine.stream(topic, name="my-stream") - async def my_stream(_): - pass + with mock.patch.multiple(Stream, start=mock.DEFAULT, stop=mock.DEFAULT): + await stream_engine.remove_stream(my_stream) + Stream.stop.assert_awaited() @pytest.mark.asyncio -async def test_start_stop_streaming(stream_engine: StreamEngine): +async def test_start_stop_stream_engine(stream_engine: StreamEngine): topic = "local--hello-kpn" @stream_engine.stream(topic) @@ -305,35 +210,6 @@ async def stream(cr: ConsumerRecord): save_to_db.assert_awaited_once_with(value) -@pytest.mark.asyncio -async def test_no_recreate_consumer_on_re_start_stream( - stream_engine: StreamEngine, consumer_record_factory -): - topic_name = "local--kstreams" - stream_name = "my-stream" - - async def getone(_): - return consumer_record_factory() - - with mock.patch.multiple( - Consumer, - start=mock.DEFAULT, - getone=getone, - ): - - @stream_engine.stream(topic_name, name=stream_name) - async def stream(my_stream): - async for cr in my_stream: - assert cr - break - - await stream.start() - consumer = stream.consumer - await stream.stop() - await stream.start() - assert consumer is stream.consumer - - @pytest.mark.asyncio async def test_engine_not_started(stream_engine: StreamEngine): topic = "local--hello-kpn" @@ -344,92 +220,3 @@ async def test_engine_not_started(stream_engine: StreamEngine): with pytest.raises(EngineNotStartedException): await stream_engine.send(topic, value=b"1") - - -@pytest.mark.asyncio -async def test_add_stream_custom_conf(stream_engine: StreamEngine): - @stream_engine.stream( - "local--hello-kpn", - name="stream-hello-kpn", - auto_offset_reset="earliest", - enable_auto_commit=False, - ) - async def stream(_): - ... - - stream_instance = stream_engine.get_stream("stream-hello-kpn") - - with mock.patch.multiple(Consumer, start=mock.DEFAULT, stop=mock.DEFAULT): - with mock.patch.multiple(Producer, start=mock.DEFAULT, stop=mock.DEFAULT): - await stream_engine.start_streams() - - # switch the current Task to the one running in background - await asyncio.sleep(0.1) - - assert stream_instance.consumer._auto_offset_reset == "earliest" - assert not stream_instance.consumer._enable_auto_commit - - -@pytest.mark.asyncio -async def test_stream_getmany( - stream_engine: StreamEngine, consumer_record_factory: Callable[..., ConsumerRecord] -): - topic_partition_crs = { - TopicPartition(topic="local--hello-kpn", partition=0): [ - consumer_record_factory(offset=1), - consumer_record_factory(offset=2), - consumer_record_factory(offset=3), - ] - } - - save_to_db = mock.Mock() - - @stream_engine.stream("local--hello-kpn") - async def stream(stream: Stream): - data = await stream.getmany(max_records=3) - save_to_db(data) - - async def getmany(*args, **kwargs): - return topic_partition_crs - - with mock.patch.multiple(Consumer, start=mock.DEFAULT, getmany=getmany): - await stream_engine.start_streams() - await asyncio.sleep(0.1) - save_to_db.assert_called_once_with(topic_partition_crs) - - -@pytest.mark.asyncio -async def test_stream_decorator(stream_engine: StreamEngine): - topic = "local--hello-kpn" - - @stream(topic) - async def streaming_fn(_): - pass - - stream_engine.add_stream(streaming_fn) - - with mock.patch.multiple(Consumer, start=mock.DEFAULT, stop=mock.DEFAULT): - with mock.patch.multiple(Producer, start=mock.DEFAULT, stop=mock.DEFAULT): - await stream_engine.start() - - # switch the current Task to the one running in background - await asyncio.sleep(0.1) - - Consumer.start.assert_awaited() - stream_engine._producer.start.assert_awaited() - - await stream_engine.stop() - stream_engine._producer.stop.assert_awaited() - Consumer.stop.assert_awaited() - - -@pytest.mark.asyncio -async def test_stream_decorates_properly(stream_engine: StreamEngine): - topic = "local--hello-kpn" - - @stream(topic) - async def streaming_fn(_): - """text from func""" - - assert streaming_fn.__name__ == "streaming_fn" - assert streaming_fn.__doc__ == "text from func" diff --git a/tests/test_streams.py b/tests/test_streams.py new file mode 100644 index 0000000..c8a3627 --- /dev/null +++ b/tests/test_streams.py @@ -0,0 +1,222 @@ +import asyncio +from typing import Callable, Set +from unittest import mock + +import pytest + +from kstreams import ConsumerRecord, TopicPartition +from kstreams.clients import Consumer, Producer +from kstreams.engine import Stream, StreamEngine +from kstreams.streams import stream +from kstreams.structs import TopicPartitionOffset + + +@pytest.mark.asyncio +async def test_stream_custom_conf(stream_engine: StreamEngine): + @stream_engine.stream( + "local--hello-kpn", + name="stream-hello-kpn", + auto_offset_reset="earliest", + enable_auto_commit=False, + ) + async def stream(_): + ... + + stream_instance = stream_engine.get_stream("stream-hello-kpn") + + with mock.patch.multiple(Consumer, start=mock.DEFAULT, stop=mock.DEFAULT): + with mock.patch.multiple(Producer, start=mock.DEFAULT, stop=mock.DEFAULT): + await stream_engine.start_streams() + + # switch the current Task to the one running in background + await asyncio.sleep(0.1) + + assert stream_instance.consumer._auto_offset_reset == "earliest" + assert not stream_instance.consumer._enable_auto_commit + + +@pytest.mark.asyncio +async def test_stream_getmany( + stream_engine: StreamEngine, consumer_record_factory: Callable[..., ConsumerRecord] +): + topic_partition_crs = { + TopicPartition(topic="local--hello-kpn", partition=0): [ + consumer_record_factory(offset=1), + consumer_record_factory(offset=2), + consumer_record_factory(offset=3), + ] + } + + save_to_db = mock.Mock() + + @stream_engine.stream("local--hello-kpn") + async def stream(stream: Stream): + data = await stream.getmany(max_records=3) + save_to_db(data) + + async def getmany(*args, **kwargs): + return topic_partition_crs + + with mock.patch.multiple(Consumer, start=mock.DEFAULT, getmany=getmany): + await stream_engine.start_streams() + await asyncio.sleep(0.1) + save_to_db.assert_called_once_with(topic_partition_crs) + + +@pytest.mark.asyncio +async def test_stream_decorator(stream_engine: StreamEngine): + topic = "local--hello-kpn" + + @stream(topic) + async def streaming_fn(_): + pass + + stream_engine.add_stream(streaming_fn) + + with mock.patch.multiple(Consumer, start=mock.DEFAULT, stop=mock.DEFAULT): + with mock.patch.multiple(Producer, start=mock.DEFAULT, stop=mock.DEFAULT): + await stream_engine.start() + + # switch the current Task to the one running in background + await asyncio.sleep(0.1) + + Consumer.start.assert_awaited() + stream_engine._producer.start.assert_awaited() + + await stream_engine.stop() + stream_engine._producer.stop.assert_awaited() + Consumer.stop.assert_awaited() + + +@pytest.mark.asyncio +async def test_stream_decorates_properly(stream_engine: StreamEngine): + topic = "local--hello-kpn" + + @stream(topic) + async def streaming_fn(_): + """text from func""" + + assert streaming_fn.__name__ == "streaming_fn" + assert streaming_fn.__doc__ == "text from func" + + +@pytest.mark.asyncio +async def test_no_recreate_consumer_on_re_start_stream( + stream_engine: StreamEngine, consumer_record_factory +): + topic_name = "local--kstreams" + stream_name = "my-stream" + + async def getone(_): + return consumer_record_factory() + + with mock.patch.multiple( + Consumer, + start=mock.DEFAULT, + getone=getone, + ): + + @stream_engine.stream(topic_name, name=stream_name) + async def stream(my_stream): + async for cr in my_stream: + assert cr + break + + await stream.start() + consumer = stream.consumer + await stream.stop() + await stream.start() + assert consumer is stream.consumer + + +@pytest.mark.asyncio +async def test_seek_to_initial_offsets_normal( + stream_engine: StreamEngine, consumer_record_factory +): + assignments: Set[TopicPartition] = set() + partition = 100 + offset = 10 + topic_name = "example_topic" + value = b"Hello world" + assignments.add(TopicPartition(topic=topic_name, partition=partition)) + seek_mock = mock.Mock() + + async def getone(_): + return consumer_record_factory(value=value) + + with mock.patch.multiple( + Consumer, + assignment=lambda _: assignments, + seek=seek_mock, + start=mock.DEFAULT, + getone=getone, + ): + + @stream_engine.stream( + topic_name, + initial_offsets=[ + TopicPartitionOffset( + topic=topic_name, partition=partition, offset=offset + ) + ], + ) + async def stream(my_stream): + async for cr in my_stream: + assert cr.value == value + break + + await stream.start() + # simulate a partitions assigned rebalance + await stream.rebalance_listener.on_partitions_assigned(assigned=assignments) + + seek_mock.assert_called_once_with( + partition=TopicPartition(topic=topic_name, partition=partition), + offset=offset, + ) + + +@pytest.mark.asyncio +async def test_seek_to_initial_offsets_ignores_wrong_input( + stream_engine: StreamEngine, consumer_record_factory +): + offset = 100 + partition = 100 + topic_name = "example_topic" + wrong_topic = "different_topic" + value = b"Hello world" + wrong_partition = 1 + assignments: Set[TopicPartition] = set() + assignments.add(TopicPartition(topic=topic_name, partition=partition)) + seek_mock = mock.Mock() + + async def getone(_): + return consumer_record_factory(value=value) + + with mock.patch.multiple( + Consumer, + assignment=lambda _: assignments, + seek=seek_mock, + start=mock.DEFAULT, + getone=getone, + ): + + @stream_engine.stream( + topic_name, + initial_offsets=[ + TopicPartitionOffset( + topic=wrong_topic, partition=partition, offset=offset + ), + TopicPartitionOffset( + topic=topic_name, partition=wrong_partition, offset=offset + ), + ], + ) + async def stream(my_stream): + async for cr in my_stream: + assert cr.value == value + break + + await stream.start() + # simulate a partitions assigned rebalance + await stream.rebalance_listener.on_partitions_assigned(assigned=assignments) + seek_mock.assert_not_called()