Skip to content

Commit

Permalink
Backwards compatibility for windowing API
Browse files Browse the repository at this point in the history
  • Loading branch information
brentyi committed Aug 2, 2023
1 parent e02410f commit f201a56
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 10 deletions.
1 change: 1 addition & 0 deletions viser/_viser.py
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,7 @@ def __init__(self, host: str = "0.0.0.0", port: int = 8080):
port=port,
message_class=_messages.Message,
http_server_root=Path(__file__).absolute().parent / "client" / "build",
client_api_version=1,
)
self._server = server
super().__init__(server)
Expand Down
37 changes: 27 additions & 10 deletions viser/infra/_infra.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
Callable,
Dict,
List,
Literal,
NewType,
Optional,
Sequence,
Expand Down Expand Up @@ -109,6 +110,8 @@ class Server(MessageHandler):
required in the future.
http_server_root: Path to root for HTTP server.
verbose: Toggle for print messages.
client_api_version: Flag for backwards compatibility. 0 sends individual
messages. 1 sends windowed messages.
"""

def __init__(
Expand All @@ -118,6 +121,7 @@ def __init__(
message_class: Type[Message] = Message,
http_server_root: Optional[Path] = None,
verbose: bool = True,
client_api_version: Literal[0, 1] = 0,
):
super().__init__()

Expand All @@ -130,6 +134,7 @@ def __init__(
self._message_class = message_class
self._http_server_root = http_server_root
self._verbose = verbose
self._client_api_version = client_api_version

self._thread_executor = ThreadPoolExecutor(max_workers=32)

Expand Down Expand Up @@ -230,10 +235,12 @@ def handle_incoming(message: Message) -> None:
websocket,
client_id,
client_state.message_buffer.get,
self._client_api_version,
),
_broadcast_producer(
websocket,
self._broadcast_buffer.window_generator(client_id).__anext__,
self._client_api_version,
),
_consumer(websocket, handle_incoming, message_class),
)
Expand Down Expand Up @@ -325,6 +332,7 @@ async def _client_producer(
websocket: WebSocketServerProtocol,
client_id: ClientId,
get_next: Callable[[], Awaitable[Message]],
client_api_version: int,
) -> None:
"""Infinite loop to send messages from a buffer to a single client."""

Expand All @@ -335,25 +343,34 @@ async def _client_producer(
message_future = asyncio.ensure_future(get_next())
outgoing = window.get_window_to_send()
if outgoing is not None:
serialized = msgpack.packb(
tuple(message.as_serializable_dict() for message in outgoing)
)
assert isinstance(serialized, bytes)
await websocket.send(serialized)
if client_api_version:
serialized = msgpack.packb(
tuple(message.as_serializable_dict() for message in outgoing)
)
assert isinstance(serialized, bytes)
await websocket.send(serialized)
else:
for msg in outgoing:
await websocket.send(msg.as_serializable_dict())


async def _broadcast_producer(
websocket: WebSocketServerProtocol,
get_next_window: Callable[[], Awaitable[Sequence[Message]]],
client_api_version: int,
) -> None:
"""Infinite loop to broadcast windows of messages from a buffer."""
while True:
outgoing = await get_next_window()
serialized = msgpack.packb(
tuple(message.as_serializable_dict() for message in outgoing)
)
assert isinstance(serialized, bytes)
await websocket.send(serialized)
if client_api_version:
serialized = msgpack.packb(
tuple(message.as_serializable_dict() for message in outgoing)
)
assert isinstance(serialized, bytes)
await websocket.send(serialized)
else:
for msg in outgoing:
await websocket.send(msg.as_serializable_dict())


async def _consumer(
Expand Down

0 comments on commit f201a56

Please sign in to comment.