Skip to content

Commit

Permalink
Merge branch 'main' into feat-ping-all-brokers
Browse files Browse the repository at this point in the history
  • Loading branch information
kumaranvpl authored Jul 17, 2024
2 parents 415a3ac + 5e37604 commit 45a1420
Show file tree
Hide file tree
Showing 8 changed files with 106 additions and 18 deletions.
8 changes: 6 additions & 2 deletions faststream/asyncapi/generate.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@ def get_app_schema(app: Union["FastStream", "StreamRouter[Any]"]) -> Schema:
payloads,
messages,
)

schema = Schema(
info=Info(
title=app.title,
Expand Down Expand Up @@ -146,9 +145,13 @@ def _resolve_msg_payloads(
payloads: Dict[str, Any],
messages: Dict[str, Any],
) -> Reference:
one_of_list: List[Reference] = []
"""Replace message payload by reference and normalize payloads.
Payloads and messages are editable dicts to store schemas for reference in AsyncAPI.
"""
one_of_list: List[Reference] = []
m.payload = _move_pydantic_refs(m.payload, DEF_KEY)

if DEF_KEY in m.payload:
payloads.update(m.payload.pop(DEF_KEY))

Expand Down Expand Up @@ -186,6 +189,7 @@ def _move_pydantic_refs(
original: Any,
key: str,
) -> Any:
"""Remove pydantic references and replacem them by real schemas."""
if not isinstance(original, Dict):
return original

Expand Down
14 changes: 13 additions & 1 deletion faststream/broker/subscriber/usecase.py
Original file line number Diff line number Diff line change
Expand Up @@ -407,7 +407,9 @@ def get_log_context(
@property
def call_name(self) -> str:
"""Returns the name of the handler call."""
# TODO: default call_name
if not self.calls:
return "Subscriber"

return to_camelcase(self.calls[0].call_name)

def get_description(self) -> Optional[str]:
Expand All @@ -433,4 +435,14 @@ def get_payloads(self) -> List[Tuple["AnyDict", str]]:

payloads.append((body, to_camelcase(h.call_name)))

if not self.calls:
payloads.append(
(
{
"title": f"{self.title_ or self.call_name}:Message:Payload",
},
to_camelcase(self.call_name),
)
)

return payloads
10 changes: 5 additions & 5 deletions faststream/confluent/security.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,14 @@ def parse_security(security: Optional[BaseSecurity]) -> "AnyDict":

if security is None:
return {}
elif type(security) == BaseSecurity:
return _parse_base_security(security)
elif type(security) == SASLPlaintext:
elif isinstance(security, SASLPlaintext):
return _parse_sasl_plaintext(security)
elif type(security) == SASLScram256:
elif isinstance(security, SASLScram256):
return _parse_sasl_scram256(security)
elif type(security) == SASLScram512:
elif isinstance(security, SASLScram512):
return _parse_sasl_scram512(security)
elif isinstance(security, BaseSecurity):
return _parse_base_security(security)
else:
raise NotImplementedError(f"KafkaBroker does not support `{type(security)}`.")

Expand Down
1 change: 0 additions & 1 deletion faststream/confluent/subscriber/asyncapi.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ def get_schema(self) -> Dict[str, Channel]:
channels = {}

payloads = self.get_payloads()

for t in self.topics:
handler_name = self.title_ or f"{t}:{self.call_name}"

Expand Down
10 changes: 5 additions & 5 deletions faststream/kafka/security.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,14 @@
def parse_security(security: Optional[BaseSecurity]) -> "AnyDict":
if security is None:
return {}
elif type(security) == BaseSecurity:
return _parse_base_security(security)
elif type(security) == SASLPlaintext:
elif isinstance(security, SASLPlaintext):
return _parse_sasl_plaintext(security)
elif type(security) == SASLScram256:
elif isinstance(security, SASLScram256):
return _parse_sasl_scram256(security)
elif type(security) == SASLScram512:
elif isinstance(security, SASLScram512):
return _parse_sasl_scram512(security)
elif isinstance(security, BaseSecurity):
return _parse_base_security(security)
else:
raise NotImplementedError(f"KafkaBroker does not support `{type(security)}`.")

Expand Down
3 changes: 3 additions & 0 deletions faststream/nats/subscriber/usecase.py
Original file line number Diff line number Diff line change
Expand Up @@ -700,6 +700,7 @@ async def _create_subscription( # type: ignore[override]
"""Create NATS subscription and start consume task."""
self.subscription = await connection.pull_subscribe(
subject=self.clear_subject,
config=self.config,
**self.extra_options,
)
self.add_task(self._consume_pull(cb=self.consume))
Expand Down Expand Up @@ -778,6 +779,7 @@ async def _create_subscription( # type: ignore[override]

self.subscription = await connection.pull_subscribe(
subject=self.clear_subject,
config=self.config,
**self.extra_options,
)
self.add_task(self._consume_pull(cb=self._put_msg))
Expand Down Expand Up @@ -841,6 +843,7 @@ async def _create_subscription( # type: ignore[override]
"""Create NATS subscription and start consume task."""
self.subscription = await connection.pull_subscribe(
subject=self.clear_subject,
config=self.config,
**self.extra_options,
)
self.add_task(self._consume_pull())
Expand Down
8 changes: 4 additions & 4 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ otel = ["opentelemetry-sdk>=1.24.0,<2.0.0"]
optionals = ["faststream[rabbit,kafka,confluent,nats,redis,otel]"]

devdocs = [
"mkdocs-material==9.5.27",
"mkdocs-material==9.5.29",
"mkdocs-static-i18n==1.2.3",
"mdx-include==1.4.2",
"mkdocstrings[python]==0.25.1",
Expand Down Expand Up @@ -111,14 +111,14 @@ types = [

lint = [
"faststream[types]",
"ruff==0.5.1",
"ruff==0.5.2",
"bandit==1.7.9",
"semgrep==1.79.0",
"codespell==2.3.0",
]

test-core = [
"coverage[toml]==7.5.4",
"coverage[toml]==7.6.0",
"pytest==8.2.2",
"pytest-asyncio==0.23.7",
"dirty-equals==0.7.1.post0",
Expand All @@ -127,7 +127,7 @@ test-core = [

testing = [
"faststream[test-core]",
"fastapi==0.111.0",
"fastapi==0.111.1",
"pydantic-settings>=2.0.0,<3.0.0",
"httpx==0.27.0",
"PyYAML==6.0.1",
Expand Down
70 changes: 70 additions & 0 deletions tests/asyncapi/base/naming.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,76 @@ async def handle_user_created(msg: str): ...
"custom:Message:Payload"
]

def test_subscriber_naming_default(self):
broker = self.broker_class()

broker.subscriber("test")

schema = get_app_schema(FastStream(broker)).to_jsonable()

assert list(schema["channels"].keys()) == [
IsStr(regex=r"test[\w:]*:Subscriber")
]

assert list(schema["components"]["messages"].keys()) == [
IsStr(regex=r"test[\w:]*:Subscriber:Message")
]

for key, v in schema["components"]["schemas"].items():
assert key == "Subscriber:Message:Payload"
assert v == {"title": key}

def test_subscriber_naming_default_with_title(self):
broker = self.broker_class()

broker.subscriber("test", title="custom")

schema = get_app_schema(FastStream(broker)).to_jsonable()

assert list(schema["channels"].keys()) == ["custom"]

assert list(schema["components"]["messages"].keys()) == ["custom:Message"]

assert list(schema["components"]["schemas"].keys()) == [
"custom:Message:Payload"
]

assert schema["components"]["schemas"]["custom:Message:Payload"] == {
"title": "custom:Message:Payload"
}

def test_multi_subscribers_naming_default(self):
broker = self.broker_class()

@broker.subscriber("test")
async def handle_user_created(msg: str): ...

broker.subscriber("test2")
broker.subscriber("test3")

schema = get_app_schema(FastStream(broker)).to_jsonable()

assert list(schema["channels"].keys()) == [
IsStr(regex=r"test[\w:]*:HandleUserCreated"),
IsStr(regex=r"test2[\w:]*:Subscriber"),
IsStr(regex=r"test3[\w:]*:Subscriber"),
]

assert list(schema["components"]["messages"].keys()) == [
IsStr(regex=r"test[\w:]*:HandleUserCreated:Message"),
IsStr(regex=r"test2[\w:]*:Subscriber:Message"),
IsStr(regex=r"test3[\w:]*:Subscriber:Message"),
]

assert list(schema["components"]["schemas"].keys()) == [
"HandleUserCreated:Message:Payload",
"Subscriber:Message:Payload",
]

assert schema["components"]["schemas"]["Subscriber:Message:Payload"] == {
"title": "Subscriber:Message:Payload"
}


class FilterNaming(BaseNaming):
def test_subscriber_filter_base(self):
Expand Down

0 comments on commit 45a1420

Please sign in to comment.