Skip to content

Commit

Permalink
fix: ensure aiokafka commit is called with kafka.structs.TopicPartiti…
Browse files Browse the repository at this point in the history
…on (#539)
  • Loading branch information
dada-engineer committed Aug 14, 2023
1 parent 87a80a9 commit 261e61e
Showing 1 changed file with 20 additions and 4 deletions.
24 changes: 20 additions & 4 deletions faust/transport/drivers/aiokafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -711,13 +711,15 @@ async def commit(self, offsets: Mapping[TP, int]) -> bool:
async def _commit(self, offsets: Mapping[TP, int]) -> bool:
consumer = self._ensure_consumer()
now = monotonic()
commitable_offsets = {
tp: offset for tp, offset in offsets.items() if tp in self.assignment()
}
try:
aiokafka_offsets = {
tp: OffsetAndMetadata(offset, "")
for tp, offset in offsets.items()
if tp in self.assignment()
ensure_aiokafka_TP(tp): OffsetAndMetadata(offset, "")
for tp, offset in commitable_offsets.items()
}
self.tp_last_committed_at.update({tp: now for tp in aiokafka_offsets})
self.tp_last_committed_at.update({tp: now for tp in commitable_offsets})
await consumer.commit(aiokafka_offsets)
except CommitFailedError as exc:
if "already rebalanced" in str(exc):
Expand Down Expand Up @@ -1621,3 +1623,17 @@ def credentials_to_aiokafka_auth(
}
else:
return {"security_protocol": "PLAINTEXT"}


def ensure_aiokafka_TP(tp: TP) -> _TopicPartition:
"""Convert Faust ``TP`` to aiokafka ``TopicPartition``."""
return (

Check warning on line 1630 in faust/transport/drivers/aiokafka.py

View check run for this annotation

Codecov / codecov/patch

faust/transport/drivers/aiokafka.py#L1630

Added line #L1630 was not covered by tests
tp
if isinstance(tp, _TopicPartition)
else _TopicPartition(tp.topic, tp.partition)
)


def ensure_aiokafka_TPset(tps: Iterable[TP]) -> Set[_TopicPartition]:
"""Convert set of Faust ``TP`` to aiokafka ``TopicPartition``."""
return {ensure_aiokafka_TP(tp) for tp in tps}

0 comments on commit 261e61e

Please sign in to comment.