-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
By using `webxdc.joinRealtimeChannel()` Reduce data send period, to be precise, the ping will actually be bigger
- Loading branch information
Showing
3 changed files
with
155 additions
and
70 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -5,13 +5,7 @@ | |
|
||
document.addEventListener('DOMContentLoaded', init); | ||
|
||
// Slightly above the sustained send rate of Delta Chat, so that | ||
// we never send two chunks of data in the same batch, to work around | ||
// `appendBuffer()` throwing if it's not done processing the previous | ||
// chunk. | ||
// Btw, the rate is now 6.6666 for testrun: `*.testrun.org`. | ||
// https://github.com/deltachat/deltachat-core-rust/pull/4904 | ||
const DATA_SEND_PERIOD = 11 * 1000; | ||
const DATA_SEND_PERIOD = 30; | ||
|
||
function init() { | ||
// Keep in mind that the same member could connect from two different devices. | ||
|
@@ -22,47 +16,52 @@ function init() { | |
/** @type {Map<RoomMemberAddr, HTMLElement>} */ | ||
const roomMemberEls = new Map(); | ||
|
||
let handledOldMessages = false; | ||
const handledOldMessagesP = window.webxdc.setUpdateListener(update => { | ||
// Only handle messages that arrived after the app was opened. | ||
// Why? Because it's a prototype. | ||
if (!handledOldMessages) { | ||
return; | ||
} | ||
const realtimeChannel = window.webxdc.joinRealtimeChannel(); | ||
|
||
realtimeChannel.setListener(data => { | ||
// TODO perf: it is more efficient to send video data directly as | ||
// video arrayBuffer instead of converting back and forth to strings | ||
// and back. | ||
const payload = JSON.parse(new TextDecoder().decode(data)); | ||
|
||
switch (update.payload.type) { | ||
switch (payload.type) { | ||
case 'newRoomMember': { | ||
addSectionForMember( | ||
update.payload.roomMemberAddr, | ||
update.payload.roomMemberName, | ||
payload.roomMemberAddr, | ||
payload.roomMemberName, | ||
); | ||
|
||
// Restart the stream, because `appendBuffer` apparently | ||
// doesn't work if previous buffers are dropped. | ||
localStreamP | ||
?.then(stream => stream.stop()) | ||
.then(() => { | ||
// IDK if `setTimeout` is needed. | ||
// setTimeout because of the webxdc.js emulator, see | ||
// "mimic connection establishment time". | ||
// IDK if this is needed in actual connection. | ||
setTimeout(() => { | ||
localStreamP = startBroadcast(includeVideoCheckbox.checked) | ||
}) | ||
localStreamP = startBroadcast( | ||
realtimeChannel, | ||
includeVideoCheckbox.checked | ||
) | ||
}, 500) | ||
}) | ||
|
||
break; | ||
} | ||
case 'newStream': { | ||
let containerElement = roomMemberEls.get(update.payload.roomMemberAddr); | ||
let containerElement = roomMemberEls.get(payload.roomMemberAddr); | ||
if (!containerElement) { | ||
addSectionForMember( | ||
update.payload.roomMemberAddr, | ||
update.payload.roomMemberAddr // Yes, it should be member name. | ||
payload.roomMemberAddr, | ||
payload.roomMemberAddr // Yes, it should be member name. | ||
); | ||
containerElement = roomMemberEls.get(update.payload.roomMemberAddr); | ||
containerElement = roomMemberEls.get(payload.roomMemberAddr); | ||
} | ||
|
||
incomingStreams.set( | ||
update.payload.streamId, | ||
setUpNewVideoDisplay(containerElement, update.payload.mimeType) | ||
payload.streamId, | ||
setUpNewVideoDisplay(containerElement, payload.mimeType) | ||
); | ||
|
||
// Could be `null` if it's not the first time this member started | ||
|
@@ -72,21 +71,32 @@ function init() { | |
break; | ||
} | ||
case 'data': { | ||
const sourceBufferP = incomingStreams.get(update.payload.streamId); | ||
const sourceBufferP = incomingStreams.get(payload.streamId); | ||
// Apparenyly realtimeChannels aren't ordered. | ||
// This once printed 657, 659, 658. | ||
// I guess we need to order messages on our own. Put them in a queue | ||
// But overall it works fine. And there is a test to check if they are | ||
// ordered: | ||
// https://github.com/deltachat/deltachat-core-rust/blob/b5e2ded47a1e8e9ed44275ac6b1009b9f481eba2/deltachat-rpc-client/tests/test_iroh_webxdc.py#L189-L209 | ||
// console.log('received', payload.sequenceNumber); | ||
|
||
sourceBufferP.then(async sourceBuffer => { | ||
// TODO fix: if 'data' events are sent often enough, it can so happen | ||
// that the last `appendBuffer` has not been finished, so this one | ||
// will throw. Need to check `sourceBuffer.updating`. | ||
const deserializedData = await deserializeData(update.payload.data); | ||
sourceBuffer.appendBuffer(deserializedData); | ||
const deserializedData = await deserializeData(payload.data); | ||
execWhenSourceBufferReady( | ||
sourceBuffer, | ||
() => sourceBuffer.appendBuffer(deserializedData), | ||
payload.sequenceNumber | ||
) | ||
}) | ||
break; | ||
} | ||
default: | ||
throw new Error('Unknown message type:' + update.payload.type); | ||
throw new Error('Unknown message type:' + payload.type); | ||
} | ||
}, 0); | ||
handledOldMessagesP.then(() => handledOldMessages = true); | ||
}); | ||
|
||
function addSectionForMember(roomMemberAddr, roomMemberName) { | ||
const memberSection = createElementForRoomMember(roomMemberName); | ||
|
@@ -101,7 +111,7 @@ function init() { | |
startBroadcastButton.addEventListener('click', () => { | ||
startBroadcastButton.disabled = true; | ||
includeVideoCheckbox.disabled = true; | ||
localStreamP = startBroadcast(includeVideoCheckbox.checked) | ||
localStreamP = startBroadcast(realtimeChannel, includeVideoCheckbox.checked) | ||
localStreamP.then(stream => { | ||
stopBroadcastButton.disabled = false; | ||
}); | ||
|
@@ -129,15 +139,19 @@ function init() { | |
} | ||
}) | ||
|
||
handledOldMessagesP.then(() => { | ||
window.webxdc.sendUpdate({ | ||
payload: { | ||
type: 'newRoomMember', | ||
roomMemberName: window.webxdc.selfName, | ||
roomMemberAddr: window.webxdc.selfAddr, | ||
}, | ||
}, ''); | ||
}); | ||
// `setTimeout` because apparently `send()` doesn't work until the | ||
// connection has been established. | ||
// TODO refactor: a proper way to fix this. | ||
setTimeout(() => { | ||
const payload = { | ||
type: 'newRoomMember', | ||
roomMemberName: window.webxdc.selfName, | ||
roomMemberAddr: window.webxdc.selfAddr, | ||
}; | ||
realtimeChannel.send( | ||
(new TextEncoder()).encode(JSON.stringify(payload)) | ||
); | ||
}, 1000) | ||
} | ||
|
||
function createElementForRoomMember(roomMemberName) { | ||
|
@@ -159,32 +173,40 @@ function createElementForRoomMember(roomMemberName) { | |
/** | ||
* @param {boolean} includeVideo | ||
*/ | ||
async function startBroadcast(includeVideo) { | ||
async function startBroadcast(realtimeChannel, includeVideo) { | ||
const streamId = Math.random(); | ||
let sequenceNumber = 0; | ||
|
||
const localStream = new LocalCameraMediaStream( | ||
async (event) => { | ||
const serializedData = await serializeData(event); | ||
window.webxdc.sendUpdate({ | ||
payload: { | ||
type: 'data', | ||
streamId, | ||
data: serializedData, | ||
}, | ||
}, ''); | ||
const payload = { | ||
type: 'data', | ||
streamId, | ||
data: serializedData, | ||
sequenceNumber: sequenceNumber++, | ||
}; | ||
realtimeChannel.send( | ||
(new TextEncoder()).encode(JSON.stringify(payload)) | ||
); | ||
}, | ||
includeVideo, | ||
); | ||
await localStream.init(); | ||
const payload = { | ||
type: 'newStream', | ||
roomMemberAddr: window.webxdc.selfAddr, | ||
streamId, | ||
mimeType: localStream.recorder.mimeType, | ||
}; | ||
realtimeChannel.send( | ||
(new TextEncoder()).encode(JSON.stringify(payload)) | ||
); | ||
|
||
window.webxdc.sendUpdate({ | ||
payload: { | ||
type: 'newStream', | ||
roomMemberAddr: window.webxdc.selfAddr, | ||
streamId, | ||
mimeType: localStream.recorder.mimeType, | ||
}, | ||
payload: {}, | ||
info: `${window.webxdc.selfName} started a broadcast!`, | ||
}, ''); | ||
}, `${window.webxdc.selfName} started a broadcast!`) | ||
|
||
return localStream; | ||
} | ||
|
@@ -243,6 +265,7 @@ async function setUpNewVideoDisplay(containerElement, mimeType) { | |
}); | ||
}) | ||
const sourceBuffer = mediaSource.addSourceBuffer(mimeType); | ||
console.log('created sourceBuffer', sourceBuffer, sourceBuffer.mode) | ||
|
||
containerElement.appendChild(video); | ||
|
||
|
@@ -313,10 +336,71 @@ class LocalCameraMediaStream { | |
} | ||
} | ||
|
||
/** | ||
* Execute `fn` synchronously when `sourceBuffer.updating` becomes `false`. | ||
* If this function was called several times while `sourceBuffer.updating === true` then | ||
* `fn`s are executed in the same order as this function was called. | ||
* @param {SourceBuffer} sourceBuffer | ||
* @param {() => void} fn | ||
*/ | ||
function execWhenSourceBufferReady(sourceBuffer, fn, _sequence) { | ||
let queue = queueMap.get(sourceBuffer); | ||
if (queue && queue.length > 0) { | ||
console.log('queue not empty', _sequence); | ||
|
||
queue.push(fn); | ||
return; | ||
} | ||
// There is nothing in the queue. | ||
|
||
if (!sourceBuffer.updating) { | ||
// console.log('!sourceBuffer.updating, executing immediately', _sequence); | ||
fn(); | ||
return; | ||
} | ||
// `sourceBuffer.updating === true` and the queue is empty | ||
|
||
if (!queue) { | ||
queue = [fn]; | ||
queueMap.set(sourceBuffer, queue); | ||
} else { | ||
queue.push(fn); | ||
} | ||
emptyQueue(sourceBuffer, queue) | ||
} | ||
/** @type {WeakMap<SourceBuffer, Array<() => void>>} */ | ||
const queueMap = new WeakMap(); | ||
/** | ||
* Assumes the queue is not empty | ||
* @param {SourceBuffer} sourceBuffer | ||
* @param {Array<() => void>} queue | ||
*/ | ||
function emptyQueue(sourceBuffer, queue) { | ||
console.log('emptyQueue called'); | ||
|
||
sourceBuffer.addEventListener('updateend', () => { | ||
console.log('updateend'); | ||
|
||
// `fn()` will usually make `sourceBuffer.updating` immediately, so the loop | ||
// will execute once. | ||
while (!sourceBuffer.updating) { | ||
const fn = queue.shift(); | ||
fn(); | ||
if (queue.length === 0) { | ||
return | ||
} | ||
} | ||
// Now `sourceBuffer.updating === true`, let's attach the listener. | ||
/** @type {true} */ | ||
const _assert = sourceBuffer.updating; | ||
emptyQueue(sourceBuffer, queue); | ||
}, { once: true, passive: true }); | ||
} | ||
|
||
|
||
/** | ||
* @license | ||
* Copyright 2023 WofWca <[email protected]> | ||
* Copyright 2023, 2024 WofWca <[email protected]> | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
|