Skip to content
This repository has been archived by the owner on Nov 16, 2023. It is now read-only.

Commit

Permalink
fix: extend existing polls instead of stacking them (#619)
Browse files Browse the repository at this point in the history
* refactor: use setInterval with array instead of setTimeout with restart

This has disadvantage of not being able to wait for the `request` inside and to guarantee it doesn't take longer than interval.
  • Loading branch information
mattmazzola authored Jun 25, 2018
1 parent a345bc7 commit e2dcd5e
Show file tree
Hide file tree
Showing 4 changed files with 267 additions and 37 deletions.
59 changes: 22 additions & 37 deletions src/actions/fetchActions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import {
import { Dispatch } from 'redux'
import * as ClientFactory from '../services/clientFactory'
import { setErrorDisplay } from './displayActions'
import { Poller, IPollConfig } from '../services/poller';
import { delay } from '../util';

// ----------------------------------------
// Train Dialogs
Expand Down Expand Up @@ -175,14 +177,32 @@ const fetchTutorialsFulfilled = (tutorials: AppBase[]): ActionObject => {
// ----------------------------------------
// Training Status
// ----------------------------------------
const delay = <T>(ms: number, value: T = null): Promise<T> => new Promise<T>(resolve => setTimeout(() => resolve(value), ms))
const poller = new Poller({ interval: 2000 })

export const fetchApplicationTrainingStatusThunkAsync = (appId: string) => {
return async (dispatch: Dispatch<any>) => {
dispatch(fetchApplicationTrainingStatusAsync(appId))
// Wait 1 second before polling to ensure service has time to change status from previous to queued / running
await delay(1000)
pollTrainingStatusUntilResolvedOrMaxDuration(dispatch, appId, [TrainingStatusCode.Completed, TrainingStatusCode.Failed], 2000, 30000)

const clClient = ClientFactory.getInstance(AT.FETCH_APPLICATION_TRAININGSTATUS_ASYNC)
const pollConfig: IPollConfig<TrainingStatus> = {
id: appId,
maxDuration: 30000,
request: async () => {
const trainingStatus = await clClient.appGetTrainingStatus(appId)
console.log(`${new Date().getTime()} Poll app: ${appId}: `, trainingStatus.trainingStatus)
return trainingStatus
},
isResolved: trainingStatus => [TrainingStatusCode.Completed, TrainingStatusCode.Failed].includes(trainingStatus.trainingStatus),
onExpired: () => {
console.warn(`Polling for app ${appId} exceeded max duration. Stopping`)
dispatch(fetchApplicationTrainingStatusExpired(appId))
},
onUpdate: trainingStatus => dispatch(fetchApplicationTrainingStatusFulfilled(appId, trainingStatus)),
}

poller.addPoll(pollConfig)
}
}

Expand All @@ -208,41 +228,6 @@ const fetchApplicationTrainingStatusExpired = (appId: string): ActionObject => {
}
}

const pollTrainingStatusUntilResolvedOrMaxDuration = (dispatch: Dispatch<any>, appId: string, resolvedStates: TrainingStatusCode[], interval: number, maxDuration: number): Promise<void> => {
const start = new Date()
const end = start.getTime() + maxDuration
const clClient = ClientFactory.getInstance(null)

return new Promise<void>((resolve) => {
const timerId = setInterval(async () => {
// If current time is after max allowed polling duration then resolve
const now = (new Date()).getTime()
if (now >= end) {
console.warn(`Polling exceeded max duration. Stopping`)

if (timerId) {
clearInterval(timerId)
}

dispatch(fetchApplicationTrainingStatusExpired(appId))
resolve()
}

// Get training status and if it's one of the resolved states resolve promise
const trainingStatus = await clClient.appGetTrainingStatus(appId)
console.log(`Poll app: ${appId} training status: `, end, now, trainingStatus.trainingStatus)
dispatch(fetchApplicationTrainingStatusFulfilled(appId, trainingStatus))

if (resolvedStates.includes(trainingStatus.trainingStatus)) {
if (timerId) {
clearInterval(timerId)
}
resolve()
}
}, interval)
})
}

// -------------------------
// Entities
// -------------------------
Expand Down
148 changes: 148 additions & 0 deletions src/services/poller.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
/**
* Copyright (c) Microsoft Corporation. All rights reserved.
* Licensed under the MIT License.
*/
import * as poller from './poller'
import { delay } from '../util'

describe('Poller', () => {
test('poll should invoke onExpired callback when polling exceeds max duration', async () => {
// Arrange
const onExpiredMock = jest.fn()
const onUpdateMock = jest.fn(trainingStatus => {
})
const pollConfig: poller.IPollConfig<number> = {
id: 'pc1',
maxDuration: 500,
request: async () => {
return 0
},
isResolved: n => false,
onExpired: onExpiredMock,
onUpdate: onUpdateMock
}

const poller1 = new poller.Poller({ interval: 100 })
await poller1.addPoll(pollConfig)

expect(onExpiredMock.mock.calls.length).toBe(1)
expect(onUpdateMock.mock.calls.length).toBeGreaterThan(3)
})

test('poll should invoke request, isResolved, and onUpdate for each interval', async () => {
const requestMock = jest.fn(async () => {
return 0
})
const isResolvedMock = jest.fn(n => false)
const onExpiredMock = jest.fn()
const onUpdateMock = jest.fn(trainingStatus => {
})
const pollConfig: poller.IPollConfig<number> = {
id: 'pc1',
maxDuration: 500,
request: requestMock,
isResolved: isResolvedMock,
onExpired: onExpiredMock,
onUpdate: onUpdateMock
}

const poller1 = new poller.Poller({ interval: 100 })
await poller1.addPoll(pollConfig)

expect(requestMock.mock.calls.length).toBe(4)
expect(isResolvedMock.mock.calls.length).toBe(4)
expect(onUpdateMock.mock.calls.length).toBe(4)
})

test('poll should stop polling after isResolved returns true', async () => {
const onExpiredMock = jest.fn()
const onUpdateMock = jest.fn(trainingStatus => {
})
const pollConfig: poller.IPollConfig<number> = {
id: 'pc1',
maxDuration: 500,
request: async () => {
return 0
},
isResolved: n => true,
onExpired: onExpiredMock,
onUpdate: onUpdateMock
}

const poller1 = new poller.Poller({ interval: 100 })
await poller1.addPoll(pollConfig)

expect(onUpdateMock.mock.calls.length).toBe(1)
})

test('calling poll with same id should extend existing polls', async () => {
const pollConfig1: poller.IPollConfig<number> = {
id: 'pc1',
maxDuration: 400,
request: async () => {
return 0
},
isResolved: n => false,
onExpired: () => {},
onUpdate: () => {}
}

const pollConfig2: poller.IPollConfig<number> = {
id: 'pc1',
maxDuration: 400,
request: async () => {
return 0
},
isResolved: n => false,
onExpired: () => {},
onUpdate: () => {}
}

const now = new Date().getTime()
const poller1 = new poller.Poller({ interval: 100 })
const p1 = poller1.addPoll(pollConfig1)
await delay(200)
poller1.addPoll(pollConfig2)
await p1
const after = new Date().getTime()

// 200 + 400
expect(after - now).toBeGreaterThanOrEqual(600)
})

test('calling poll with different id should NOT extend existing polls', async () => {
const pollConfig1: poller.IPollConfig<number> = {
id: 'pc1',
maxDuration: 400,
request: async () => {
return 0
},
isResolved: n => false,
onExpired: () => {},
onUpdate: () => {}
}

const pollConfig2: poller.IPollConfig<number> = {
id: 'pc2',
maxDuration: 400,
request: async () => {
return 0
},
isResolved: n => false,
onExpired: () => {},
onUpdate: () => {}
}

const poller1 = new poller.Poller({ interval: 100 })

const now = new Date().getTime()
const p1 = poller1.addPoll(pollConfig1)
await delay(200)
poller1.addPoll(pollConfig2)

await p1 // Will still resolve after 400 expiration
const after = new Date().getTime()

expect(after - now).toBeGreaterThan(400)
})
})
94 changes: 94 additions & 0 deletions src/services/poller.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
/**
* Copyright (c) Microsoft Corporation. All rights reserved.
* Licensed under the MIT License.
*/

export interface Deferred {
resolve: Function
reject: Function
pollConfig: IPollConfig<any>
}

export interface ActivePoll {
id: string
end: number
deferred: Deferred[]
}

export interface IPollConfig<T> {
id: string
maxDuration: number
request: () => Promise<T>
isResolved: (t: T) => boolean
onExpired: () => void
onUpdate: (t: T) => void
}

export interface IPollerOptions {
interval: number
}

const global = window
export class Poller {
private polls: ActivePoll[] = []
constructor(options: IPollerOptions) {
global.setInterval(async () => await this.poll(), options.interval)
}

addPoll<T>(pollConfig: IPollConfig<T>) {
const { id, maxDuration } = pollConfig
const start = new Date().getTime()
const end = start + maxDuration
const activeApp = this.polls.find(p => p.id === id)

if (activeApp) {
console.log(`Existing polling found for id: ${id} increasing end from ${activeApp.end} to: ${end}`)
activeApp.end = end
const promise = new Promise((resolve, reject) => {
activeApp.deferred.push({ resolve, reject, pollConfig })
})

return promise
}

console.log(`No polling found for id: ${id}. Starting new polling until: ${end}`)
const promise = new Promise((resolve, reject) => {
this.polls.push({
id,
end,
deferred: [{ resolve, reject, pollConfig }]
})
})

return promise
}

private async poll() {
const now = (new Date()).getTime()
// Alternate approach is to split this into three phases: Filter those expired, await all requests, then filter all resolved.
this.polls = (await Promise.all(this.polls.map(async poll => {
const { end } = poll
// If current time is after max allowed polling duration then resolve
if (now >= end) {
poll.deferred.forEach(deferred => {
deferred.pollConfig.onExpired()
deferred.resolve()
})
return undefined
}

// Get training status and if it's one of the resolved states resolve promise
const firstConfig = poll.deferred[0].pollConfig
const result = await firstConfig.request()
firstConfig.onUpdate(result)

// If trainings status is one of resolved states, remove app from polls to discontinue
if (firstConfig.isResolved(result)) {
poll.deferred.forEach(deferred => deferred.resolve())
return undefined
}

return poll
}))).filter(x => x)
}
}
3 changes: 3 additions & 0 deletions src/util.ts
Original file line number Diff line number Diff line change
Expand Up @@ -56,3 +56,6 @@ export function createEntityMapFromMemories(entities: models.EntityBase[], memor
export function getDefaultEntityMap(entities: models.EntityBase[]): Map<string, string> {
return entities.reduce((m, e) => m.set(e.entityId, `$${e.entityName}`), new Map<string, string>())
}

export const delay = <T>(ms: number, value: T = null): Promise<T> => new Promise<T>(resolve => setTimeout(() => resolve(value), ms))

0 comments on commit e2dcd5e

Please sign in to comment.