Skip to content

Commit

Permalink
Revisit serialization dependencies, add progress bar for playback (ex…
Browse files Browse the repository at this point in the history
…perimental) (#244)

* Add progress bar for playback, migrate msgpackr => @msgpack/msgpack for (in-browser) stream support

* Migrate msgpack => msgspec on Python side
msgspec claims to be much faster, ~3x for serialization (our main bottleneck): https://jcristharif.com/msgspec/benchmarks.html#messagepack-serialization

* Revert gtag
  • Loading branch information
brentyi committed Jul 16, 2024
1 parent fbcb430 commit 2889a09
Show file tree
Hide file tree
Showing 10 changed files with 160 additions and 142 deletions.
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ classifiers = [
dependencies = [
"websockets>=10.4",
"numpy>=1.0.0",
"msgpack>=1.0.7",
"msgspec>=0.18.6",
"imageio>=2.0.0",
"pyliblzfse>=0.4.1; platform_system!='Windows'",
"scikit-image>=0.18.0",
Expand Down
3 changes: 2 additions & 1 deletion src/viser/client/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
"@mantine/vanilla-extract": "^7.6.2",
"@mdx-js/mdx": "^3.0.1",
"@mdx-js/react": "^3.0.1",
"@msgpack/msgpack": "^3.0.0-beta2",
"@react-three/drei": "^9.64.0",
"@react-three/fiber": "^8.12.0",
"@tabler/icons-react": "^3.1.0",
Expand All @@ -23,11 +24,11 @@
"clsx": "^2.1.0",
"colortranslator": "^4.1.0",
"dayjs": "^1.11.10",
"fflate": "^0.8.2",
"hold-event": "^1.1.0",
"immer": "^10.0.4",
"its-fine": "^1.2.5",
"mantine-react-table": "^2.0.0-beta.0",
"msgpackr": "^1.10.2",
"postcss": "^8.4.38",
"prettier": "^3.0.3",
"react": "^18.2.0",
Expand Down
30 changes: 16 additions & 14 deletions src/viser/client/src/App.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -177,34 +177,36 @@ function ViewerRoot() {

return (
<ViewerContext.Provider value={viewer}>
{viewer.messageSource === "websocket" ? (
<WebsocketMessageProducer />
) : null}
{viewer.messageSource === "file_playback" ? (
<PlaybackFromFile fileUrl={playbackPath!} />
) : null}
<ViewerContents />
<ViewerContents>
{viewer.messageSource === "websocket" ? (
<WebsocketMessageProducer />
) : null}
{viewer.messageSource === "file_playback" ? (
<PlaybackFromFile fileUrl={playbackPath!} />
) : null}
</ViewerContents>
</ViewerContext.Provider>
);
}

function ViewerContents() {
function ViewerContents({ children }: { children: React.ReactNode }) {
const viewer = React.useContext(ViewerContext)!;
const dark_mode = viewer.useGui((state) => state.theme.dark_mode);
const darkMode = viewer.useGui((state) => state.theme.dark_mode);
const colors = viewer.useGui((state) => state.theme.colors);
const control_layout = viewer.useGui((state) => state.theme.control_layout);
const controlLayout = viewer.useGui((state) => state.theme.control_layout);
return (
<>
<ColorSchemeScript forceColorScheme={dark_mode ? "dark" : "light"} />
<ColorSchemeScript forceColorScheme={darkMode ? "dark" : "light"} />
<MantineProvider
theme={createTheme({
...theme,
...(colors === null
? {}
: { colors: { custom: colors }, primaryColor: "custom" }),
})}
forceColorScheme={dark_mode ? "dark" : "light"}
forceColorScheme={darkMode ? "dark" : "light"}
>
{ children }
<Notifications
position="top-left"
containerWidth="20em"
Expand Down Expand Up @@ -238,7 +240,7 @@ function ViewerContents() {
>
<Box
style={(theme) => ({
backgroundColor: dark_mode ? theme.colors.dark[9] : "#fff",
backgroundColor: darkMode ? theme.colors.dark[9] : "#fff",
flexGrow: 1,
overflow: "hidden",
height: "100%",
Expand All @@ -258,7 +260,7 @@ function ViewerContents() {
) : null}
</Box>
{viewer.messageSource == "websocket" ? (
<ControlPanel control_layout={control_layout} />
<ControlPanel control_layout={controlLayout} />
) : null}
</Box>
</Box>
Expand Down
166 changes: 113 additions & 53 deletions src/viser/client/src/FilePlayback.tsx
Original file line number Diff line number Diff line change
@@ -1,81 +1,141 @@
import { unpack } from "msgpackr";
import { decodeAsync, decode } from "@msgpack/msgpack";
import { Message } from "./WebsocketMessages";
import React, { useContext, useEffect } from "react";
import { decompress } from "fflate";

import { useContext, useEffect, useState } from "react";
import { ViewerContext } from "./App";
import { Progress, useMantineTheme } from "@mantine/core";

interface SerializedMessages {
loopStartIndex: number | null;
durationSeconds: number;
messages: [number, Message][];
}

async function deserializeMsgpackFile<T>(fileUrl: string): Promise<T> {
// Fetch the file using fetch()
/** Download, decompress, and deserialize a file, which should be serialized
* via msgpack and compressed via gzip. Also takes a hook for status updates. */
async function deserializeGzippedMsgpackFile<T>(
fileUrl: string,
setStatus: (status: { downloaded: number; total: number }) => void,
): Promise<T> {
const response = await fetch(fileUrl);
if (!response.ok) {
throw new Error(`Failed to fetch the file: ${response.statusText}`);
}
return new Promise<T>((resolve) => {
let length = 0;
const buffer: Uint8Array[] = [];
response.body!.pipeThrough(new DecompressionStream("gzip")).pipeTo(
new WritableStream<Uint8Array>({
write(chunk) {
buffer.push(chunk);
length += chunk.length;
},
close() {
const output = new Uint8Array(length);
let offset = 0;
for (const chunk of buffer) {
output.set(chunk, offset);
offset += chunk.length;
}
console.log(output.length);
resolve(unpack(output));
},
abort(err) {
console.error("Stream aborted:", err);
},
}),
);
const gzipTotalLength = parseInt(response.headers.get("Content-Length")!);

if (!DecompressionStream) {
// Implementation without streaming.
console.log(
"DecompressionStream is unavailable. Falling back to approach without streams.",
);
setStatus({ downloaded: gzipTotalLength * 0.0, total: gzipTotalLength });
response.arrayBuffer().then((buffer) => {
// Down downloading.
setStatus({
downloaded: gzipTotalLength * 0.8,
total: gzipTotalLength,
});
decompress(new Uint8Array(buffer), (error, result) => {
// Done decompressing, time to unpack.
setStatus({
downloaded: gzipTotalLength * 0.9,
total: gzipTotalLength,
});
resolve(decode(result) as T);
setStatus({
downloaded: gzipTotalLength,
total: gzipTotalLength,
});
});
});
} else {
// Counters for processed bytes, both before and after compression.
let gzipReceived = 0;

// Stream: fetch -> gzip -> msgpack.
decodeAsync(
response
.body!.pipeThrough(
// Count number of (compressed) bytes.
new TransformStream({
transform(chunk, controller) {
gzipReceived += chunk.length;
setStatus({ downloaded: gzipReceived, total: gzipTotalLength });
controller.enqueue(chunk);
// return new Promise((resolve) => setTimeout(resolve, 100));
},
}),
)
.pipeThrough(new DecompressionStream("gzip")),
).then((val) => resolve(val as T));
}
});
}

export function PlaybackFromFile({ fileUrl }: { fileUrl: string }) {
const viewer = useContext(ViewerContext)!;
const messageQueueRef = viewer.messageQueueRef;

const darkMode = viewer.useGui((state) => state.theme.dark_mode);
const [status, setStatus] = useState({ downloaded: 0.0, total: 0.0 });
const [loaded, setLoaded] = useState(false);
const theme = useMantineTheme();

useEffect(() => {
deserializeMsgpackFile<SerializedMessages>(fileUrl).then((recording) => {
let messageIndex = 0;
deserializeGzippedMsgpackFile<SerializedMessages>(fileUrl, setStatus).then(
(recording) => {
let messageIndex = 0;

function continuePlayback() {
const currentTimeSeconds = recording.messages[messageIndex][0];
while (currentTimeSeconds >= recording.messages[messageIndex][0]) {
messageQueueRef.current.push(recording.messages[messageIndex][1]);
messageIndex += 1;
function continuePlayback() {
setLoaded(true);
const currentTimeSeconds = recording.messages[messageIndex][0];
while (currentTimeSeconds >= recording.messages[messageIndex][0]) {
messageQueueRef.current.push(recording.messages[messageIndex][1]);
messageIndex += 1;

// Either finish playback or loop.
if (messageIndex === recording.messages.length) {
if (recording.loopStartIndex === null) return;
messageIndex = recording.loopStartIndex;
setTimeout(
continuePlayback,
(recording.durationSeconds - currentTimeSeconds) * 1000.0,
);
return;
// Either finish playback or loop.
if (messageIndex === recording.messages.length) {
if (recording.loopStartIndex === null) return;
messageIndex = recording.loopStartIndex;
setTimeout(
continuePlayback,
(recording.durationSeconds - currentTimeSeconds) * 1000.0,
);
return;
}
}

// Handle next set of frames.
setTimeout(
continuePlayback,
(recording.messages[messageIndex][0] - currentTimeSeconds) * 1000.0,
);
}
setTimeout(continuePlayback, recording.messages[0][0] * 1000.0);
},
);
}, []);

// Handle next set of frames.
setTimeout(
continuePlayback,
(recording.messages[messageIndex][0] - currentTimeSeconds) * 1000.0,
);
}
setTimeout(continuePlayback, recording.messages[0][0] * 1000.0);
});
});
return <></>;
return (
<div
style={{
position: "fixed",
zIndex: 1000,
top: 0,
bottom: 0,
left: 0,
right: 0,
display: loaded ? "none" : "block",
backgroundColor: darkMode ? theme.colors.dark[9] : "#fff",
}}
>
<Progress
value={(status.downloaded / status.total) * 100.0}
radius={0}
transitionDuration={100}
/>
</div>
);
}
6 changes: 3 additions & 3 deletions src/viser/client/src/WebsocketFunctions.tsx
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
import { MutableRefObject } from "react";
import * as THREE from "three";
import { Message } from "./WebsocketMessages";
import { pack } from "msgpackr";
import { encode } from "@msgpack/msgpack";

/** Send message over websocket. */
export function sendWebsocketMessage(
websocketRef: MutableRefObject<WebSocket | null>,
message: Message,
) {
if (websocketRef.current === null) return;
websocketRef.current.send(pack(message));
websocketRef.current.send(encode(message));
}

/** Returns a function for sending messages, with automatic throttling. */
Expand All @@ -25,7 +25,7 @@ export function makeThrottledMessageSender(
if (websocketRef.current === null) return;
latestMessage = message;
if (readyToSend) {
websocketRef.current.send(pack(message));
websocketRef.current.send(encode(message));
stale = false;
readyToSend = false;

Expand Down
4 changes: 2 additions & 2 deletions src/viser/client/src/WebsocketInterface.tsx
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import AwaitLock from "await-lock";
import { unpack } from "msgpackr";
import { decode } from "@msgpack/msgpack";

import React, { useContext } from "react";

Expand Down Expand Up @@ -55,7 +55,7 @@ export function WebsocketMessageProducer() {
// Reduce websocket backpressure.
const messagePromise = new Promise<Message[]>((resolve) => {
(event.data.arrayBuffer() as Promise<ArrayBuffer>).then((buffer) => {
resolve(unpack(new Uint8Array(buffer)) as Message[]);
resolve(decode(new Uint8Array(buffer)) as Message[]);
});
});

Expand Down
6 changes: 3 additions & 3 deletions src/viser/client/src/components/UploadButton.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import { Box, Progress } from "@mantine/core";
import { Button } from "@mantine/core";
import React, { useContext } from "react";
import { ViewerContext, ViewerContextContents } from "../App";
import { pack } from "msgpackr";
import { encode } from "@msgpack/msgpack";
import { IconCheck } from "@tabler/icons-react";
import { notifications } from "@mantine/notifications";
import { htmlIconWrapper } from "./ComponentStyles.css";
Expand Down Expand Up @@ -137,8 +137,8 @@ function useFileUpload({
const transferUuid = uuid();
const notificationId = "upload-" + transferUuid;

const send = (message: Parameters<typeof pack>[0]) =>
websocketRef.current?.send(pack(message));
const send = (message: Parameters<typeof encode>[0]) =>
websocketRef.current?.send(encode(message));

// Begin upload by setting initial state
updateUploadState({
Expand Down
Loading

0 comments on commit 2889a09

Please sign in to comment.