Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Transport Client Multiplexer #417

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
193 changes: 192 additions & 1 deletion python/lib/communication/dmod/communication/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from asyncio import AbstractEventLoop
from deprecated import deprecated
from pathlib import Path
from typing import Generic, Optional, Type, TypeVar, Union
from typing import Dict, Generic, List, Optional, Type, TypeVar, Union

import websockets

Expand Down Expand Up @@ -504,6 +504,197 @@ def is_new_session(self) -> Optional[bool]:
return self._is_new_session


class TransportClientMultiplexer:
"""
Wrapper around a ::class:`TransportLayerClient` that allows for multiplexing over that client's connection.

Type supports spawning attached ::class:`MuxStreamClient` instances which then utilize this type's multiplexing
capabilities.
"""

def __init__(self, wrapped_client: TransportLayerClient, len_mux_ids: int = 8, *args, **kwargs):
super().__init__(*args, **kwargs)
self._wrapped_client: TransportLayerClient = wrapped_client
self._recv_queues: Dict[str, List[str]] = dict()
""" Per-mux-stream queues ::method:`mux_recv` to hold data via ::method:`async_recv` for other streams. """
self._active_mux_clients: Dict[str, MuxStreamClient] = dict()
""" Clients spawned by ::method:`spawn_mux_client` and not yet retired with ::method:`retire_mux_client`. """
self._perform_recv_lock: asyncio.Lock = asyncio.Lock()
""" A lock to avoid getting async things out of order due to poorly time queuing in ::method:`mux_recv`. """
self._int_mux_id = -1
""" Int basis for str mux ids, initially ``-1`` so later logic increments and uses ``0`` as 1st base value. """

self._len_mux_id = len_mux_ids

async def mux_recv(self, mux_id: str) -> str:
"""
Receive and return data for a particular mux stream from server over a multiplexed connection.

Parameters
----------
mux_id : str
A unique identifier for the specific isolated data stream to receive from.

Returns
-------
str
The data received from the server, as a string.
"""
while True:
# Lock the connection to avoid trouble if two consecutive messages are received AFTER we check the queue,
# but a different function call beats "this" one's async await to the first message (and then "this" one
# end up with the second message out of order)
async with self._perform_recv_lock:
# First check if we already have something
if self._recv_queues.get(mux_id, None):
return self._recv_queues[mux_id].pop(0)

# Then try receiving
mux_encoded_data = await self._wrapped_client.async_recv()
received_mux_id, data = mux_encoded_data[:self._len_mux_id], mux_encoded_data[self._len_mux_id:]

if mux_id == received_mux_id:
return data
elif received_mux_id in self._recv_queues:
self._recv_queues[received_mux_id].append(data)
else:
# FIXME: is this the best thing to do? Should we instead just throw this out? Or save elsewhere?
msg = f"{self.__class__.__name__} received data on unexpected mux stream {received_mux_id}"
raise RuntimeError(msg)

async def mux_send(self, data: Union[str, bytearray, bytes], mux_id: str):
"""
Send data to server on a particular mux stream.

Parameters
----------
data: Union[str, bytearray, bytes]
The data to send.
mux_id : str
A unique identifier for an isolated data stream.
"""
if len(mux_id) != self._len_mux_id:
msg = f"{self.__class__.__name__} can't send using mux id {mux_id} (expected length {self._len_mux_id!s})"
raise ValueError(msg)
await self._wrapped_client.async_send(f"{mux_id}{data}")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems that we will need to make changes to the request service's deserialization and serialization behavior to accommodate this change, right? One fear that I have is that this custom serialization format will spread across the entire codebase. For example, if you wanted to use this client and communicate with a particular service directly, that service would have to implement the custom deserialization format and handle stream ordering too.


def retire_mux_client(self, mux_client: 'MuxStreamClient') -> bool:
"""
Retire the mux_id and associated, spawned client reference, making the id eligible for reuse.

By "retire," this instance simple removes the entry with the provided mux id key from its internal dictionary
of previously-spawned clients (and queue received data), assuming this particular client was present.

The function will not "retire" the client and will return ``False`` if there is received data queued for this
particular client that was received during a ::method:`mux_recv` for a different client. Also, a
::class:`ValueError` will be raised if this instance does not recognize the mux client as active.

Parameters
----------
mux_client : MuxStreamClient
The client to "retire."

Returns
-------
bool
Whether this particular client has now been "retired."
"""
lookup_client = self._active_mux_clients.get(mux_client.mux_id, None)

if lookup_client is None:
raise ValueError(f"{self.__class__.__name__} can't retire unrecognized {mux_client.__class__.__name__} "
f"with id {mux_client.mux_id}")
if lookup_client is not mux_client:
raise ValueError(f"{self.__class__.__name__} can't retire {mux_client.__class__.__name__} that is different"
f" than internal reference for client with id {mux_client.mux_id}")
# Don't retire if there is still data queued for the client
if len(self._recv_queues[mux_client.mux_id]) > 0:
return False
else:
self._active_mux_clients.pop(mux_client.mux_id)
self._recv_queues.pop(mux_client.mux_id)
return True

def spawn_mux_client(self) -> 'MuxStreamClient':
"""
Create and return a ::class:`MuxStreamClient` backed by this instances.

Returns
-------
MuxStreamClient
A transport client object backed by a particular mux stream of this instance's shared connection.
"""
# Loop through possible mux_id values until we find a free one, then use that to build and return a client
while True:
# Increment first, unless we overflow
self._int_mux_id = (self._int_mux_id + 1) if (self._int_mux_id + 1) < (10 ** self._len_mux_id) else 0

# Use that new mux_int for the id (padded with 0s)
mux_id = str(self._int_mux_id).zfill(self._len_mux_id)

# Of course, only exit the loop and return if that mux_id isn't in use
if mux_id not in self._active_mux_clients:
# Do these first here to make sure the keys are in the dict, or else an exception should raise when
# initializing the client
self._active_mux_clients[mux_id] = None
self._recv_queues[mux_id] = list()

client = MuxStreamClient(mux_id=mux_id, parent=self)
self._active_mux_clients[mux_id] = client
return client


class MuxStreamClient(TransportLayerClient):
"""
Subtype of ::class:`TransportLayerClient` for working over a mux stream from a ::class:`TransportClientMultiplexer`.

Class is tightly coupled to ::class:`TransportClientMultiplexer`. Instances should only be created using
::method:`TransportClientMultiplexer.spawn_mux_client`.
"""

def __init__(self, mux_id: str, parent: TransportClientMultiplexer, *args, **kwargs):
# To ensure clients are only created by "spawning" from a parent, do a few ... interesting checks
# These all are consistent with what should happen within TransportClientMultiplexer.spawn_mux_client() before a
# new stream client object is created
if (mux_id not in parent._active_mux_clients
or parent._active_mux_clients[mux_id] is not None # yes, should be None; changed right after this __init__
or not isinstance(parent._recv_queues.get(mux_id), list)
or len(parent._recv_queues[mux_id]) != 0):
raise RuntimeError(f"Do not create {self.__class__.__name__} except by using `spawn_mux_client`")

super().__init__(*args, **kwargs)
self._mux_id = mux_id
self._parent = parent
self._is_retired = False

async def async_send(self, data: Union[str, bytearray, bytes], await_response: bool = False) -> Optional[str]:
await self._parent.mux_send(data=data, mux_id=self._mux_id)
if await_response:
return await self._parent.mux_recv(mux_id=self._mux_id)

async def async_recv(self) -> str:
return await self._parent.mux_recv(mux_id=self._mux_id)

@property
def endpoint_uri(self) -> str:
return self._parent._wrapped_client.endpoint_uri

@property
def mux_id(self) -> str:
return self._mux_id

def retire(self) -> bool:
"""
Have this instances parent retire it from use.

Returns
-------
bool
Whether retiring was successful.
"""
return self._parent.retire_mux_client(mux_client=self)


class RequestClient:
"""
Simple DMOD service client, dealing with DMOD request message and response objects.
Expand Down
Loading