diff --git a/pyproject.toml b/pyproject.toml index 7dcd8406..4dff6d17 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -22,7 +22,7 @@ classifiers = [ dependencies = [ "websockets>=10.4", "numpy>=1.0.0", - "msgpack>=1.0.7", + "msgspec>=0.18.6", "imageio>=2.0.0", "pyliblzfse>=0.4.1; platform_system!='Windows'", "scikit-image>=0.18.0", diff --git a/src/viser/client/package.json b/src/viser/client/package.json index 2630f239..5bc4dd93 100644 --- a/src/viser/client/package.json +++ b/src/viser/client/package.json @@ -10,6 +10,7 @@ "@mantine/vanilla-extract": "^7.6.2", "@mdx-js/mdx": "^3.0.1", "@mdx-js/react": "^3.0.1", + "@msgpack/msgpack": "^3.0.0-beta2", "@react-three/drei": "^9.64.0", "@react-three/fiber": "^8.12.0", "@tabler/icons-react": "^3.1.0", @@ -23,11 +24,11 @@ "clsx": "^2.1.0", "colortranslator": "^4.1.0", "dayjs": "^1.11.10", + "fflate": "^0.8.2", "hold-event": "^1.1.0", "immer": "^10.0.4", "its-fine": "^1.2.5", "mantine-react-table": "^2.0.0-beta.0", - "msgpackr": "^1.10.2", "postcss": "^8.4.38", "prettier": "^3.0.3", "react": "^18.2.0", diff --git a/src/viser/client/src/App.tsx b/src/viser/client/src/App.tsx index a2dfd1c6..66e0e1bd 100644 --- a/src/viser/client/src/App.tsx +++ b/src/viser/client/src/App.tsx @@ -177,25 +177,26 @@ function ViewerRoot() { return ( - {viewer.messageSource === "websocket" ? ( - - ) : null} - {viewer.messageSource === "file_playback" ? ( - - ) : null} - + + {viewer.messageSource === "websocket" ? ( + + ) : null} + {viewer.messageSource === "file_playback" ? ( + + ) : null} + ); } -function ViewerContents() { +function ViewerContents({ children }: { children: React.ReactNode }) { const viewer = React.useContext(ViewerContext)!; - const dark_mode = viewer.useGui((state) => state.theme.dark_mode); + const darkMode = viewer.useGui((state) => state.theme.dark_mode); const colors = viewer.useGui((state) => state.theme.colors); - const control_layout = viewer.useGui((state) => state.theme.control_layout); + const controlLayout = viewer.useGui((state) => state.theme.control_layout); return ( <> - + + { children } ({ - backgroundColor: dark_mode ? theme.colors.dark[9] : "#fff", + backgroundColor: darkMode ? theme.colors.dark[9] : "#fff", flexGrow: 1, overflow: "hidden", height: "100%", @@ -258,7 +260,7 @@ function ViewerContents() { ) : null} {viewer.messageSource == "websocket" ? ( - + ) : null} diff --git a/src/viser/client/src/FilePlayback.tsx b/src/viser/client/src/FilePlayback.tsx index 3595bc33..538e10ba 100644 --- a/src/viser/client/src/FilePlayback.tsx +++ b/src/viser/client/src/FilePlayback.tsx @@ -1,7 +1,10 @@ -import { unpack } from "msgpackr"; +import { decodeAsync, decode } from "@msgpack/msgpack"; import { Message } from "./WebsocketMessages"; -import React, { useContext, useEffect } from "react"; +import { decompress } from "fflate"; + +import { useContext, useEffect, useState } from "react"; import { ViewerContext } from "./App"; +import { Progress, useMantineTheme } from "@mantine/core"; interface SerializedMessages { loopStartIndex: number | null; @@ -9,36 +12,65 @@ interface SerializedMessages { messages: [number, Message][]; } -async function deserializeMsgpackFile(fileUrl: string): Promise { - // Fetch the file using fetch() +/** Download, decompress, and deserialize a file, which should be serialized + * via msgpack and compressed via gzip. Also takes a hook for status updates. */ +async function deserializeGzippedMsgpackFile( + fileUrl: string, + setStatus: (status: { downloaded: number; total: number }) => void, +): Promise { const response = await fetch(fileUrl); if (!response.ok) { throw new Error(`Failed to fetch the file: ${response.statusText}`); } return new Promise((resolve) => { - let length = 0; - const buffer: Uint8Array[] = []; - response.body!.pipeThrough(new DecompressionStream("gzip")).pipeTo( - new WritableStream({ - write(chunk) { - buffer.push(chunk); - length += chunk.length; - }, - close() { - const output = new Uint8Array(length); - let offset = 0; - for (const chunk of buffer) { - output.set(chunk, offset); - offset += chunk.length; - } - console.log(output.length); - resolve(unpack(output)); - }, - abort(err) { - console.error("Stream aborted:", err); - }, - }), - ); + const gzipTotalLength = parseInt(response.headers.get("Content-Length")!); + + if (!DecompressionStream) { + // Implementation without streaming. + console.log( + "DecompressionStream is unavailable. Falling back to approach without streams.", + ); + setStatus({ downloaded: gzipTotalLength * 0.0, total: gzipTotalLength }); + response.arrayBuffer().then((buffer) => { + // Down downloading. + setStatus({ + downloaded: gzipTotalLength * 0.8, + total: gzipTotalLength, + }); + decompress(new Uint8Array(buffer), (error, result) => { + // Done decompressing, time to unpack. + setStatus({ + downloaded: gzipTotalLength * 0.9, + total: gzipTotalLength, + }); + resolve(decode(result) as T); + setStatus({ + downloaded: gzipTotalLength, + total: gzipTotalLength, + }); + }); + }); + } else { + // Counters for processed bytes, both before and after compression. + let gzipReceived = 0; + + // Stream: fetch -> gzip -> msgpack. + decodeAsync( + response + .body!.pipeThrough( + // Count number of (compressed) bytes. + new TransformStream({ + transform(chunk, controller) { + gzipReceived += chunk.length; + setStatus({ downloaded: gzipReceived, total: gzipTotalLength }); + controller.enqueue(chunk); + // return new Promise((resolve) => setTimeout(resolve, 100)); + }, + }), + ) + .pipeThrough(new DecompressionStream("gzip")), + ).then((val) => resolve(val as T)); + } }); } @@ -46,36 +78,64 @@ export function PlaybackFromFile({ fileUrl }: { fileUrl: string }) { const viewer = useContext(ViewerContext)!; const messageQueueRef = viewer.messageQueueRef; + const darkMode = viewer.useGui((state) => state.theme.dark_mode); + const [status, setStatus] = useState({ downloaded: 0.0, total: 0.0 }); + const [loaded, setLoaded] = useState(false); + const theme = useMantineTheme(); + useEffect(() => { - deserializeMsgpackFile(fileUrl).then((recording) => { - let messageIndex = 0; + deserializeGzippedMsgpackFile(fileUrl, setStatus).then( + (recording) => { + let messageIndex = 0; - function continuePlayback() { - const currentTimeSeconds = recording.messages[messageIndex][0]; - while (currentTimeSeconds >= recording.messages[messageIndex][0]) { - messageQueueRef.current.push(recording.messages[messageIndex][1]); - messageIndex += 1; + function continuePlayback() { + setLoaded(true); + const currentTimeSeconds = recording.messages[messageIndex][0]; + while (currentTimeSeconds >= recording.messages[messageIndex][0]) { + messageQueueRef.current.push(recording.messages[messageIndex][1]); + messageIndex += 1; - // Either finish playback or loop. - if (messageIndex === recording.messages.length) { - if (recording.loopStartIndex === null) return; - messageIndex = recording.loopStartIndex; - setTimeout( - continuePlayback, - (recording.durationSeconds - currentTimeSeconds) * 1000.0, - ); - return; + // Either finish playback or loop. + if (messageIndex === recording.messages.length) { + if (recording.loopStartIndex === null) return; + messageIndex = recording.loopStartIndex; + setTimeout( + continuePlayback, + (recording.durationSeconds - currentTimeSeconds) * 1000.0, + ); + return; + } } + + // Handle next set of frames. + setTimeout( + continuePlayback, + (recording.messages[messageIndex][0] - currentTimeSeconds) * 1000.0, + ); } + setTimeout(continuePlayback, recording.messages[0][0] * 1000.0); + }, + ); + }, []); - // Handle next set of frames. - setTimeout( - continuePlayback, - (recording.messages[messageIndex][0] - currentTimeSeconds) * 1000.0, - ); - } - setTimeout(continuePlayback, recording.messages[0][0] * 1000.0); - }); - }); - return <>; + return ( +
+ +
+ ); } diff --git a/src/viser/client/src/WebsocketFunctions.tsx b/src/viser/client/src/WebsocketFunctions.tsx index d02b4b87..f699c120 100644 --- a/src/viser/client/src/WebsocketFunctions.tsx +++ b/src/viser/client/src/WebsocketFunctions.tsx @@ -1,7 +1,7 @@ import { MutableRefObject } from "react"; import * as THREE from "three"; import { Message } from "./WebsocketMessages"; -import { pack } from "msgpackr"; +import { encode } from "@msgpack/msgpack"; /** Send message over websocket. */ export function sendWebsocketMessage( @@ -9,7 +9,7 @@ export function sendWebsocketMessage( message: Message, ) { if (websocketRef.current === null) return; - websocketRef.current.send(pack(message)); + websocketRef.current.send(encode(message)); } /** Returns a function for sending messages, with automatic throttling. */ @@ -25,7 +25,7 @@ export function makeThrottledMessageSender( if (websocketRef.current === null) return; latestMessage = message; if (readyToSend) { - websocketRef.current.send(pack(message)); + websocketRef.current.send(encode(message)); stale = false; readyToSend = false; diff --git a/src/viser/client/src/WebsocketInterface.tsx b/src/viser/client/src/WebsocketInterface.tsx index e928537c..fd4c23bc 100644 --- a/src/viser/client/src/WebsocketInterface.tsx +++ b/src/viser/client/src/WebsocketInterface.tsx @@ -1,5 +1,5 @@ import AwaitLock from "await-lock"; -import { unpack } from "msgpackr"; +import { decode } from "@msgpack/msgpack"; import React, { useContext } from "react"; @@ -55,7 +55,7 @@ export function WebsocketMessageProducer() { // Reduce websocket backpressure. const messagePromise = new Promise((resolve) => { (event.data.arrayBuffer() as Promise).then((buffer) => { - resolve(unpack(new Uint8Array(buffer)) as Message[]); + resolve(decode(new Uint8Array(buffer)) as Message[]); }); }); diff --git a/src/viser/client/src/components/UploadButton.tsx b/src/viser/client/src/components/UploadButton.tsx index 2edac898..0e1915a4 100644 --- a/src/viser/client/src/components/UploadButton.tsx +++ b/src/viser/client/src/components/UploadButton.tsx @@ -5,7 +5,7 @@ import { Box, Progress } from "@mantine/core"; import { Button } from "@mantine/core"; import React, { useContext } from "react"; import { ViewerContext, ViewerContextContents } from "../App"; -import { pack } from "msgpackr"; +import { encode } from "@msgpack/msgpack"; import { IconCheck } from "@tabler/icons-react"; import { notifications } from "@mantine/notifications"; import { htmlIconWrapper } from "./ComponentStyles.css"; @@ -137,8 +137,8 @@ function useFileUpload({ const transferUuid = uuid(); const notificationId = "upload-" + transferUuid; - const send = (message: Parameters[0]) => - websocketRef.current?.send(pack(message)); + const send = (message: Parameters[0]) => + websocketRef.current?.send(encode(message)); // Begin upload by setting initial state updateUploadState({ diff --git a/src/viser/client/yarn.lock b/src/viser/client/yarn.lock index 83a8bc0e..74a2f5ed 100644 --- a/src/viser/client/yarn.lock +++ b/src/viser/client/yarn.lock @@ -672,35 +672,10 @@ dependencies: promise-worker-transferable "^1.0.4" -"@msgpackr-extract/msgpackr-extract-darwin-arm64@3.0.2": - version "3.0.2" - resolved "https://registry.yarnpkg.com/@msgpackr-extract/msgpackr-extract-darwin-arm64/-/msgpackr-extract-darwin-arm64-3.0.2.tgz#44d752c1a2dc113f15f781b7cc4f53a307e3fa38" - integrity sha512-9bfjwDxIDWmmOKusUcqdS4Rw+SETlp9Dy39Xui9BEGEk19dDwH0jhipwFzEff/pFg95NKymc6TOTbRKcWeRqyQ== - -"@msgpackr-extract/msgpackr-extract-darwin-x64@3.0.2": - version "3.0.2" - resolved "https://registry.yarnpkg.com/@msgpackr-extract/msgpackr-extract-darwin-x64/-/msgpackr-extract-darwin-x64-3.0.2.tgz#f954f34355712212a8e06c465bc06c40852c6bb3" - integrity sha512-lwriRAHm1Yg4iDf23Oxm9n/t5Zpw1lVnxYU3HnJPTi2lJRkKTrps1KVgvL6m7WvmhYVt/FIsssWay+k45QHeuw== - -"@msgpackr-extract/msgpackr-extract-linux-arm64@3.0.2": - version "3.0.2" - resolved "https://registry.yarnpkg.com/@msgpackr-extract/msgpackr-extract-linux-arm64/-/msgpackr-extract-linux-arm64-3.0.2.tgz#45c63037f045c2b15c44f80f0393fa24f9655367" - integrity sha512-FU20Bo66/f7He9Fp9sP2zaJ1Q8L9uLPZQDub/WlUip78JlPeMbVL8546HbZfcW9LNciEXc8d+tThSJjSC+tmsg== - -"@msgpackr-extract/msgpackr-extract-linux-arm@3.0.2": - version "3.0.2" - resolved "https://registry.yarnpkg.com/@msgpackr-extract/msgpackr-extract-linux-arm/-/msgpackr-extract-linux-arm-3.0.2.tgz#35707efeafe6d22b3f373caf9e8775e8920d1399" - integrity sha512-MOI9Dlfrpi2Cuc7i5dXdxPbFIgbDBGgKR5F2yWEa6FVEtSWncfVNKW5AKjImAQ6CZlBK9tympdsZJ2xThBiWWA== - -"@msgpackr-extract/msgpackr-extract-linux-x64@3.0.2": - version "3.0.2" - resolved "https://registry.yarnpkg.com/@msgpackr-extract/msgpackr-extract-linux-x64/-/msgpackr-extract-linux-x64-3.0.2.tgz#091b1218b66c341f532611477ef89e83f25fae4f" - integrity sha512-gsWNDCklNy7Ajk0vBBf9jEx04RUxuDQfBse918Ww+Qb9HCPoGzS+XJTLe96iN3BVK7grnLiYghP/M4L8VsaHeA== - -"@msgpackr-extract/msgpackr-extract-win32-x64@3.0.2": - version "3.0.2" - resolved "https://registry.yarnpkg.com/@msgpackr-extract/msgpackr-extract-win32-x64/-/msgpackr-extract-win32-x64-3.0.2.tgz#0f164b726869f71da3c594171df5ebc1c4b0a407" - integrity sha512-O+6Gs8UeDbyFpbSh2CPEz/UOrrdWPTBYNblZK5CxxLisYt4kGX3Sc+czffFonyjiGSq3jWLwJS/CCJc7tBr4sQ== +"@msgpack/msgpack@^3.0.0-beta2": + version "3.0.0-beta2" + resolved "https://registry.yarnpkg.com/@msgpack/msgpack/-/msgpack-3.0.0-beta2.tgz#5bccee30f84df220b33905e3d8249ba96deca0b7" + integrity sha512-y+l1PNV0XDyY8sM3YtuMLK5vE3/hkfId+Do8pLo/OPxfxuFAUwcGz3oiiUuV46/aBpwTzZ+mRWVMtlSKbradhw== "@nodelib/fs.scandir@2.1.5": version "2.1.5" @@ -2425,7 +2400,7 @@ fflate@^0.6.9, fflate@~0.6.10: resolved "https://registry.yarnpkg.com/fflate/-/fflate-0.6.10.tgz#5f40f9659205936a2d18abf88b2e7781662b6d43" integrity sha512-IQrh3lEPM93wVCEczc9SaAOvkmcoQn/G8Bo1e8ZPlY3X3bnAxWaBdvTdvM1hP62iZp0BXWDy4vTAy4fF0+Dlpg== -fflate@~0.8.2: +fflate@^0.8.2, fflate@~0.8.2: version "0.8.2" resolved "https://registry.yarnpkg.com/fflate/-/fflate-0.8.2.tgz#fc8631f5347812ad6028bbe4a2308b2792aa1dea" integrity sha512-cPJU47OaAoCbg0pBvzsgpTPhmhqI5eJjh/JIu8tPj5q+T7iLvW/JAYUqmE7KOB4R1ZyEhzBaIQpQpardBF5z8A== @@ -3825,27 +3800,6 @@ ms@2.1.2: resolved "https://registry.yarnpkg.com/ms/-/ms-2.1.2.tgz#d09d1f357b443f493382a8eb3ccd183872ae6009" integrity sha512-sGkPx+VjMtmA6MX27oA4FBFELFCZZ4S4XqeGOXCv68tT+jb3vk/RyaKWP0PTKyWtmLSM0b+adUTEvbs1PEaH2w== -msgpackr-extract@^3.0.2: - version "3.0.2" - resolved "https://registry.yarnpkg.com/msgpackr-extract/-/msgpackr-extract-3.0.2.tgz#e05ec1bb4453ddf020551bcd5daaf0092a2c279d" - integrity sha512-SdzXp4kD/Qf8agZ9+iTu6eql0m3kWm1A2y1hkpTeVNENutaB0BwHlSvAIaMxwntmRUAUjon2V4L8Z/njd0Ct8A== - dependencies: - node-gyp-build-optional-packages "5.0.7" - optionalDependencies: - "@msgpackr-extract/msgpackr-extract-darwin-arm64" "3.0.2" - "@msgpackr-extract/msgpackr-extract-darwin-x64" "3.0.2" - "@msgpackr-extract/msgpackr-extract-linux-arm" "3.0.2" - "@msgpackr-extract/msgpackr-extract-linux-arm64" "3.0.2" - "@msgpackr-extract/msgpackr-extract-linux-x64" "3.0.2" - "@msgpackr-extract/msgpackr-extract-win32-x64" "3.0.2" - -msgpackr@^1.10.2: - version "1.10.2" - resolved "https://registry.yarnpkg.com/msgpackr/-/msgpackr-1.10.2.tgz#a73de4767f76659e8c69cf9c80fdfce83937a44a" - integrity sha512-L60rsPynBvNE+8BWipKKZ9jHcSGbtyJYIwjRq0VrIvQ08cRjntGXJYW/tmciZ2IHWIY8WEW32Qa2xbh5+SKBZA== - optionalDependencies: - msgpackr-extract "^3.0.2" - nanoid@^3.3.7: version "3.3.7" resolved "https://registry.yarnpkg.com/nanoid/-/nanoid-3.3.7.tgz#d0c301a691bc8d54efa0a2226ccf3fe2fd656bd8" @@ -3864,11 +3818,6 @@ no-case@^3.0.4: lower-case "^2.0.2" tslib "^2.0.3" -node-gyp-build-optional-packages@5.0.7: - version "5.0.7" - resolved "https://registry.yarnpkg.com/node-gyp-build-optional-packages/-/node-gyp-build-optional-packages-5.0.7.tgz#5d2632bbde0ab2f6e22f1bbac2199b07244ae0b3" - integrity sha512-YlCCc6Wffkx0kHkmam79GKvDQ6x+QZkMjFGrIMxgFNILFvGSbCp2fCBC55pGTT9gVaz8Na5CLmxt/urtzRv36w== - node-releases@^2.0.14: version "2.0.14" resolved "https://registry.yarnpkg.com/node-releases/-/node-releases-2.0.14.tgz#2ffb053bceb8b2be8495ece1ab6ce600c4461b0b" diff --git a/src/viser/infra/_infra.py b/src/viser/infra/_infra.py index dcee09dd..c2b8e2dd 100644 --- a/src/viser/infra/_infra.py +++ b/src/viser/infra/_infra.py @@ -14,7 +14,7 @@ from pathlib import Path from typing import Any, Callable, Generator, NewType, TypeVar -import msgpack +import msgspec import rich import websockets.connection import websockets.datastructures @@ -74,7 +74,7 @@ def set_loop_start(self) -> None: def end_and_serialize(self) -> bytes: """End the recording and serialize contents. Returns the recording as bytes, which should generally be written to a file.""" - packed_bytes = msgpack.packb( + packed_bytes = msgspec.msgpack.encode( { "loopStartIndex": self._loop_start_index, "durationSeconds": self._time, @@ -497,14 +497,14 @@ async def _message_producer( while not buffer.done: outgoing = await window_generator.__anext__() if client_api_version == 1: - serialized = msgpack.packb( + serialized = msgspec.msgpack.encode( tuple(message.as_serializable_dict() for message in outgoing) ) assert isinstance(serialized, bytes) await websocket.send(serialized) elif client_api_version == 0: for msg in outgoing: - serialized = msgpack.packb(msg.as_serializable_dict()) + serialized = msgspec.msgpack.encode(msg.as_serializable_dict()) assert isinstance(serialized, bytes) await websocket.send(serialized) else: diff --git a/src/viser/infra/_messages.py b/src/viser/infra/_messages.py index c02becae..204d66af 100644 --- a/src/viser/infra/_messages.py +++ b/src/viser/infra/_messages.py @@ -8,7 +8,7 @@ import warnings from typing import TYPE_CHECKING, Any, Dict, List, Optional, Type, TypeVar, cast -import msgpack +import msgspec import numpy as onp from typing_extensions import get_args, get_origin, get_type_hints @@ -127,13 +127,19 @@ def _from_serializable_dict(cls, mapping: Dict[str, Any]) -> Dict[str, Any]: @classmethod def deserialize(cls, message: bytes) -> Message: """Convert bytes into a Python Message object.""" - mapping = msgpack.unpackb(message) + mapping = msgspec.msgpack.decode(message) # msgpack deserializes to lists by default, but all of our annotations use # tuples. - mapping = { - k: tuple(v) if isinstance(v, list) else v for k, v in mapping.items() - } + def lists_to_tuple(obj: Any) -> Any: + if isinstance(obj, list): + return tuple(lists_to_tuple(x) for x in obj) + elif isinstance(obj, dict): + return {k: lists_to_tuple(v) for k, v in obj.items()} + else: + return obj + + mapping = lists_to_tuple(mapping) message_type = cls._subclass_from_type_string()[cast(str, mapping.pop("type"))] message_kwargs = message_type._from_serializable_dict(mapping) return message_type(**message_kwargs)