Skip to content

Commit

Permalink
feat: add explicit message source enum (#1866)
Browse files Browse the repository at this point in the history
* feat: add explicit message source enum

* docs: generate API References

* docs: generate API References

---------

Co-authored-by: Lancetnik <[email protected]>
  • Loading branch information
Lancetnik and Lancetnik authored Oct 21, 2024
1 parent 4c7895c commit e165f03
Show file tree
Hide file tree
Showing 9 changed files with 35 additions and 5 deletions.
1 change: 1 addition & 0 deletions docs/docs/SUMMARY.md
Original file line number Diff line number Diff line change
Expand Up @@ -376,6 +376,7 @@ search:
- [StreamRouter](api/faststream/broker/fastapi/router/StreamRouter.md)
- message
- [AckStatus](api/faststream/broker/message/AckStatus.md)
- [SourceType](api/faststream/broker/message/SourceType.md)
- [StreamMessage](api/faststream/broker/message/StreamMessage.md)
- [decode_message](api/faststream/broker/message/decode_message.md)
- [encode_message](api/faststream/broker/message/encode_message.md)
Expand Down
11 changes: 11 additions & 0 deletions docs/docs/en/api/faststream/broker/message/SourceType.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
---
# 0.5 - API
# 2 - Release
# 3 - Contributing
# 5 - Template Page
# 10 - Default
search:
boost: 0.5
---

::: faststream.broker.message.SourceType
2 changes: 2 additions & 0 deletions faststream/broker/core/usecase.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

from faststream._compat import is_test_env
from faststream.broker.core.logging import LoggingBroker
from faststream.broker.message import SourceType
from faststream.broker.middlewares.logging import CriticalLogMiddleware
from faststream.broker.proto import SetupAble
from faststream.broker.subscriber.proto import SubscriberProto
Expand Down Expand Up @@ -376,6 +377,7 @@ async def request(

parsed_msg: StreamMessage[Any] = await producer._parser(published_msg)
parsed_msg._decoded_body = await producer._decoder(parsed_msg)
parsed_msg._source_type = SourceType.Response
return await return_msg(parsed_msg)

@abstractmethod
Expand Down
9 changes: 9 additions & 0 deletions faststream/broker/message.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,14 @@ class AckStatus(str, Enum):
rejected = "rejected"


class SourceType(str, Enum):
Consume = "Consume"
"""Message consumed by basic subscriber flow."""

Response = "Response"
"""RPC response consumed."""


def gen_cor_id() -> str:
"""Generate random string to use as ID."""
return str(uuid4())
Expand All @@ -60,6 +68,7 @@ class StreamMessage(Generic[MsgType]):

processed: bool = field(default=False, init=False)
committed: Optional[AckStatus] = field(default=None, init=False)
_source_type: SourceType = field(default=SourceType.Consume)
_decoded_body: Optional["DecodedMessage"] = field(default=None, init=False)

async def ack(self) -> None:
Expand Down
3 changes: 2 additions & 1 deletion faststream/confluent/publisher/usecase.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
from confluent_kafka import Message
from typing_extensions import override

from faststream.broker.message import gen_cor_id
from faststream.broker.message import SourceType, gen_cor_id
from faststream.broker.publisher.usecase import PublisherUsecase
from faststream.broker.types import MsgType
from faststream.exceptions import NOT_CONNECTED_YET
Expand Down Expand Up @@ -124,6 +124,7 @@ async def request(

parsed_msg = await self._producer._parser(published_msg)
parsed_msg._decoded_body = await self._producer._decoder(parsed_msg)
parsed_msg._source_type = SourceType.Response
return await return_msg(parsed_msg)

raise AssertionError("unreachable")
Expand Down
3 changes: 2 additions & 1 deletion faststream/kafka/publisher/usecase.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
from aiokafka import ConsumerRecord
from typing_extensions import Annotated, Doc, override

from faststream.broker.message import gen_cor_id
from faststream.broker.message import SourceType, gen_cor_id
from faststream.broker.publisher.usecase import PublisherUsecase
from faststream.broker.types import MsgType
from faststream.exceptions import NOT_CONNECTED_YET
Expand Down Expand Up @@ -177,6 +177,7 @@ async def request(

parsed_msg = await self._producer._parser(published_msg)
parsed_msg._decoded_body = await self._producer._decoder(parsed_msg)
parsed_msg._source_type = SourceType.Response
return await return_msg(parsed_msg)

raise AssertionError("unreachable")
Expand Down
3 changes: 2 additions & 1 deletion faststream/nats/publisher/usecase.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
from nats.aio.msg import Msg
from typing_extensions import Annotated, Doc, override

from faststream.broker.message import gen_cor_id
from faststream.broker.message import SourceType, gen_cor_id
from faststream.broker.publisher.usecase import PublisherUsecase
from faststream.exceptions import NOT_CONNECTED_YET
from faststream.utils.functions import return_input
Expand Down Expand Up @@ -212,6 +212,7 @@ async def request(

parsed_msg = await self._producer._parser(published_msg)
parsed_msg._decoded_body = await self._producer._decoder(parsed_msg)
parsed_msg._source_type = SourceType.Response
return await return_msg(parsed_msg)

raise AssertionError("unreachable")
Expand Down
3 changes: 2 additions & 1 deletion faststream/rabbit/publisher/usecase.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
from aio_pika import IncomingMessage
from typing_extensions import Annotated, Doc, TypedDict, Unpack, deprecated, override

from faststream.broker.message import gen_cor_id
from faststream.broker.message import SourceType, gen_cor_id
from faststream.broker.publisher.usecase import PublisherUsecase
from faststream.exceptions import NOT_CONNECTED_YET
from faststream.rabbit.schemas import BaseRMQInformation, RabbitQueue
Expand Down Expand Up @@ -373,6 +373,7 @@ async def request(

parsed_msg = await self._producer._parser(published_msg)
parsed_msg._decoded_body = await self._producer._decoder(parsed_msg)
parsed_msg._source_type = SourceType.Response
return await return_msg(parsed_msg)

raise AssertionError("unreachable")
Expand Down
5 changes: 4 additions & 1 deletion faststream/redis/publisher/usecase.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

from typing_extensions import Annotated, Doc, deprecated, override

from faststream.broker.message import gen_cor_id
from faststream.broker.message import SourceType, gen_cor_id
from faststream.broker.publisher.usecase import PublisherUsecase
from faststream.exceptions import NOT_CONNECTED_YET
from faststream.redis.message import UnifyRedisDict
Expand Down Expand Up @@ -268,6 +268,7 @@ async def request(

parsed_msg = await self._producer._parser(published_msg)
parsed_msg._decoded_body = await self._producer._decoder(parsed_msg)
parsed_msg._source_type = SourceType.Response
return await return_msg(parsed_msg)

raise AssertionError("unreachable")
Expand Down Expand Up @@ -481,6 +482,7 @@ async def request(

parsed_msg = await self._producer._parser(published_msg)
parsed_msg._decoded_body = await self._producer._decoder(parsed_msg)
parsed_msg._source_type = SourceType.Response
return await return_msg(parsed_msg)

raise AssertionError("unreachable")
Expand Down Expand Up @@ -762,6 +764,7 @@ async def request(

parsed_msg = await self._producer._parser(published_msg)
parsed_msg._decoded_body = await self._producer._decoder(parsed_msg)
parsed_msg._source_type = SourceType.Response
return await return_msg(parsed_msg)

raise AssertionError("unreachable")

0 comments on commit e165f03

Please sign in to comment.