Skip to content

Commit

Permalink
fix: revert acknowledgement changes
Browse files Browse the repository at this point in the history
  • Loading branch information
Lancetnik committed Oct 19, 2024
1 parent a9a0156 commit 855733a
Show file tree
Hide file tree
Showing 6 changed files with 15 additions and 16 deletions.
6 changes: 1 addition & 5 deletions faststream/broker/acknowledgement_watcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -144,11 +144,7 @@ async def __aexit__(
exc_tb: Optional["TracebackType"],
) -> bool:
"""Exit the asynchronous context manager."""
if self.message.committed:
if self.message.committed in (AckStatus.acked, AckStatus.rejected):
self.watcher.remove(self.message.message_id)

elif not exc_type:
if not exc_type:
await self.__ack()

elif isinstance(exc_val, HandlerException):
Expand Down
9 changes: 6 additions & 3 deletions faststream/broker/message.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,13 +63,16 @@ class StreamMessage(Generic[MsgType]):
_decoded_body: Optional["DecodedMessage"] = field(default=None, init=False)

async def ack(self) -> None:
self.committed = AckStatus.acked
if not self.committed:
self.committed = AckStatus.acked

async def nack(self) -> None:
self.committed = AckStatus.nacked
if not self.committed:
self.committed = AckStatus.nacked

async def reject(self) -> None:
self.committed = AckStatus.rejected
if not self.committed:
self.committed = AckStatus.rejected

async def decode(self) -> Optional["DecodedMessage"]:
"""Serialize the message by lazy decoder."""
Expand Down
4 changes: 2 additions & 2 deletions faststream/confluent/message.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ async def ack(self) -> None:
"""Acknowledge the Kafka message."""
if self.is_manual and not self.committed:
await self.consumer.commit()
await super().ack()
await super().ack()

async def nack(self) -> None:
"""Reject the Kafka message."""
Expand All @@ -81,4 +81,4 @@ async def nack(self) -> None:
partition=raw_message.partition(),
offset=raw_message.offset(),
)
await super().nack()
await super().nack()
4 changes: 2 additions & 2 deletions faststream/kafka/message.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,12 +77,12 @@ async def nack(self) -> None:
partition=topic_partition,
offset=raw_message.offset,
)
await super().nack()
await super().nack()


class KafkaAckableMessage(KafkaMessage):
async def ack(self) -> None:
"""Acknowledge the Kafka message."""
if not self.committed:
await self.consumer.commit()
await super().ack()
await super().ack()
6 changes: 3 additions & 3 deletions faststream/nats/message.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,20 +15,20 @@ async def ack(self) -> None:
# to be compatible with `self.raw_message.ack()`
if not self.raw_message._ackd:
await self.raw_message.ack()
await super().ack()
await super().ack()

async def nack(
self,
delay: Union[int, float, None] = None,
) -> None:
if not self.raw_message._ackd:
await self.raw_message.nak(delay=delay)
await super().nack()
await super().nack()

async def reject(self) -> None:
if not self.raw_message._ackd:
await self.raw_message.term()
await super().reject()
await super().reject()

async def in_progress(self) -> None:
if not self.raw_message._ackd:
Expand Down
2 changes: 1 addition & 1 deletion faststream/redis/message.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ async def ack(
ids = self.raw_message["message_ids"]
channel = self.raw_message["channel"]
await redis.xack(channel, group, *ids) # type: ignore[no-untyped-call]
await super().ack()
await super().ack()

@override
async def nack(
Expand Down

0 comments on commit 855733a

Please sign in to comment.