Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: ensure aiokafka commit is called with kafka.structs.TopicPartiti… #541

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 20 additions & 4 deletions faust/transport/drivers/aiokafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -711,13 +711,15 @@ async def commit(self, offsets: Mapping[TP, int]) -> bool:
async def _commit(self, offsets: Mapping[TP, int]) -> bool:
consumer = self._ensure_consumer()
now = monotonic()
commitable_offsets = {
tp: offset for tp, offset in offsets.items() if tp in self.assignment()
}
try:
aiokafka_offsets = {
tp: OffsetAndMetadata(offset, "")
for tp, offset in offsets.items()
if tp in self.assignment()
ensure_aiokafka_TP(tp): OffsetAndMetadata(offset, "")
for tp, offset in commitable_offsets.items()
}
self.tp_last_committed_at.update({tp: now for tp in aiokafka_offsets})
self.tp_last_committed_at.update({tp: now for tp in commitable_offsets})
await consumer.commit(aiokafka_offsets)
except CommitFailedError as exc:
if "already rebalanced" in str(exc):
Expand Down Expand Up @@ -1621,3 +1623,17 @@ def credentials_to_aiokafka_auth(
}
else:
return {"security_protocol": "PLAINTEXT"}


def ensure_aiokafka_TP(tp: TP) -> _TopicPartition:
"""Convert Faust ``TP`` to aiokafka ``TopicPartition``."""
return (
tp
if isinstance(tp, _TopicPartition)
else _TopicPartition(tp.topic, tp.partition)
)


def ensure_aiokafka_TPset(tps: Iterable[TP]) -> Set[_TopicPartition]:
"""Convert set of Faust ``TP`` to aiokafka ``TopicPartition``."""
return {ensure_aiokafka_TP(tp) for tp in tps}
7 changes: 7 additions & 0 deletions tests/unit/transport/drivers/test_aiokafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
ThreadedProducer,
Transport,
credentials_to_aiokafka_auth,
ensure_aiokafka_TPset,
server_list,
)
from faust.types import TP
Expand Down Expand Up @@ -2038,3 +2039,9 @@ def test_credentials_to_aiokafka(credentials, ssl_context, expected):
def test_credentials_to_aiokafka__invalid():
with pytest.raises(ImproperlyConfigured):
credentials_to_aiokafka_auth(object())


def test_ensure_aiokafka_TPset():
actual = ensure_aiokafka_TPset({TP(topic="foo", partition=0)})
assert actual == {TopicPartition("foo", 0)}
assert all(isinstance(tp, TopicPartition) for tp in actual)
Loading