diff --git a/src/actions/fetchActions.ts b/src/actions/fetchActions.ts index 273b3037..b88236c4 100644 --- a/src/actions/fetchActions.ts +++ b/src/actions/fetchActions.ts @@ -22,6 +22,8 @@ import { import { Dispatch } from 'redux' import * as ClientFactory from '../services/clientFactory' import { setErrorDisplay } from './displayActions' +import { Poller, IPollConfig } from '../services/poller'; +import { delay } from '../util'; // ---------------------------------------- // Train Dialogs @@ -175,14 +177,32 @@ const fetchTutorialsFulfilled = (tutorials: AppBase[]): ActionObject => { // ---------------------------------------- // Training Status // ---------------------------------------- -const delay = (ms: number, value: T = null): Promise => new Promise(resolve => setTimeout(() => resolve(value), ms)) +const poller = new Poller({ interval: 2000 }) export const fetchApplicationTrainingStatusThunkAsync = (appId: string) => { return async (dispatch: Dispatch) => { dispatch(fetchApplicationTrainingStatusAsync(appId)) // Wait 1 second before polling to ensure service has time to change status from previous to queued / running await delay(1000) - pollTrainingStatusUntilResolvedOrMaxDuration(dispatch, appId, [TrainingStatusCode.Completed, TrainingStatusCode.Failed], 2000, 30000) + + const clClient = ClientFactory.getInstance(AT.FETCH_APPLICATION_TRAININGSTATUS_ASYNC) + const pollConfig: IPollConfig = { + id: appId, + maxDuration: 30000, + request: async () => { + const trainingStatus = await clClient.appGetTrainingStatus(appId) + console.log(`${new Date().getTime()} Poll app: ${appId}: `, trainingStatus.trainingStatus) + return trainingStatus + }, + isResolved: trainingStatus => [TrainingStatusCode.Completed, TrainingStatusCode.Failed].includes(trainingStatus.trainingStatus), + onExpired: () => { + console.warn(`Polling for app ${appId} exceeded max duration. Stopping`) + dispatch(fetchApplicationTrainingStatusExpired(appId)) + }, + onUpdate: trainingStatus => dispatch(fetchApplicationTrainingStatusFulfilled(appId, trainingStatus)), + } + + poller.addPoll(pollConfig) } } @@ -208,41 +228,6 @@ const fetchApplicationTrainingStatusExpired = (appId: string): ActionObject => { } } -const pollTrainingStatusUntilResolvedOrMaxDuration = (dispatch: Dispatch, appId: string, resolvedStates: TrainingStatusCode[], interval: number, maxDuration: number): Promise => { - const start = new Date() - const end = start.getTime() + maxDuration - const clClient = ClientFactory.getInstance(null) - - return new Promise((resolve) => { - const timerId = setInterval(async () => { - // If current time is after max allowed polling duration then resolve - const now = (new Date()).getTime() - if (now >= end) { - console.warn(`Polling exceeded max duration. Stopping`) - - if (timerId) { - clearInterval(timerId) - } - - dispatch(fetchApplicationTrainingStatusExpired(appId)) - resolve() - } - - // Get training status and if it's one of the resolved states resolve promise - const trainingStatus = await clClient.appGetTrainingStatus(appId) - console.log(`Poll app: ${appId} training status: `, end, now, trainingStatus.trainingStatus) - dispatch(fetchApplicationTrainingStatusFulfilled(appId, trainingStatus)) - - if (resolvedStates.includes(trainingStatus.trainingStatus)) { - if (timerId) { - clearInterval(timerId) - } - resolve() - } - }, interval) - }) -} - // ------------------------- // Entities // ------------------------- diff --git a/src/services/poller.test.ts b/src/services/poller.test.ts new file mode 100644 index 00000000..56b70a9c --- /dev/null +++ b/src/services/poller.test.ts @@ -0,0 +1,148 @@ +/** + * Copyright (c) Microsoft Corporation. All rights reserved. + * Licensed under the MIT License. + */ +import * as poller from './poller' +import { delay } from '../util' + +describe('Poller', () => { + test('poll should invoke onExpired callback when polling exceeds max duration', async () => { + // Arrange + const onExpiredMock = jest.fn() + const onUpdateMock = jest.fn(trainingStatus => { + }) + const pollConfig: poller.IPollConfig = { + id: 'pc1', + maxDuration: 500, + request: async () => { + return 0 + }, + isResolved: n => false, + onExpired: onExpiredMock, + onUpdate: onUpdateMock + } + + const poller1 = new poller.Poller({ interval: 100 }) + await poller1.addPoll(pollConfig) + + expect(onExpiredMock.mock.calls.length).toBe(1) + expect(onUpdateMock.mock.calls.length).toBeGreaterThan(3) + }) + + test('poll should invoke request, isResolved, and onUpdate for each interval', async () => { + const requestMock = jest.fn(async () => { + return 0 + }) + const isResolvedMock = jest.fn(n => false) + const onExpiredMock = jest.fn() + const onUpdateMock = jest.fn(trainingStatus => { + }) + const pollConfig: poller.IPollConfig = { + id: 'pc1', + maxDuration: 500, + request: requestMock, + isResolved: isResolvedMock, + onExpired: onExpiredMock, + onUpdate: onUpdateMock + } + + const poller1 = new poller.Poller({ interval: 100 }) + await poller1.addPoll(pollConfig) + + expect(requestMock.mock.calls.length).toBe(4) + expect(isResolvedMock.mock.calls.length).toBe(4) + expect(onUpdateMock.mock.calls.length).toBe(4) + }) + + test('poll should stop polling after isResolved returns true', async () => { + const onExpiredMock = jest.fn() + const onUpdateMock = jest.fn(trainingStatus => { + }) + const pollConfig: poller.IPollConfig = { + id: 'pc1', + maxDuration: 500, + request: async () => { + return 0 + }, + isResolved: n => true, + onExpired: onExpiredMock, + onUpdate: onUpdateMock + } + + const poller1 = new poller.Poller({ interval: 100 }) + await poller1.addPoll(pollConfig) + + expect(onUpdateMock.mock.calls.length).toBe(1) + }) + + test('calling poll with same id should extend existing polls', async () => { + const pollConfig1: poller.IPollConfig = { + id: 'pc1', + maxDuration: 400, + request: async () => { + return 0 + }, + isResolved: n => false, + onExpired: () => {}, + onUpdate: () => {} + } + + const pollConfig2: poller.IPollConfig = { + id: 'pc1', + maxDuration: 400, + request: async () => { + return 0 + }, + isResolved: n => false, + onExpired: () => {}, + onUpdate: () => {} + } + + const now = new Date().getTime() + const poller1 = new poller.Poller({ interval: 100 }) + const p1 = poller1.addPoll(pollConfig1) + await delay(200) + poller1.addPoll(pollConfig2) + await p1 + const after = new Date().getTime() + + // 200 + 400 + expect(after - now).toBeGreaterThanOrEqual(600) + }) + + test('calling poll with different id should NOT extend existing polls', async () => { + const pollConfig1: poller.IPollConfig = { + id: 'pc1', + maxDuration: 400, + request: async () => { + return 0 + }, + isResolved: n => false, + onExpired: () => {}, + onUpdate: () => {} + } + + const pollConfig2: poller.IPollConfig = { + id: 'pc2', + maxDuration: 400, + request: async () => { + return 0 + }, + isResolved: n => false, + onExpired: () => {}, + onUpdate: () => {} + } + + const poller1 = new poller.Poller({ interval: 100 }) + + const now = new Date().getTime() + const p1 = poller1.addPoll(pollConfig1) + await delay(200) + poller1.addPoll(pollConfig2) + + await p1 // Will still resolve after 400 expiration + const after = new Date().getTime() + + expect(after - now).toBeGreaterThan(400) + }) +}) \ No newline at end of file diff --git a/src/services/poller.ts b/src/services/poller.ts new file mode 100644 index 00000000..68b4561e --- /dev/null +++ b/src/services/poller.ts @@ -0,0 +1,94 @@ +/** + * Copyright (c) Microsoft Corporation. All rights reserved. + * Licensed under the MIT License. + */ + +export interface Deferred { + resolve: Function + reject: Function + pollConfig: IPollConfig +} + +export interface ActivePoll { + id: string + end: number + deferred: Deferred[] +} + +export interface IPollConfig { + id: string + maxDuration: number + request: () => Promise + isResolved: (t: T) => boolean + onExpired: () => void + onUpdate: (t: T) => void +} + +export interface IPollerOptions { + interval: number +} + +const global = window +export class Poller { + private polls: ActivePoll[] = [] + constructor(options: IPollerOptions) { + global.setInterval(async () => await this.poll(), options.interval) + } + + addPoll(pollConfig: IPollConfig) { + const { id, maxDuration } = pollConfig + const start = new Date().getTime() + const end = start + maxDuration + const activeApp = this.polls.find(p => p.id === id) + + if (activeApp) { + console.log(`Existing polling found for id: ${id} increasing end from ${activeApp.end} to: ${end}`) + activeApp.end = end + const promise = new Promise((resolve, reject) => { + activeApp.deferred.push({ resolve, reject, pollConfig }) + }) + + return promise + } + + console.log(`No polling found for id: ${id}. Starting new polling until: ${end}`) + const promise = new Promise((resolve, reject) => { + this.polls.push({ + id, + end, + deferred: [{ resolve, reject, pollConfig }] + }) + }) + + return promise + } + + private async poll() { + const now = (new Date()).getTime() + // Alternate approach is to split this into three phases: Filter those expired, await all requests, then filter all resolved. + this.polls = (await Promise.all(this.polls.map(async poll => { + const { end } = poll + // If current time is after max allowed polling duration then resolve + if (now >= end) { + poll.deferred.forEach(deferred => { + deferred.pollConfig.onExpired() + deferred.resolve() + }) + return undefined + } + + // Get training status and if it's one of the resolved states resolve promise + const firstConfig = poll.deferred[0].pollConfig + const result = await firstConfig.request() + firstConfig.onUpdate(result) + + // If trainings status is one of resolved states, remove app from polls to discontinue + if (firstConfig.isResolved(result)) { + poll.deferred.forEach(deferred => deferred.resolve()) + return undefined + } + + return poll + }))).filter(x => x) + } +} \ No newline at end of file diff --git a/src/util.ts b/src/util.ts index c8f8a7c8..de98d42b 100644 --- a/src/util.ts +++ b/src/util.ts @@ -56,3 +56,6 @@ export function createEntityMapFromMemories(entities: models.EntityBase[], memor export function getDefaultEntityMap(entities: models.EntityBase[]): Map { return entities.reduce((m, e) => m.set(e.entityId, `$${e.entityName}`), new Map()) } + +export const delay = (ms: number, value: T = null): Promise => new Promise(resolve => setTimeout(() => resolve(value), ms)) +