From e9580ba237b7dab9572323017a17710d99934411 Mon Sep 17 00:00:00 2001 From: Ivan Kirpichnikov Date: Fri, 18 Oct 2024 23:18:44 +0300 Subject: [PATCH 1/5] add aiogram example (#1858) * add aiogram example * add highlighting lines * add types-aiofiles in types optional dependencies * run pre-commit --- .../integrations/frameworks/index.md | 1 + docs/docs/en/public_api | 2 +- .../__init__.py | 0 .../aiogram.py | 34 +++++++++++++++++++ .../getting_started/integrations/http/1.md | 7 ++++ pyproject.toml | 1 + 6 files changed, 44 insertions(+), 1 deletion(-) create mode 100644 docs/docs_src/integrations/no_http_frameworks_integrations/__init__.py create mode 100644 docs/docs_src/integrations/no_http_frameworks_integrations/aiogram.py diff --git a/docs/docs/en/getting-started/integrations/frameworks/index.md b/docs/docs/en/getting-started/integrations/frameworks/index.md index fcb09ce7f2..d6fe094465 100644 --- a/docs/docs/en/getting-started/integrations/frameworks/index.md +++ b/docs/docs/en/getting-started/integrations/frameworks/index.md @@ -9,6 +9,7 @@ search: # template variables fastapi_plugin: If you want to use **FastStream** in conjunction with **FastAPI**, perhaps you should use a special [plugin](../fastapi/index.md){.internal-link} no_hook: However, even if such a hook is not provided, you can do it yourself. +and_not_only_http: And not only HTTP frameworks. --- # INTEGRATIONS diff --git a/docs/docs/en/public_api b/docs/docs/en/public_api index b14a93fe9e..2c417d1ba5 120000 --- a/docs/docs/en/public_api +++ b/docs/docs/en/public_api @@ -1 +1 @@ -./api/ \ No newline at end of file +./api/ diff --git a/docs/docs_src/integrations/no_http_frameworks_integrations/__init__.py b/docs/docs_src/integrations/no_http_frameworks_integrations/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/docs/docs_src/integrations/no_http_frameworks_integrations/aiogram.py b/docs/docs_src/integrations/no_http_frameworks_integrations/aiogram.py new file mode 100644 index 0000000000..94d8cc7aae --- /dev/null +++ b/docs/docs_src/integrations/no_http_frameworks_integrations/aiogram.py @@ -0,0 +1,34 @@ +import asyncio + +from aiogram import Bot, Dispatcher +from aiogram.types import Message + +from faststream.nats import NatsBroker + +bot = Bot("") +dispatcher = Dispatcher() +broker = NatsBroker() + +@broker.subscriber("echo") +async def echo_faststream_handler(data: dict[str, str]) -> None: + await bot.copy_message(**data) + + +@dispatcher.message() +async def echo_aiogram_handler(event: Message) -> None: + await broker.publish( + subject="echo", + message={ + "chat_id": event.chat.id, + "message_id": event.message_id, + "from_chat_id": event.chat.id, + }, + ) + + +async def main() -> None: + async with broker: + await broker.start() + await dispatcher.start_polling(bot) + +asyncio.run(main()) diff --git a/docs/includes/getting_started/integrations/http/1.md b/docs/includes/getting_started/integrations/http/1.md index 6c6d58e0c0..d124345ce4 100644 --- a/docs/includes/getting_started/integrations/http/1.md +++ b/docs/includes/getting_started/integrations/http/1.md @@ -42,3 +42,10 @@ ```python linenums="1" hl_lines="5 7 10-12 32-36" {!> docs_src/integrations/http_frameworks_integrations/tornado.py !} ``` + +{{ and_not_only_http }} + +=== "Aiogram" + ```python linenums="1" hl_lines="6 10 12-14 30-31" + {!> docs_src/integrations/no_http_frameworks_integrations/aiogram.py !} + ``` diff --git a/pyproject.toml b/pyproject.toml index 10d7a66092..350d53ed3e 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -113,6 +113,7 @@ types = [ "types-redis", "types-Pygments", "types-docutils", + "types-aiofiles", "confluent-kafka-stubs; python_version >= '3.11'", ] From 855733a62deab1bb8ab3b9e438fd6d7f7ac10ff8 Mon Sep 17 00:00:00 2001 From: Nikita Pastukhov Date: Sat, 19 Oct 2024 09:43:13 +0300 Subject: [PATCH 2/5] fix: revert acknowledgement changes --- faststream/broker/acknowledgement_watcher.py | 6 +----- faststream/broker/message.py | 9 ++++++--- faststream/confluent/message.py | 4 ++-- faststream/kafka/message.py | 4 ++-- faststream/nats/message.py | 6 +++--- faststream/redis/message.py | 2 +- 6 files changed, 15 insertions(+), 16 deletions(-) diff --git a/faststream/broker/acknowledgement_watcher.py b/faststream/broker/acknowledgement_watcher.py index 58a02e4101..4558daf01c 100644 --- a/faststream/broker/acknowledgement_watcher.py +++ b/faststream/broker/acknowledgement_watcher.py @@ -144,11 +144,7 @@ async def __aexit__( exc_tb: Optional["TracebackType"], ) -> bool: """Exit the asynchronous context manager.""" - if self.message.committed: - if self.message.committed in (AckStatus.acked, AckStatus.rejected): - self.watcher.remove(self.message.message_id) - - elif not exc_type: + if not exc_type: await self.__ack() elif isinstance(exc_val, HandlerException): diff --git a/faststream/broker/message.py b/faststream/broker/message.py index 82593e7cdc..e06e912593 100644 --- a/faststream/broker/message.py +++ b/faststream/broker/message.py @@ -63,13 +63,16 @@ class StreamMessage(Generic[MsgType]): _decoded_body: Optional["DecodedMessage"] = field(default=None, init=False) async def ack(self) -> None: - self.committed = AckStatus.acked + if not self.committed: + self.committed = AckStatus.acked async def nack(self) -> None: - self.committed = AckStatus.nacked + if not self.committed: + self.committed = AckStatus.nacked async def reject(self) -> None: - self.committed = AckStatus.rejected + if not self.committed: + self.committed = AckStatus.rejected async def decode(self) -> Optional["DecodedMessage"]: """Serialize the message by lazy decoder.""" diff --git a/faststream/confluent/message.py b/faststream/confluent/message.py index 14fe05ae7b..83ee0e814b 100644 --- a/faststream/confluent/message.py +++ b/faststream/confluent/message.py @@ -66,7 +66,7 @@ async def ack(self) -> None: """Acknowledge the Kafka message.""" if self.is_manual and not self.committed: await self.consumer.commit() - await super().ack() + await super().ack() async def nack(self) -> None: """Reject the Kafka message.""" @@ -81,4 +81,4 @@ async def nack(self) -> None: partition=raw_message.partition(), offset=raw_message.offset(), ) - await super().nack() + await super().nack() diff --git a/faststream/kafka/message.py b/faststream/kafka/message.py index d83a57bf6a..bde7669787 100644 --- a/faststream/kafka/message.py +++ b/faststream/kafka/message.py @@ -77,7 +77,7 @@ async def nack(self) -> None: partition=topic_partition, offset=raw_message.offset, ) - await super().nack() + await super().nack() class KafkaAckableMessage(KafkaMessage): @@ -85,4 +85,4 @@ async def ack(self) -> None: """Acknowledge the Kafka message.""" if not self.committed: await self.consumer.commit() - await super().ack() + await super().ack() diff --git a/faststream/nats/message.py b/faststream/nats/message.py index ee54ef2caa..0f104a3310 100644 --- a/faststream/nats/message.py +++ b/faststream/nats/message.py @@ -15,7 +15,7 @@ async def ack(self) -> None: # to be compatible with `self.raw_message.ack()` if not self.raw_message._ackd: await self.raw_message.ack() - await super().ack() + await super().ack() async def nack( self, @@ -23,12 +23,12 @@ async def nack( ) -> None: if not self.raw_message._ackd: await self.raw_message.nak(delay=delay) - await super().nack() + await super().nack() async def reject(self) -> None: if not self.raw_message._ackd: await self.raw_message.term() - await super().reject() + await super().reject() async def in_progress(self) -> None: if not self.raw_message._ackd: diff --git a/faststream/redis/message.py b/faststream/redis/message.py index 8bce4005c8..86cc9b3d96 100644 --- a/faststream/redis/message.py +++ b/faststream/redis/message.py @@ -128,7 +128,7 @@ async def ack( ids = self.raw_message["message_ids"] channel = self.raw_message["channel"] await redis.xack(channel, group, *ids) # type: ignore[no-untyped-call] - await super().ack() + await super().ack() @override async def nack( From 2d10097ba79cb5f6917fa49b4bc8b82a1e62f75b Mon Sep 17 00:00:00 2001 From: Nikita Pastukhov Date: Sat, 19 Oct 2024 09:44:47 +0300 Subject: [PATCH 3/5] lint: fix precommit --- faststream/broker/acknowledgement_watcher.py | 1 - 1 file changed, 1 deletion(-) diff --git a/faststream/broker/acknowledgement_watcher.py b/faststream/broker/acknowledgement_watcher.py index 4558daf01c..4084274095 100644 --- a/faststream/broker/acknowledgement_watcher.py +++ b/faststream/broker/acknowledgement_watcher.py @@ -4,7 +4,6 @@ from typing import TYPE_CHECKING, Any, Optional, Type, Union from typing import Counter as CounterType -from faststream.broker.message import AckStatus from faststream.exceptions import ( AckMessage, HandlerException, From 328405b8d4652c5396d563ac6ab0995fc0357e6d Mon Sep 17 00:00:00 2001 From: Nikita Pastukhov Date: Sat, 19 Oct 2024 09:53:41 +0300 Subject: [PATCH 4/5] chore: bump version --- faststream/__about__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/faststream/__about__.py b/faststream/__about__.py index a829efe0e2..f350fa0af9 100644 --- a/faststream/__about__.py +++ b/faststream/__about__.py @@ -1,5 +1,5 @@ """Simple and fast framework to create message brokers based microservices.""" -__version__ = "0.5.27" +__version__ = "0.5.28" SERVICE_NAME = f"faststream-{__version__}" From 5812e298f1cb83ae1ecfd614fe8a68e64d02cc74 Mon Sep 17 00:00:00 2001 From: Pastukhov Nikita Date: Sat, 19 Oct 2024 14:08:59 +0300 Subject: [PATCH 5/5] docs: fix public api directory (#1861) --- docs/docs/en/public_api | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/docs/en/public_api b/docs/docs/en/public_api index 2c417d1ba5..b14a93fe9e 120000 --- a/docs/docs/en/public_api +++ b/docs/docs/en/public_api @@ -1 +1 @@ -./api/ +./api/ \ No newline at end of file