Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/feat/prometheus' into feat/prome…
Browse files Browse the repository at this point in the history
…theus
  • Loading branch information
roma-frolov committed Oct 20, 2024
2 parents 1de4f58 + f0e611c commit 7ffaf44
Show file tree
Hide file tree
Showing 12 changed files with 59 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ search:
# template variables
fastapi_plugin: If you want to use **FastStream** in conjunction with **FastAPI**, perhaps you should use a special [plugin](../fastapi/index.md){.internal-link}
no_hook: However, even if such a hook is not provided, you can do it yourself.
and_not_only_http: And not only HTTP frameworks.
---

# INTEGRATIONS
Expand Down
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
import asyncio

from aiogram import Bot, Dispatcher
from aiogram.types import Message

from faststream.nats import NatsBroker

bot = Bot("")
dispatcher = Dispatcher()
broker = NatsBroker()

@broker.subscriber("echo")
async def echo_faststream_handler(data: dict[str, str]) -> None:
await bot.copy_message(**data)


@dispatcher.message()
async def echo_aiogram_handler(event: Message) -> None:
await broker.publish(
subject="echo",
message={
"chat_id": event.chat.id,
"message_id": event.message_id,
"from_chat_id": event.chat.id,
},
)


async def main() -> None:
async with broker:
await broker.start()
await dispatcher.start_polling(bot)

asyncio.run(main())
7 changes: 7 additions & 0 deletions docs/includes/getting_started/integrations/http/1.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,3 +42,10 @@
```python linenums="1" hl_lines="5 7 10-12 32-36"
{!> docs_src/integrations/http_frameworks_integrations/tornado.py !}
```

{{ and_not_only_http }}

=== "Aiogram"
```python linenums="1" hl_lines="6 10 12-14 30-31"
{!> docs_src/integrations/no_http_frameworks_integrations/aiogram.py !}
```
2 changes: 1 addition & 1 deletion faststream/__about__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
"""Simple and fast framework to create message brokers based microservices."""

__version__ = "0.5.27"
__version__ = "0.5.28"

SERVICE_NAME = f"faststream-{__version__}"
7 changes: 1 addition & 6 deletions faststream/broker/acknowledgement_watcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
from typing import TYPE_CHECKING, Any, Optional, Type, Union
from typing import Counter as CounterType

from faststream.broker.message import AckStatus
from faststream.exceptions import (
AckMessage,
HandlerException,
Expand Down Expand Up @@ -144,11 +143,7 @@ async def __aexit__(
exc_tb: Optional["TracebackType"],
) -> bool:
"""Exit the asynchronous context manager."""
if self.message.committed:
if self.message.committed in (AckStatus.acked, AckStatus.rejected):
self.watcher.remove(self.message.message_id)

elif not exc_type:
if not exc_type:
await self.__ack()

elif isinstance(exc_val, HandlerException):
Expand Down
9 changes: 6 additions & 3 deletions faststream/broker/message.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,13 +63,16 @@ class StreamMessage(Generic[MsgType]):
_decoded_body: Optional["DecodedMessage"] = field(default=None, init=False)

async def ack(self) -> None:
self.committed = AckStatus.acked
if not self.committed:
self.committed = AckStatus.acked

async def nack(self) -> None:
self.committed = AckStatus.nacked
if not self.committed:
self.committed = AckStatus.nacked

async def reject(self) -> None:
self.committed = AckStatus.rejected
if not self.committed:
self.committed = AckStatus.rejected

async def decode(self) -> Optional["DecodedMessage"]:
"""Serialize the message by lazy decoder."""
Expand Down
4 changes: 2 additions & 2 deletions faststream/confluent/message.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ async def ack(self) -> None:
"""Acknowledge the Kafka message."""
if self.is_manual and not self.committed:
await self.consumer.commit()
await super().ack()
await super().ack()

async def nack(self) -> None:
"""Reject the Kafka message."""
Expand All @@ -81,4 +81,4 @@ async def nack(self) -> None:
partition=raw_message.partition(),
offset=raw_message.offset(),
)
await super().nack()
await super().nack()
4 changes: 2 additions & 2 deletions faststream/kafka/message.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,12 +77,12 @@ async def nack(self) -> None:
partition=topic_partition,
offset=raw_message.offset,
)
await super().nack()
await super().nack()


class KafkaAckableMessage(KafkaMessage):
async def ack(self) -> None:
"""Acknowledge the Kafka message."""
if not self.committed:
await self.consumer.commit()
await super().ack()
await super().ack()
6 changes: 3 additions & 3 deletions faststream/nats/message.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,20 +15,20 @@ async def ack(self) -> None:
# to be compatible with `self.raw_message.ack()`
if not self.raw_message._ackd:
await self.raw_message.ack()
await super().ack()
await super().ack()

async def nack(
self,
delay: Union[int, float, None] = None,
) -> None:
if not self.raw_message._ackd:
await self.raw_message.nak(delay=delay)
await super().nack()
await super().nack()

async def reject(self) -> None:
if not self.raw_message._ackd:
await self.raw_message.term()
await super().reject()
await super().reject()

async def in_progress(self) -> None:
if not self.raw_message._ackd:
Expand Down
2 changes: 1 addition & 1 deletion faststream/redis/message.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ async def ack(
ids = self.raw_message["message_ids"]
channel = self.raw_message["channel"]
await redis.xack(channel, group, *ids) # type: ignore[no-untyped-call]
await super().ack()
await super().ack()

@override
async def nack(
Expand Down
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ types = [
"types-redis",
"types-Pygments",
"types-docutils",
"types-aiofiles",
"confluent-kafka-stubs; python_version >= '3.11'",
]

Expand Down

0 comments on commit 7ffaf44

Please sign in to comment.