From a39a64f999b51900bc5cf42f13c74090fb691ae9 Mon Sep 17 00:00:00 2001 From: Brent Yi Date: Tue, 8 Aug 2023 23:57:16 -0700 Subject: [PATCH] Print thread pool errors --- viser/_message_api.py | 6 +++++- viser/infra/_infra.py | 26 +++++++++++++++++++++++--- 2 files changed, 28 insertions(+), 4 deletions(-) diff --git a/viser/_message_api.py b/viser/_message_api.py index 1bce7c9ed..63b52ba40 100644 --- a/viser/_message_api.py +++ b/viser/_message_api.py @@ -456,7 +456,11 @@ def _queue(self, message: _messages.Message) -> None: """Wrapped method for sending messages safely.""" # This implementation will retain message ordering because _queue_thread has # just 1 worker. - self._queue_thread.submit(lambda: self._queue_blocking(message)) + from .infra._infra import error_print_wrapper + + self._queue_thread.submit( + error_print_wrapper(lambda: self._queue_blocking(message)) + ) def _queue_blocking(self, message: _messages.Message) -> None: """Wrapped method for sending messages safely. Blocks until ready to send.""" diff --git a/viser/infra/_infra.py b/viser/infra/_infra.py index 46c0369ff..1fa88920f 100644 --- a/viser/infra/_infra.py +++ b/viser/infra/_infra.py @@ -215,11 +215,15 @@ async def serve(websocket: WebSocketServerProtocol) -> None: def handle_incoming(message: Message) -> None: self._thread_executor.submit( - lambda: self._handle_incoming_message(client_id, message) + error_print_wrapper( + lambda: self._handle_incoming_message(client_id, message) + ) ) self._thread_executor.submit( - lambda: client_connection._handle_incoming_message( - client_id, message + error_print_wrapper( + lambda: client_connection._handle_incoming_message( + client_id, message + ) ) ) @@ -392,3 +396,19 @@ async def _consumer( assert isinstance(raw, bytes) message = message_class.deserialize(raw) handle_message(message) + + +def error_print_wrapper(inner: Callable[[], Any]) -> Callable[[], None]: + """Wrap a Callable to print error messages when they happen. + + This can be helpful for jobs submitted to ThreadPoolExecutor instances, which, by + default, will suppress error messages until returned futures are awaited. + """ + + def wrapped() -> None: + try: + inner() + except Exception as e: + print(e) + + return wrapped