Skip to content

Commit

Permalink
chore: use aiter_lines directly for application/nd-json streams
Browse files Browse the repository at this point in the history
  • Loading branch information
simon-schoonjans committed Apr 12, 2024
1 parent 2d5c349 commit e3ff8a0
Showing 1 changed file with 18 additions and 13 deletions.
31 changes: 18 additions & 13 deletions src/waylay/sdk/api/serialization.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from inspect import isclass
from typing import (
Any,
AsyncGenerator,
AsyncIterator,
Mapping,
Optional,
Expand Down Expand Up @@ -385,21 +386,25 @@ async def _iter_event_stream(
) -> AsyncIterator[T]:
_response_type = _response_type_for_status_code(response.status_code, response_type)
content_type = response.headers.get("content-type", "")
try:
async for event_str in __iter_events_response(response, content_type):
event = __parse_event(event_str, content_type)
if not event:
continue
if _ignore_retry_events and len(event) == 1 and "retry" in event:
continue
_deserialized_event = _deserialize(
_extract_selected(event, select_path), _response_type
)
yield _deserialized_event

async def __iter_events_response(response: Response, content_type: str) -> AsyncGenerator[str]:
if content_type.startswith(TEXT_EVENT_STREAM_CONTENT_TYPE):
async for event_str_batch in response.aiter_text():
for event_str in event_str_batch.split("\r\n\r"):
event = __parse_event(event_str, content_type)
if not event:
continue
if _ignore_retry_events and len(event) == 1 and "retry" in event:
continue
_deserialized_event = _deserialize(
_extract_selected(event, select_path), _response_type
)
yield _deserialized_event
finally:
await response.aclose()

yield event_str
else:
async for event_str in response.aiter_lines():
yield event_str

def __parse_event(event_str: str, content_type: str):
if content_type.startswith(TEXT_EVENT_STREAM_CONTENT_TYPE):
Expand Down

0 comments on commit e3ff8a0

Please sign in to comment.