Skip to content

Commit

Permalink
Automatically flush events waiting to be filtered after timeout
Browse files Browse the repository at this point in the history
  • Loading branch information
matus-tomlein committed Aug 7, 2023
1 parent a5416e9 commit 1f055db
Show file tree
Hide file tree
Showing 5 changed files with 61 additions and 56 deletions.
40 changes: 8 additions & 32 deletions plugins/browser-plugin-media/src/api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -93,16 +93,8 @@ export function startMediaTracking(
*/
export function endMediaTracking(configuration: { id: string }) {
if (activeMedias[configuration.id]) {
let events = activeMedias[configuration.id].flushAndStop();
activeMedias[configuration.id].flushAndStop();
delete activeMedias[configuration.id];

if (events.length > 0) {
dispatchToTrackersInCollection(Object.keys(_trackers), _trackers, (t) => {
events.forEach((event) => {
t.core.track(buildSelfDescribingEvent(event), event.context, event.timestamp);
});
});
}
}
}

Expand Down Expand Up @@ -714,34 +706,18 @@ function track(
return;
}

const events = mediaTracking.update(mediaEvent, customEvent, player, ad, adBreak);
const trackEvent = (event: EventWithContext) => {
dispatchToTrackersInCollection(trackers, _trackers, (t) => {
t.core.track(buildSelfDescribingEvent(event), (event.context ?? []).concat(context), timestamp);
});
};

mediaTracking.update(trackEvent, mediaEvent, customEvent, player, ad, adBreak);

// Update page activity in order to keep sending page pings if needed
if (mediaTracking.shouldUpdatePageActivity()) {
dispatchToTrackersInCollection(trackers, _trackers, (t) => {
t.updatePageActivity();
});
}

// Processes events with context and timestamp and filters out repeated events
const eventsToTrack: EventWithContext[] = mediaTracking.filterRepeatedEvents(
events.map((event) => {
return {
...event,
context: (event.context ?? []).concat(context),
timestamp,
};
})
);

if (eventsToTrack.length == 0) {
return;
}

// Send all created events to the trackers
dispatchToTrackersInCollection(trackers, _trackers, (t) => {
eventsToTrack.forEach((event) => {
t.core.track(buildSelfDescribingEvent(event), event.context, event.timestamp);
});
});
}
12 changes: 5 additions & 7 deletions plugins/browser-plugin-media/src/mediaTracking.ts
Original file line number Diff line number Diff line change
Expand Up @@ -83,9 +83,9 @@ export class MediaTracking {
/**
* Called when user calls `endMediaTracking()`.
*/
flushAndStop(): EventWithContext[] {
flushAndStop() {
this.pingInterval?.clear();
return this.repeatedEventFilter.flush();
this.repeatedEventFilter.flush();
}

/**
Expand All @@ -97,12 +97,13 @@ export class MediaTracking {
* @returns List of events with entities to track.
*/
update(
trackEvent: (event: EventWithContext) => void,
mediaEvent?: MediaEvent,
customEvent?: SelfDescribingJson,
player?: MediaPlayerUpdate,
ad?: MediaAdUpdate,
adBreak?: MediaPlayerAdBreakUpdate
): EventWithContext[] {
) {
// update state
this.updatePlayer(player);
if (mediaEvent !== undefined) {
Expand Down Expand Up @@ -144,11 +145,8 @@ export class MediaTracking {
if (customEvent !== undefined) {
eventsToTrack.push({ event: customEvent, context: context });
}
return eventsToTrack;
}

filterRepeatedEvents(events: EventWithContext[]): EventWithContext[] {
return this.repeatedEventFilter.filterEventsToTrack(events);
this.repeatedEventFilter.trackFilteredEvents(eventsToTrack, trackEvent);
}

shouldUpdatePageActivity(): boolean {
Expand Down
47 changes: 32 additions & 15 deletions plugins/browser-plugin-media/src/repeatedEventFilter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@ import { FilterOutRepeatedEvents, MediaEventType, EventWithContext } from './typ
*/
export class RepeatedEventFilter {
private aggregateEventsWithOrder: { [schema: string]: boolean } = {};
private eventsToAggregate: { [schema: string]: EventWithContext[] } = {};
private eventsToAggregate: { [schema: string]: (() => void)[] } = {};
private flushTimeout?: number;
private flushTimeoutMs: number;

constructor(configuration?: FilterOutRepeatedEvents) {
let allFiltersEnabled = configuration === undefined || configuration === true;
Expand All @@ -19,44 +21,59 @@ export class RepeatedEventFilter {
this.aggregateEventsWithOrder[getMediaEventSchema(MediaEventType.VolumeChange)] = false;
}

this.flushTimeoutMs = (typeof configuration === 'object' ? configuration.flushTimeoutMs : undefined) ?? 5000;

Object.keys(this.aggregateEventsWithOrder).forEach((schema) => {
this.eventsToAggregate[schema] = [];
});
}

filterEventsToTrack(events: EventWithContext[]): EventWithContext[] {
let eventsToTrack: EventWithContext[] = [];
trackFilteredEvents(events: EventWithContext[], trackEvent: (event: EventWithContext) => void) {
let startFlushTimeout = false;

events.forEach(({ event, context }) => {
if (this.eventsToAggregate[event.schema] !== undefined) {
this.eventsToAggregate[event.schema].push({ event, context });
startFlushTimeout = true;
this.eventsToAggregate[event.schema].push(() => trackEvent({ event, context }));
} else {
startFlushTimeout = false;
// flush any events waiting
let flushed = this.flush();
if (flushed.length > 0) {
eventsToTrack = eventsToTrack.concat(flushed);
}
this.flush();

eventsToTrack.push({ event, context });
trackEvent({ event, context });
}
});

return eventsToTrack;
if (startFlushTimeout && this.flushTimeout === undefined) {
this.setFlushTimeout();
}
}

flush(): EventWithContext[] {
let flushed: EventWithContext[] = [];
flush() {
this.clearFlushTimeout();

Object.keys(this.eventsToAggregate).forEach((schema) => {
let eventsToAggregate = this.eventsToAggregate[schema];
if (eventsToAggregate.length > 0) {
if (this.aggregateEventsWithOrder[schema]) {
flushed.push(eventsToAggregate[0]);
eventsToAggregate[0]();
} else {
flushed.push(eventsToAggregate[eventsToAggregate.length - 1]);
eventsToAggregate[eventsToAggregate.length - 1]();
}
this.eventsToAggregate[schema] = [];
}
});
return flushed;
}

private clearFlushTimeout() {
if (this.flushTimeout !== undefined) {
clearTimeout(this.flushTimeout);
this.flushTimeout = undefined;
}
}

private setFlushTimeout() {
this.clearFlushTimeout();
this.flushTimeout = window.setTimeout(() => this.flush(), this.flushTimeoutMs);
}
}
8 changes: 6 additions & 2 deletions plugins/browser-plugin-media/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -86,10 +86,14 @@ export type FilterOutRepeatedEvents =
* Whether to filter out volume change events tracked after each other.
*/
volumeChangeEvents?: boolean;
/**
* Timeout in milliseconds after which to send the events that are queued for filtering.
* Defaults to 5000 ms.
*/
flushTimeoutMs?: number;
}
| boolean;


export type MediaTrackingConfiguration = {
/** Unique ID of the media tracking. The same ID will be used for media player session if enabled. */
id: string;
Expand Down Expand Up @@ -410,4 +414,4 @@ export interface CommonMediaEventProperties extends CommonEventProperties {

export interface EventWithContext extends CommonEventProperties {
event: SelfDescribingJson;
};
}
10 changes: 10 additions & 0 deletions plugins/browser-plugin-media/test/api.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -467,6 +467,16 @@ describe('Media Tracking API', () => {

expect(eventQueue).toMatchObject([{ context: [{ schema: MEDIA_PLAYER_SCHEMA }, { schema: 'entity' }] }]);
});

it('flushes events that are waiting to be filtered automatically after timeout', (done) => {
startMediaTracking({ id, filterOutRepeatedEvents: { flushTimeoutMs: 0 } });
trackMediaVolumeChange({ id, newVolume: 50 });

setTimeout(() => {
expect(eventQueue.length).toBe(1);
done();
}, 0);
});
});

it('adds custom context entities to all events', () => {
Expand Down

0 comments on commit 1f055db

Please sign in to comment.