diff --git a/CHANGELOG.rst b/CHANGELOG.rst index 4bd01061..bb373b8a 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -3,7 +3,8 @@ Changelog v0.4.15 ======= -- Websockets (https://github.com/python-websockets/websockets) server support +- Websockets server support (https://github.com/python-websockets/websockets) +- AsyncWebsockets client support (https://github.com/Fuyukai/asyncwebsockets) v0.4.14 ======= diff --git a/README.md b/README.md index 37cf7cbb..047b4f27 100644 --- a/README.md +++ b/README.md @@ -12,18 +12,19 @@ pip install rsocket You may also install using some **extras**: -| Extra | Functionality | Documentation | -|-------------|--------------------------------------------------------------------------------------------|---------------------------------------------------------------------| -| rx | ReactiveX ([v3](https://pypi.org/project/Rx/)) integration | [Tutorial](https://rsocket.io/guides/rsocket-py/tutorial/reactivex) | -| reactivex | [ReactiveX](https://reactivex.io/) ([v4](https://pypi.org/project/reactivex/)) integration | [Tutorial](https://rsocket.io/guides/rsocket-py/tutorial/reactivex) | -| aiohttp | [aiohttp](https://docs.aiohttp.org/en/stable/) Websocket transport (server/client) | [Tutorial](https://rsocket.io/guides/rsocket-py/tutorial/websocket) | -| quart | [Quart](https://pgjones.gitlab.io/quart/) Websocket transport (server only) | | -| quic | [QUIC](https://github.com/aiortc/aioquic) transport | | -| websockets | [Websockets](https://github.com/python-websockets/websockets) transport (server only) | | -| cli | Command line | [Tutorial](https://rsocket.io/guides/rsocket-py/cli) | -| optimized | Frame parse/serialize optimizations | | -| cloudevents | [CloudEvents](https://cloudevents.io/) integration | | -| graphql | [GraphQL](https://graphql.org/) integration | [Tutorial](https://rsocket.io/guides/rsocket-py/graphql) | +| Extra | Functionality | Documentation | +|-----------------|--------------------------------------------------------------------------------------------|---------------------------------------------------------------------| +| rx | ReactiveX ([v3](https://pypi.org/project/Rx/)) integration | [Tutorial](https://rsocket.io/guides/rsocket-py/tutorial/reactivex) | +| reactivex | [ReactiveX](https://reactivex.io/) ([v4](https://pypi.org/project/reactivex/)) integration | [Tutorial](https://rsocket.io/guides/rsocket-py/tutorial/reactivex) | +| aiohttp | [aiohttp](https://docs.aiohttp.org/en/stable/) Websocket transport (server/client) | [Tutorial](https://rsocket.io/guides/rsocket-py/tutorial/websocket) | +| quart | [Quart](https://pgjones.gitlab.io/quart/) Websocket transport (server only) | | +| quic | [QUIC](https://github.com/aiortc/aioquic) transport | | +| websockets | [Websockets](https://github.com/python-websockets/websockets) transport (server only) | | +| asyncwebsockets | [Websockets](https://github.com/Fuyukai/asyncwebsockets) transport (client only) | | +| cli | Command line | [Tutorial](https://rsocket.io/guides/rsocket-py/cli) | +| optimized | Frame parse/serialize optimizations | | +| cloudevents | [CloudEvents](https://cloudevents.io/) integration | | +| graphql | [GraphQL](https://graphql.org/) integration | [Tutorial](https://rsocket.io/guides/rsocket-py/graphql) | For example: diff --git a/requirements.txt b/requirements.txt index b99818ef..5b928d0c 100644 --- a/requirements.txt +++ b/requirements.txt @@ -23,4 +23,5 @@ pydantic==1.10.13 Werkzeug==3.0.0 graphql-core==3.2.3 gql==3.4.1 -websockets==11.0.3 \ No newline at end of file +websockets==11.0.3 +asyncwebsockets==0.9.4 \ No newline at end of file diff --git a/rsocket/transports/asyncwebsockets_transport.py b/rsocket/transports/asyncwebsockets_transport.py new file mode 100644 index 00000000..aaa3a5b3 --- /dev/null +++ b/rsocket/transports/asyncwebsockets_transport.py @@ -0,0 +1,56 @@ +import asyncio +from contextlib import asynccontextmanager + +from rsocket.exceptions import RSocketTransportError +from rsocket.frame import Frame +from rsocket.helpers import wrap_transport_exception, single_transport_provider +from rsocket.logger import logger +from rsocket.rsocket_client import RSocketClient +from rsocket.transports.abstract_messaging import AbstractMessagingTransport + + +@asynccontextmanager +async def websocket_client(url: str, + **kwargs) -> RSocketClient: + """ + Helper method to instantiate an RSocket client using a websocket url over asyncwebsockets client. + """ + from asyncwebsockets import open_websocket + async with open_websocket(url) as websocket: + async with RSocketClient(single_transport_provider(TransportAsyncWebsocketsClient(websocket)), + **kwargs) as client: + yield client + + +class TransportAsyncWebsocketsClient(AbstractMessagingTransport): + """ + RSocket transport over client side asyncwebsockets. + """ + + def __init__(self, websocket): + super().__init__() + self._ws = websocket + self._message_handler = None + + async def connect(self): + self._message_handler = asyncio.create_task(self.handle_incoming_ws_messages()) + + async def handle_incoming_ws_messages(self): + from wsproto.events import BytesMessage + try: + async for message in self._ws: + if isinstance(message, BytesMessage): + async for frame in self._frame_parser.receive_data(message.data, 0): + self._incoming_frame_queue.put_nowait(frame) + except asyncio.CancelledError: + logger().debug('Asyncio task canceled: incoming_data_listener') + except Exception: + self._incoming_frame_queue.put_nowait(RSocketTransportError()) + + async def send_frame(self, frame: Frame): + with wrap_transport_exception(): + await self._ws.send(frame.serialize()) + + async def close(self): + self._message_handler.cancel() + await self._message_handler diff --git a/setup.cfg b/setup.cfg index 355303b2..f966e911 100644 --- a/setup.cfg +++ b/setup.cfg @@ -61,8 +61,8 @@ cloudevents = graphql = graphql-core>=3.2.0 gql>=3.4.0 -websockets = - websockets>=11.0.0 +websockets = websockets>=11.0.0 +asyncwebsockets = asyncwebsockets>=0.9.4 [options.entry_points] cli.console_scripts = diff --git a/tests/conftest.py b/tests/conftest.py index c2cf631e..68866800 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -40,13 +40,13 @@ def setup_logging(level=logging.DEBUG, use_file: bool = False): setup_logging(logging.WARN) tested_transports = [ - 'tcp' + 'tcp', + 'quart' ] if sys.version_info[:3] < (3, 11, 5): tested_transports += [ 'aiohttp', - 'quart', 'quic', 'http3', # 'websockets' diff --git a/tests/tools/fixtures_quart.py b/tests/tools/fixtures_quart.py index 459da7bc..47ee44d7 100644 --- a/tests/tools/fixtures_quart.py +++ b/tests/tools/fixtures_quart.py @@ -11,7 +11,7 @@ async def pipe_factory_quart_websocket(unused_tcp_port, client_arguments=None, server_arguments=None): from quart import Quart from rsocket.transports.quart_websocket import websocket_handler - from rsocket.transports.aiohttp_websocket import websocket_client + from rsocket.transports.asyncwebsockets_transport import websocket_client app = Quart(__name__) server: Optional[RSocketBase] = None @@ -32,7 +32,7 @@ async def ws(): server_task = asyncio.create_task(app.run_task(port=unused_tcp_port)) await asyncio.sleep(0) - async with websocket_client('http://localhost:{}'.format(unused_tcp_port), + async with websocket_client('ws://localhost:{}'.format(unused_tcp_port), **client_arguments) as client: await wait_for_server.wait() yield server, client