Skip to content

Commit

Permalink
fix: ensure aiokafka commit is called with kafka.structs.TopicPartiti…
Browse files Browse the repository at this point in the history
…on (#539) (#541)
  • Loading branch information
dada-engineer committed Aug 14, 2023
1 parent 87a80a9 commit b5db8a0
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 4 deletions.
24 changes: 20 additions & 4 deletions faust/transport/drivers/aiokafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -711,13 +711,15 @@ async def commit(self, offsets: Mapping[TP, int]) -> bool:
async def _commit(self, offsets: Mapping[TP, int]) -> bool:
consumer = self._ensure_consumer()
now = monotonic()
commitable_offsets = {
tp: offset for tp, offset in offsets.items() if tp in self.assignment()
}
try:
aiokafka_offsets = {
tp: OffsetAndMetadata(offset, "")
for tp, offset in offsets.items()
if tp in self.assignment()
ensure_aiokafka_TP(tp): OffsetAndMetadata(offset, "")
for tp, offset in commitable_offsets.items()
}
self.tp_last_committed_at.update({tp: now for tp in aiokafka_offsets})
self.tp_last_committed_at.update({tp: now for tp in commitable_offsets})
await consumer.commit(aiokafka_offsets)
except CommitFailedError as exc:
if "already rebalanced" in str(exc):
Expand Down Expand Up @@ -1621,3 +1623,17 @@ def credentials_to_aiokafka_auth(
}
else:
return {"security_protocol": "PLAINTEXT"}


def ensure_aiokafka_TP(tp: TP) -> _TopicPartition:
"""Convert Faust ``TP`` to aiokafka ``TopicPartition``."""
return (
tp
if isinstance(tp, _TopicPartition)
else _TopicPartition(tp.topic, tp.partition)
)


def ensure_aiokafka_TPset(tps: Iterable[TP]) -> Set[_TopicPartition]:
"""Convert set of Faust ``TP`` to aiokafka ``TopicPartition``."""
return {ensure_aiokafka_TP(tp) for tp in tps}
7 changes: 7 additions & 0 deletions tests/unit/transport/drivers/test_aiokafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
ThreadedProducer,
Transport,
credentials_to_aiokafka_auth,
ensure_aiokafka_TPset,
server_list,
)
from faust.types import TP
Expand Down Expand Up @@ -2038,3 +2039,9 @@ def test_credentials_to_aiokafka(credentials, ssl_context, expected):
def test_credentials_to_aiokafka__invalid():
with pytest.raises(ImproperlyConfigured):
credentials_to_aiokafka_auth(object())


def test_ensure_aiokafka_TPset():
actual = ensure_aiokafka_TPset({TP(topic="foo", partition=0)})
assert actual == {TopicPartition("foo", 0)}
assert all(isinstance(tp, TopicPartition) for tp in actual)

0 comments on commit b5db8a0

Please sign in to comment.