AJS Retry improvements for 500 and 429, normal and batch (#1084)
MichaelGHSeg authored Sep 17, 2024
1 parent 87811dc commit 5647624
Showing 9 changed files with 244 additions and 16 deletions.
6 changes: 6 additions & 0 deletions .changeset/flat-dryers-wink.md
@@ -0,0 +1,6 @@
---
'@segment/analytics-next': minor
'@segment/analytics-core': minor
---

Adding support for 429 responses from the server
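
For context, the behavior this changeset describes can be sketched roughly as follows (a simplified TypeScript illustration, not the shipped code; checkStatus is a hypothetical helper, while the x-ratelimit-reset header, the RateLimitError type, and the 5-second fallback come from the dispatcher diffs below):

import { RateLimitError } from './ratelimit-error'

// Classify a dispatcher response: 5xx keeps the normal retry behavior,
// 429 surfaces the server-provided reset window so the retry can wait for it.
function checkStatus(res: Response): void {
  if (res.status >= 500) {
    throw new Error(`Bad response from server: ${res.status}`)
  }
  if (res.status === 429) {
    // x-ratelimit-reset is expressed in seconds; fall back to 5s if it is missing
    const resetSecs = res.headers?.get('x-ratelimit-reset')
    const retryTimeoutMS = resetSecs ? parseInt(resetSecs) * 1000 : 5000
    throw new RateLimitError(`Rate limit exceeded: ${res.status}`, retryTimeoutMS)
  }
}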
2 changes: 2 additions & 0 deletions packages/browser/src/plugins/segmentio/__tests__/batched-dispatcher.test.ts
@@ -4,6 +4,7 @@ jest.mock('unfetch', () => {
return fetch
})

import { createSuccess } from '../../../test-helpers/factories'
import batch from '../batched-dispatcher'

const fatEvent = {
@@ -52,6 +53,7 @@ describe('Batching', () => {
jest.useFakeTimers({
now: new Date('9 Jun 1993 00:00:00Z').getTime(),
})
fetch.mockReturnValue(createSuccess({}))
})

afterEach(() => {
135 changes: 133 additions & 2 deletions packages/browser/src/plugins/segmentio/__tests__/retries.test.ts
@@ -1,3 +1,8 @@
const fetch = jest.fn()
jest.mock('unfetch', () => {
return fetch
})

import { segmentio, SegmentioSettings } from '..'
import { Analytics } from '../../../core/analytics'
// @ts-ignore isOffline mocked dependency is accused as unused
@@ -8,11 +13,135 @@ import { scheduleFlush } from '../schedule-flush'
import * as PPQ from '../../../lib/priority-queue/persisted'
import * as PQ from '../../../lib/priority-queue'
import { Context } from '../../../core/context'
import { createError, createSuccess } from '../../../test-helpers/factories'

jest.mock('../schedule-flush')
//jest.mock('../schedule-flush')

type QueueType = 'priority' | 'persisted'

describe('Segment.io retries 500s and 429', () => {
let options: SegmentioSettings
let analytics: Analytics
let segment: Plugin
beforeEach(async () => {
jest.useRealTimers()
jest.resetAllMocks()
jest.restoreAllMocks()

options = { apiKey: 'foo' }
analytics = new Analytics(
{ writeKey: options.apiKey },
{
retryQueue: true,
}
)
segment = await segmentio(analytics, options, {})
await analytics.register(segment, envEnrichment)
})

test('retries on 500', async () => {
jest.useFakeTimers({ advanceTimers: true })
fetch.mockReturnValue(createError({ status: 500 }))
// .mockReturnValue(createSuccess({}))
const ctx = await analytics.track('event')
jest.runAllTimers()

expect(ctx.attempts).toBeGreaterThanOrEqual(3) // Gets incremented after use
expect(fetch.mock.calls.length).toBeGreaterThanOrEqual(2)
expect(fetch.mock.lastCall[1].body).toContain('"retryCount":')
})

test('delays retry on 429', async () => {
const headers = new Headers()
const resetTime = 1234
headers.set('x-ratelimit-reset', resetTime.toString())
fetch
.mockReturnValueOnce(
createError({
status: 429,
statusText: 'Too Many Requests',
headers: headers,
})
)
.mockReturnValue(createSuccess({}))
const spy = jest.spyOn(PQ.PriorityQueue.prototype, 'pushWithBackoff')
await analytics.track('event')
expect(spy).toHaveBeenLastCalledWith(expect.anything(), resetTime * 1000)
})
})

describe('Batches retry 500s and 429', () => {
let options: SegmentioSettings
let analytics: Analytics
let segment: Plugin
beforeEach(async () => {
jest.useRealTimers()
jest.resetAllMocks()
jest.restoreAllMocks()

options = {
apiKey: 'foo',
deliveryStrategy: {
strategy: 'batching',
// timeout is set very low to get consistent behavior out of scheduleFlush
config: { size: 3, timeout: 1, maxRetries: 2 },
},
}
analytics = new Analytics(
{ writeKey: options.apiKey },
{
retryQueue: true,
}
)
segment = await segmentio(analytics, options, {})
await analytics.register(segment, envEnrichment)
})

test('retries on 500', async () => {
fetch
.mockReturnValueOnce(createError({ status: 500 }))
.mockReturnValue(createSuccess({}))

await analytics.track('event1')
const ctx = await analytics.track('event2')
// wait a bit for retries - timeout is only 1 ms
await new Promise((resolve) => setTimeout(resolve, 100))

expect(ctx.attempts).toBe(2)
expect(analytics.queue.queue.getAttempts(ctx)).toBe(1)
expect(fetch).toHaveBeenCalledTimes(2)
})

test('delays retry on 429', async () => {
const headers = new Headers()
const resetTime = 1
headers.set('x-ratelimit-reset', resetTime.toString())
fetch.mockReturnValue(
createError({
status: 429,
statusText: 'Too Many Requests',
headers: headers,
})
)

await analytics.track('event1')
const ctx = await analytics.track('event2')

await new Promise((resolve) => setTimeout(resolve, 100))

expect(ctx.attempts).toBe(2)
expect(fetch).toHaveBeenCalledTimes(1)
await new Promise((resolve) => setTimeout(resolve, 1000))
expect(fetch).toHaveBeenCalledTimes(2)
await new Promise((resolve) => setTimeout(resolve, 1000))
expect(fetch).toHaveBeenCalledTimes(3)
await new Promise((resolve) => setTimeout(resolve, 1000))
expect(fetch).toHaveBeenCalledTimes(3) // capped at 2 retries (+ initial attempt)
// Check the metadata about retry count
expect(fetch.mock.lastCall[1].body).toContain('"retryCount":2')
})
})

describe('Segment.io retries', () => {
let options: SegmentioSettings
let analytics: Analytics
@@ -23,11 +152,14 @@ describe('Segment.io retries', () => {
;[false, true].forEach((persistenceIsDisabled) => {
describe(`disableClientPersistence: ${persistenceIsDisabled}`, () => {
beforeEach(async () => {
jest.useRealTimers()
jest.resetAllMocks()
jest.restoreAllMocks()

// @ts-expect-error reassign import
isOffline = jest.fn().mockImplementation(() => true)
// @ts-expect-error reassign import
scheduleFlush = jest.fn().mockImplementation(() => {})

options = { apiKey: 'foo' }
analytics = new Analytics(
@@ -58,7 +190,6 @@ describe('Segment.io retries', () => {
}

segment = await segmentio(analytics, options, {})

await analytics.register(segment, envEnrichment)
})

57 changes: 50 additions & 7 deletions packages/browser/src/plugins/segmentio/batched-dispatcher.ts
@@ -1,10 +1,14 @@
import { SegmentEvent } from '../../core/events'
import { fetch } from '../../lib/fetch'
import { onPageChange } from '../../lib/on-page-change'
import { SegmentFacade } from '../../lib/to-facade'
import { RateLimitError } from './ratelimit-error'
import { Context } from '../../core/context'

export type BatchingDispatchConfig = {
size?: number
timeout?: number
maxRetries?: number
keepalive?: boolean
}

@@ -63,6 +67,7 @@ export default function batch(

const limit = config?.size ?? 10
const timeout = config?.timeout ?? 5000
let rateLimitTimeout = 0

function sendBatch(batch: object[]) {
if (batch.length === 0) {
@@ -88,28 +93,66 @@
batch: updatedBatch,
sentAt: new Date().toISOString(),
}),
}).then((res) => {
if (res.status >= 500) {
throw new Error(`Bad response from server: ${res.status}`)
}
if (res.status === 429) {
const retryTimeoutStringSecs = res.headers?.get('x-ratelimit-reset')
const retryTimeoutMS =
typeof retryTimeoutStringSecs == 'string'
? parseInt(retryTimeoutStringSecs) * 1000
: timeout
throw new RateLimitError(
`Rate limit exceeded: ${res.status}`,
retryTimeoutMS
)
}
})
}

async function flush(): Promise<unknown> {
async function flush(attempt = 1): Promise<unknown> {
if (buffer.length) {
const batch = buffer
buffer = []
return sendBatch(batch)
return sendBatch(batch)?.catch((error) => {
const ctx = Context.system()
ctx.log('error', 'Error sending batch', error)
if (attempt <= (config?.maxRetries ?? 10)) {
if (error.name === 'RateLimitError') {
rateLimitTimeout = error.retryTimeout
}
buffer.push(...batch)
buffer.map((event) => {
if ('_metadata' in event) {
const segmentEvent = event as ReturnType<SegmentFacade['json']>
segmentEvent._metadata = {
...segmentEvent._metadata,
retryCount: attempt,
}
}
})
scheduleFlush(attempt + 1)
}
})
}
}

let schedule: NodeJS.Timeout | undefined

function scheduleFlush(): void {
function scheduleFlush(attempt = 1): void {
if (schedule) {
return
}

schedule = setTimeout(() => {
schedule = undefined
flush().catch(console.error)
}, timeout)
schedule = setTimeout(
() => {
schedule = undefined
flush(attempt).catch(console.error)
},
rateLimitTimeout ? rateLimitTimeout : timeout
)
rateLimitTimeout = 0
}

onPageChange((unloaded) => {
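
If the new maxRetries option is exposed through the public batching configuration the way the retries tests earlier in this diff exercise it internally, opting in from an application would look roughly like this (a sketch only; the AnalyticsBrowser.load / deliveryStrategy shape is assumed from the existing batching setup, and the values are illustrative):

import { AnalyticsBrowser } from '@segment/analytics-next'

const analytics = AnalyticsBrowser.load(
  { writeKey: '<YOUR_WRITE_KEY>' },
  {
    retryQueue: true,
    integrations: {
      'Segment.io': {
        deliveryStrategy: {
          strategy: 'batching',
          // flush after 10 events or 5 seconds, and retry a failed batch at most 3 times
          config: { size: 10, timeout: 5000, maxRetries: 3 },
        },
      },
    },
  }
)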
15 changes: 15 additions & 0 deletions packages/browser/src/plugins/segmentio/fetch-dispatcher.ts
@@ -1,4 +1,5 @@
import { fetch } from '../../lib/fetch'
import { RateLimitError } from './ratelimit-error'

export type Dispatcher = (url: string, body: object) => Promise<unknown>

@@ -15,6 +16,20 @@ export default function (config?: StandardDispatcherConfig): {
headers: { 'Content-Type': 'text/plain' },
method: 'post',
body: JSON.stringify(body),
}).then((res) => {
if (res.status >= 500) {
throw new Error(`Bad response from server: ${res.status}`)
}
if (res.status === 429) {
const retryTimeoutStringSecs = res.headers?.get('x-ratelimit-reset')
const retryTimeoutMS = retryTimeoutStringSecs
? parseInt(retryTimeoutStringSecs) * 1000
: 5000
throw new RateLimitError(
`Rate limit exceeded: ${res.status}`,
retryTimeoutMS
)
}
})
}

12 changes: 9 additions & 3 deletions packages/browser/src/plugins/segmentio/index.ts
@@ -109,11 +109,17 @@ export function segmentio(
return client
.dispatch(
`${remote}/${path}`,
normalize(analytics, json, settings, integrations)
normalize(analytics, json, settings, integrations, ctx)
)
.then(() => ctx)
.catch(() => {
buffer.pushWithBackoff(ctx)
.catch((error) => {
ctx.log('error', 'Error sending event', error)
if (error.name === 'RateLimitError') {
const timeout = error.retryTimeout
buffer.pushWithBackoff(ctx, timeout)
} else {
buffer.pushWithBackoff(ctx)
}
// eslint-disable-next-line @typescript-eslint/no-use-before-define
scheduleFlush(flushing, buffer, segmentio, scheduleFlush)
return ctx
14 changes: 13 additions & 1 deletion packages/browser/src/plugins/segmentio/normalize.ts
@@ -2,12 +2,14 @@ import { Analytics } from '../../core/analytics'
import { CDNSettings } from '../../browser'
import { SegmentFacade } from '../../lib/to-facade'
import { SegmentioSettings } from './index'
import { Context } from '../../core/context'

export function normalize(
analytics: Analytics,
json: ReturnType<SegmentFacade['json']>,
settings?: SegmentioSettings,
integrations?: CDNSettings['integrations']
integrations?: CDNSettings['integrations'],
ctx?: Context
): object {
const user = analytics.user()

@@ -25,6 +27,16 @@ json._metadata = { failedInitializations: failed }
json._metadata = { failedInitializations: failed }
}

if (ctx != null) {
if (ctx.attempts > 1) {
json._metadata = {
...json._metadata,
retryCount: ctx.attempts,
}
}
ctx.attempts++
}

const bundled: string[] = []
const unbundled: string[] = []

9 changes: 9 additions & 0 deletions packages/browser/src/plugins/segmentio/ratelimit-error.ts
@@ -0,0 +1,9 @@
export class RateLimitError extends Error {
retryTimeout: number

constructor(message: string, retryTimeout: number) {
super(message)
this.retryTimeout = retryTimeout
this.name = 'RateLimitError'
}
}
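
A small usage sketch of the new error type (the retryDelayMs helper is hypothetical; the real consumers are the dispatchers and index.ts above, which check error.name rather than using instanceof):

import { RateLimitError } from './ratelimit-error'

// Pick the delay for the next attempt: honor the server's reset window on a 429,
// otherwise fall back to the caller's default backoff.
function retryDelayMs(error: unknown, defaultMs: number): number {
  if (error instanceof Error && error.name === 'RateLimitError') {
    return (error as RateLimitError).retryTimeout
  }
  return defaultMs
}

// retryDelayMs(new RateLimitError('Rate limit exceeded: 429', 1234 * 1000), 5000) === 1234000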