From b5db8a0d28bd486c987a1f38599831432f4aa77e Mon Sep 17 00:00:00 2001 From: Daniel Gellert Date: Mon, 14 Aug 2023 18:26:43 +0200 Subject: [PATCH] fix: ensure aiokafka commit is called with kafka.structs.TopicPartition (#539) (#541) --- faust/transport/drivers/aiokafka.py | 24 +++++++++++++++---- tests/unit/transport/drivers/test_aiokafka.py | 7 ++++++ 2 files changed, 27 insertions(+), 4 deletions(-) diff --git a/faust/transport/drivers/aiokafka.py b/faust/transport/drivers/aiokafka.py index 032e79f8e..4a0c9ebb2 100644 --- a/faust/transport/drivers/aiokafka.py +++ b/faust/transport/drivers/aiokafka.py @@ -711,13 +711,15 @@ async def commit(self, offsets: Mapping[TP, int]) -> bool: async def _commit(self, offsets: Mapping[TP, int]) -> bool: consumer = self._ensure_consumer() now = monotonic() + commitable_offsets = { + tp: offset for tp, offset in offsets.items() if tp in self.assignment() + } try: aiokafka_offsets = { - tp: OffsetAndMetadata(offset, "") - for tp, offset in offsets.items() - if tp in self.assignment() + ensure_aiokafka_TP(tp): OffsetAndMetadata(offset, "") + for tp, offset in commitable_offsets.items() } - self.tp_last_committed_at.update({tp: now for tp in aiokafka_offsets}) + self.tp_last_committed_at.update({tp: now for tp in commitable_offsets}) await consumer.commit(aiokafka_offsets) except CommitFailedError as exc: if "already rebalanced" in str(exc): @@ -1621,3 +1623,17 @@ def credentials_to_aiokafka_auth( } else: return {"security_protocol": "PLAINTEXT"} + + +def ensure_aiokafka_TP(tp: TP) -> _TopicPartition: + """Convert Faust ``TP`` to aiokafka ``TopicPartition``.""" + return ( + tp + if isinstance(tp, _TopicPartition) + else _TopicPartition(tp.topic, tp.partition) + ) + + +def ensure_aiokafka_TPset(tps: Iterable[TP]) -> Set[_TopicPartition]: + """Convert set of Faust ``TP`` to aiokafka ``TopicPartition``.""" + return {ensure_aiokafka_TP(tp) for tp in tps} diff --git a/tests/unit/transport/drivers/test_aiokafka.py b/tests/unit/transport/drivers/test_aiokafka.py index 76f28c2df..1130f2cb8 100644 --- a/tests/unit/transport/drivers/test_aiokafka.py +++ b/tests/unit/transport/drivers/test_aiokafka.py @@ -34,6 +34,7 @@ ThreadedProducer, Transport, credentials_to_aiokafka_auth, + ensure_aiokafka_TPset, server_list, ) from faust.types import TP @@ -2038,3 +2039,9 @@ def test_credentials_to_aiokafka(credentials, ssl_context, expected): def test_credentials_to_aiokafka__invalid(): with pytest.raises(ImproperlyConfigured): credentials_to_aiokafka_auth(object()) + + +def test_ensure_aiokafka_TPset(): + actual = ensure_aiokafka_TPset({TP(topic="foo", partition=0)}) + assert actual == {TopicPartition("foo", 0)} + assert all(isinstance(tp, TopicPartition) for tp in actual)