Skip to content

Commit

Permalink
lint: polish annotations
Browse files Browse the repository at this point in the history
  • Loading branch information
Lancetnik committed Oct 18, 2024
1 parent 248fd02 commit bec84f7
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 28 deletions.
7 changes: 2 additions & 5 deletions faststream/_internal/publisher/proto.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,12 @@ class ProducerProto(Protocol):
_decoder: "AsyncCallable"

@abstractmethod
async def publish(self, message: "PublishCommand") -> Optional[Any]:
async def publish(self, cmd: "PublishCommand") -> Optional[Any]:
"""Publishes a message asynchronously."""
...

@abstractmethod
async def request(
self,
message: "PublishCommand",
) -> Any:
async def request(self, cmd: "PublishCommand") -> Any:
"""Publishes a message synchronously."""
...

Expand Down
46 changes: 23 additions & 23 deletions faststream/nats/publisher/producer.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,39 +44,39 @@ def __init__(
@override
async def publish( # type: ignore[override]
self,
message: "NatsPublishCommand",
cmd: "NatsPublishCommand",
) -> None:
payload, content_type = encode_message(message.body)
payload, content_type = encode_message(cmd.body)

headers_to_send = {
"content-type": content_type or "",
**message.headers_to_publish(),
**cmd.headers_to_publish(),
}

await self._connection.publish(
subject=message.destination,
subject=cmd.destination,
payload=payload,
reply=message.reply_to,
reply=cmd.reply_to,
headers=headers_to_send,
)

@override
async def request( # type: ignore[override]
self,
message: "NatsPublishCommand",
cmd: "NatsPublishCommand",
) -> "Msg":
payload, content_type = encode_message(message.body)
payload, content_type = encode_message(cmd.body)

headers_to_send = {
"content-type": content_type or "",
**message.headers_to_publish(),
**cmd.headers_to_publish(),
}

return await self._connection.request(
subject=message.destination,
subject=cmd.destination,
payload=payload,
headers=headers_to_send,
timeout=message.timeout,
timeout=cmd.timeout,
)


Expand All @@ -102,31 +102,31 @@ def __init__(
@override
async def publish( # type: ignore[override]
self,
message: "NatsPublishCommand",
cmd: "NatsPublishCommand",
) -> Optional[Any]:
payload, content_type = encode_message(message.body)
payload, content_type = encode_message(cmd.body)

headers_to_send = {
"content-type": content_type or "",
**message.headers_to_publish(js=True),
**cmd.headers_to_publish(js=True),
}

await self._connection.publish(
subject=message.destination,
subject=cmd.destination,
payload=payload,
headers=headers_to_send,
stream=message.stream,
timeout=message.timeout,
stream=cmd.stream,
timeout=cmd.timeout,
)

return None

@override
async def request( # type: ignore[override]
self,
message: "NatsPublishCommand",
cmd: "NatsPublishCommand",
) -> "Msg":
payload, content_type = encode_message(message.body)
payload, content_type = encode_message(cmd.body)

reply_to = self._connection._nc.new_inbox()
future: asyncio.Future[Msg] = asyncio.Future()
Expand All @@ -136,16 +136,16 @@ async def request( # type: ignore[override]
headers_to_send = {
"content-type": content_type or "",
"reply_to": reply_to,
**message.headers_to_publish(js=False),
**cmd.headers_to_publish(js=False),
}

with anyio.fail_after(message.timeout):
with anyio.fail_after(cmd.timeout):
await self._connection.publish(
subject=message.destination,
subject=cmd.destination,
payload=payload,
headers=headers_to_send,
stream=message.stream,
timeout=message.timeout,
stream=cmd.stream,
timeout=cmd.timeout,
)

msg = await future
Expand Down

0 comments on commit bec84f7

Please sign in to comment.