Skip to content

Commit

Permalink
Merge pull request #1280 from canalplus/misc/stream-events-emitter-class
Browse files Browse the repository at this point in the history
StreamEventsEmitter is now a class
  • Loading branch information
peaBerberian authored Sep 26, 2023
2 parents a89178c + b2cc358 commit 50f12b5
Show file tree
Hide file tree
Showing 3 changed files with 136 additions and 84 deletions.
21 changes: 14 additions & 7 deletions src/core/init/media_source_content_initializer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ import performInitialSeekAndPlay from "./utils/initial_seek_and_play";
import initializeContentDecryption from "./utils/initialize_content_decryption";
import MediaSourceDurationUpdater from "./utils/media_source_duration_updater";
import RebufferingController from "./utils/rebuffering_controller";
import streamEventsEmitter from "./utils/stream_events_emitter";
import StreamEventsEmitter from "./utils/stream_events_emitter";
import listenToMediaError from "./utils/throw_on_media_error";

/**
Expand Down Expand Up @@ -436,12 +436,19 @@ export default class MediaSourceContentInitializer extends ContentInitializer {
initialPlayPerformed.onUpdate((isPerformed, stopListening) => {
if (isPerformed) {
stopListening();
streamEventsEmitter(manifest,
mediaElement,
playbackObserver,
(evt) => this.trigger("streamEvent", evt),
(evt) => this.trigger("streamEventSkip", evt),
cancelSignal);
const streamEventsEmitter = new StreamEventsEmitter(manifest,
mediaElement,
playbackObserver);
streamEventsEmitter.addEventListener("event", (payload) => {
this.trigger("streamEvent", payload);
}, cancelSignal);
streamEventsEmitter.addEventListener("eventSkip", (payload) => {
this.trigger("streamEventSkip", payload);
}, cancelSignal);
streamEventsEmitter.start();
cancelSignal.register(() => {
streamEventsEmitter.stop();
});
}
}, { clearSignal: cancelSignal, emitCurrentValue: true });

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,21 +45,24 @@ function refreshScheduledEventsList(
}
}

const element = data.value.element;
const actualData = { type: data.type,
value: { ...data.value, element } };
if (end === undefined) {
const newScheduledEvent = { start,
id,
data,
data: actualData,
publicEvent: { start,
data } };
data: actualData } };
scheduledEvents.push(newScheduledEvent);
} else {
const newScheduledEvent = { start,
end,
id,
data,
data: actualData,
publicEvent: { start,
end,
data } };
data: actualData } };
scheduledEvents.push(newScheduledEvent);
}

Expand Down
188 changes: 115 additions & 73 deletions src/core/init/utils/stream_events_emitter/stream_events_emitter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,10 @@

import config from "../../../../config";
import Manifest from "../../../../manifest";
import createSharedReference from "../../../../utils/reference";
import EventEmitter from "../../../../utils/event_emitter";
import createSharedReference, {
ISharedReference,
} from "../../../../utils/reference";
import TaskCanceller, { CancellationSignal } from "../../../../utils/task_canceller";
import { IPlaybackObservation, IReadOnlyPlaybackObserver } from "../../../api";
import refreshScheduledEventsList from "./refresh_scheduled_events_list";
Expand All @@ -27,79 +30,118 @@ import {
IStreamEventPayload,
} from "./types";

interface IStreamEventsEmitterEvent {
event: IPublicStreamEvent | IPublicNonFiniteStreamEvent;
eventSkip: IPublicStreamEvent | IPublicNonFiniteStreamEvent;
}

/**
* Get events from manifest and emit each time an event has to be emitted
* @param {Object} manifest
* @param {HTMLMediaElement} mediaElement
* @param {Object} playbackObserver
* @param {Function} onEvent
* @param {Function} onEventSkip
* @param {Object} cancelSignal
* @returns {Object}
*/
export default function streamEventsEmitter(
manifest : Manifest,
mediaElement : HTMLMediaElement,
playbackObserver : IReadOnlyPlaybackObserver<IPlaybackObservation>,
onEvent : (evt : IPublicStreamEvent | IPublicNonFiniteStreamEvent) => void,
onEventSkip : (evt : IPublicStreamEvent | IPublicNonFiniteStreamEvent) => void,
cancelSignal : CancellationSignal
) : void {
const eventsBeingPlayed =
new WeakMap<IStreamEventPayload|INonFiniteStreamEventPayload, true>();
const scheduledEventsRef = createSharedReference(refreshScheduledEventsList([],
manifest),
cancelSignal);
manifest.addEventListener("manifestUpdate", () => {
const prev = scheduledEventsRef.getValue();
scheduledEventsRef.setValue(refreshScheduledEventsList(prev, manifest));
}, cancelSignal);

let isPollingEvents = false;
let cancelCurrentPolling = new TaskCanceller();
cancelCurrentPolling.linkToSignal(cancelSignal);

scheduledEventsRef.onUpdate(({ length: scheduledEventsLength }) => {
if (scheduledEventsLength === 0) {
if (isPollingEvents) {
cancelCurrentPolling.cancel();
cancelCurrentPolling = new TaskCanceller();
cancelCurrentPolling.linkToSignal(cancelSignal);
isPollingEvents = false;
}
return;
} else if (isPollingEvents) {
export default class StreamEventsEmitter extends EventEmitter<IStreamEventsEmitterEvent> {
private _manifest : Manifest;
private _mediaElement : HTMLMediaElement;
private _playbackObserver : IReadOnlyPlaybackObserver<IPlaybackObservation>;
private _scheduledEventsRef : ISharedReference<
Array<IStreamEventPayload | INonFiniteStreamEventPayload>
>;
private _eventsBeingPlayed: WeakMap<
IStreamEventPayload|INonFiniteStreamEventPayload,
true
>;
private _canceller : TaskCanceller | null;

/**
* @param {Object} manifest
* @param {HTMLMediaElement} mediaElement
* @param {Object} playbackObserver
*/
constructor(
manifest : Manifest,
mediaElement : HTMLMediaElement,
playbackObserver : IReadOnlyPlaybackObserver<IPlaybackObservation>
) {
super();
this._manifest = manifest;
this._mediaElement = mediaElement;
this._playbackObserver = playbackObserver;
this._canceller = null;
this._scheduledEventsRef = createSharedReference<
Array<IStreamEventPayload|INonFiniteStreamEventPayload>
>([]);
this._eventsBeingPlayed =
new WeakMap<IStreamEventPayload|INonFiniteStreamEventPayload, true>();
}

public start(): void {
if (this._canceller !== null) {
return;
}
isPollingEvents = true;
let oldObservation = constructObservation();

const { STREAM_EVENT_EMITTER_POLL_INTERVAL } = config.getCurrent();
const intervalId = setInterval(checkStreamEvents,
STREAM_EVENT_EMITTER_POLL_INTERVAL);
playbackObserver.listen(checkStreamEvents,
{ includeLastObservation: false,
clearSignal: cancelCurrentPolling.signal });

cancelCurrentPolling.signal.register(() => {
clearInterval(intervalId);
});

function checkStreamEvents() {
const newObservation = constructObservation();
emitStreamEvents(scheduledEventsRef.getValue(),
oldObservation,
newObservation,
cancelCurrentPolling.signal);
oldObservation = newObservation;
}
this._canceller = new TaskCanceller();

const cancelSignal = this._canceller.signal;
const playbackObserver = this._playbackObserver;
const mediaElement = this._mediaElement;


let isPollingEvents = false;
let cancelCurrentPolling = new TaskCanceller();
cancelCurrentPolling.linkToSignal(cancelSignal);

this._manifest.addEventListener("manifestUpdate", () => {
const prev = this._scheduledEventsRef.getValue();
this._scheduledEventsRef.setValue(refreshScheduledEventsList(prev, this._manifest));
}, this._canceller.signal);
this._scheduledEventsRef.setValue(refreshScheduledEventsList([], this._manifest));

this._scheduledEventsRef.onUpdate(({ length: scheduledEventsLength }) => {
if (scheduledEventsLength === 0) {
if (isPollingEvents) {
cancelCurrentPolling.cancel();
cancelCurrentPolling = new TaskCanceller();
cancelCurrentPolling.linkToSignal(cancelSignal);
isPollingEvents = false;
}
return;
} else if (isPollingEvents) {
return;
}
isPollingEvents = true;
let oldObservation = constructObservation();
const checkStreamEvents = () => {
const newObservation = constructObservation();
this._emitStreamEvents(this._scheduledEventsRef.getValue(),
oldObservation,
newObservation,
cancelCurrentPolling.signal);
oldObservation = newObservation;
};

const { STREAM_EVENT_EMITTER_POLL_INTERVAL } = config.getCurrent();
const intervalId = setInterval(checkStreamEvents,
STREAM_EVENT_EMITTER_POLL_INTERVAL);
playbackObserver.listen(checkStreamEvents,
{ includeLastObservation: false,
clearSignal: cancelCurrentPolling.signal });

cancelCurrentPolling.signal.register(() => {
clearInterval(intervalId);
});

function constructObservation() {
const isSeeking = playbackObserver.getReference().getValue().seeking;
return { currentTime: mediaElement.currentTime,
isSeeking };
}
}, { emitCurrentValue: true, clearSignal: cancelSignal });
}

function constructObservation() {
const isSeeking = playbackObserver.getReference().getValue().seeking;
return { currentTime: mediaElement.currentTime,
isSeeking };
public stop(): void {
if (this._canceller !== null) {
this._canceller.cancel();
this._canceller = null;
}
}, { emitCurrentValue: true, clearSignal: cancelSignal });
}

/**
* Examine playback situation from playback observations to emit stream events and
Expand All @@ -109,7 +151,7 @@ export default function streamEventsEmitter(
* @param {Object} newObservation
* @param {Object} stopSignal
*/
function emitStreamEvents(
private _emitStreamEvents(
scheduledEvents : Array<IStreamEventPayload|INonFiniteStreamEventPayload>,
oldObservation : { currentTime: number; isSeeking: boolean },
newObservation : { currentTime: number; isSeeking: boolean },
Expand All @@ -125,22 +167,22 @@ export default function streamEventsEmitter(
const start = event.start;
const end = isFiniteStreamEvent(event) ? event.end :
undefined;
const isBeingPlayed = eventsBeingPlayed.has(event);
const isBeingPlayed = this._eventsBeingPlayed.has(event);
if (isBeingPlayed) {
if (start > currentTime ||
(end !== undefined && currentTime >= end)
) {
if (isFiniteStreamEvent(event)) {
eventsToExit.push(event.publicEvent);
}
eventsBeingPlayed.delete(event);
this._eventsBeingPlayed.delete(event);
}
} else if (start <= currentTime &&
end !== undefined &&
currentTime < end) {
eventsToSend.push({ type: "stream-event",
value: event.publicEvent });
eventsBeingPlayed.set(event, true);
this._eventsBeingPlayed.set(event, true);
} else if (previousTime < start &&
currentTime >= (end ?? start)) {
if (isSeeking) {
Expand All @@ -159,9 +201,9 @@ export default function streamEventsEmitter(
if (eventsToSend.length > 0) {
for (const event of eventsToSend) {
if (event.type === "stream-event") {
onEvent(event.value);
this.trigger("event", event.value);
} else {
onEventSkip(event.value);
this.trigger("eventSkip", event.value);
}
if (stopSignal.isCancelled()) {
return;
Expand Down

0 comments on commit 50f12b5

Please sign in to comment.