diff --git a/packages/node/src/__tests__/callback.test.ts b/packages/node/src/__tests__/callback.test.ts index 426171ec8..c9aa6d80f 100644 --- a/packages/node/src/__tests__/callback.test.ts +++ b/packages/node/src/__tests__/callback.test.ts @@ -1,17 +1,11 @@ -const fetcher = jest.fn() -jest.mock('../lib/fetch', () => ({ fetch: fetcher })) - -import { createError, createSuccess } from './test-helpers/factories' import { createTestAnalytics } from './test-helpers/create-test-analytics' import { Context } from '../app/context' describe('Callback behavior', () => { - beforeEach(() => { - fetcher.mockReturnValue(createSuccess()) - }) - it('should handle success', async () => { - const ajs = createTestAnalytics({ maxEventsInBatch: 1 }) + const ajs = createTestAnalytics({ + maxEventsInBatch: 1, + }) const ctx = await new Promise((resolve, reject) => ajs.track( { @@ -29,8 +23,12 @@ describe('Callback behavior', () => { }) it('should handle errors', async () => { - fetcher.mockReturnValue(createError()) - const ajs = createTestAnalytics({ maxEventsInBatch: 1 }) + const ajs = createTestAnalytics( + { + maxEventsInBatch: 1, + }, + { withError: true } + ) const [err, ctx] = await new Promise<[any, Context]>((resolve) => ajs.track( { diff --git a/packages/node/src/__tests__/disable.integration.test.ts b/packages/node/src/__tests__/disable.integration.test.ts index 730170069..34bef2dd7 100644 --- a/packages/node/src/__tests__/disable.integration.test.ts +++ b/packages/node/src/__tests__/disable.integration.test.ts @@ -1,9 +1,10 @@ -const fetcher = jest.fn() -jest.mock('../lib/fetch', () => ({ fetch: fetcher })) - import { createTestAnalytics } from './test-helpers/create-test-analytics' +import { TestFetchClient } from './test-helpers/test-fetch-client' describe('disable', () => { + const customClient = new TestFetchClient() + const mockSend = jest.spyOn(customClient, 'send') + it('should dispatch callbacks and emit an http request, even if disabled', async () => { const analytics = createTestAnalytics({ disable: true, @@ -19,19 +20,21 @@ describe('disable', () => { it('should call fetch if disabled is false', async () => { const analytics = createTestAnalytics({ disable: false, + httpClient: customClient, }) await new Promise((resolve) => analytics.track({ anonymousId: 'foo', event: 'bar' }, resolve) ) - expect(fetcher).toBeCalled() + expect(mockSend).toBeCalledTimes(1) }) it('should not call fetch if disabled is true', async () => { const analytics = createTestAnalytics({ disable: true, + httpClient: customClient, }) await new Promise((resolve) => analytics.track({ anonymousId: 'foo', event: 'bar' }, resolve) ) - expect(fetcher).not.toBeCalled() + expect(mockSend).toBeCalledTimes(0) }) }) diff --git a/packages/node/src/__tests__/emitter.integration.test.ts b/packages/node/src/__tests__/emitter.integration.test.ts index 0282e29d8..3f86e59ac 100644 --- a/packages/node/src/__tests__/emitter.integration.test.ts +++ b/packages/node/src/__tests__/emitter.integration.test.ts @@ -1,13 +1,8 @@ -const fetcher = jest.fn() -jest.mock('../lib/fetch', () => ({ fetch: fetcher })) - -import { createError, createSuccess } from './test-helpers/factories' import { createTestAnalytics } from './test-helpers/create-test-analytics' import { assertHttpRequestEmittedEvent } from './test-helpers/assert-shape' describe('http_request', () => { it('emits an http_request event if success', async () => { - fetcher.mockReturnValue(createSuccess()) const analytics = createTestAnalytics() const fn = jest.fn() analytics.on('http_request', fn) @@ -19,8 +14,12 @@ describe('http_request', () => { }) it('emits an http_request event if error', async () => { - fetcher.mockReturnValue(createError()) - const analytics = createTestAnalytics({ maxRetries: 0 }) + const analytics = createTestAnalytics( + { + maxRetries: 0, + }, + { withError: true } + ) const fn = jest.fn() analytics.on('http_request', fn) await new Promise((resolve) => @@ -30,8 +29,12 @@ describe('http_request', () => { }) it('if error, emits an http_request event on every retry', async () => { - fetcher.mockReturnValue(createError()) - const analytics = createTestAnalytics({ maxRetries: 2 }) + const analytics = createTestAnalytics( + { + maxRetries: 2, + }, + { withError: true } + ) const fn = jest.fn() analytics.on('http_request', fn) await new Promise((resolve) => diff --git a/packages/node/src/__tests__/graceful-shutdown-integration.test.ts b/packages/node/src/__tests__/graceful-shutdown-integration.test.ts index bd942ab3a..062997706 100644 --- a/packages/node/src/__tests__/graceful-shutdown-integration.test.ts +++ b/packages/node/src/__tests__/graceful-shutdown-integration.test.ts @@ -1,9 +1,5 @@ -import { createSuccess } from './test-helpers/factories' +import { TestFetchClient } from './test-helpers/test-fetch-client' import { performance as perf } from 'perf_hooks' - -const fetcher = jest.fn() -jest.mock('../lib/fetch', () => ({ fetch: fetcher })) - import { Analytics } from '../app/analytics-node' import { sleep } from './test-helpers/sleep' import { Plugin, SegmentEvent } from '../app/types' @@ -17,18 +13,21 @@ const testPlugin: Plugin = { isLoaded: () => true, } +const testClient = new TestFetchClient() +const sendSpy = jest.spyOn(testClient, 'send') + describe('Ability for users to exit without losing events', () => { let ajs!: Analytics beforeEach(async () => { - fetcher.mockReturnValue(createSuccess()) ajs = new Analytics({ writeKey: 'abc123', maxEventsInBatch: 1, + httpClient: testClient, }) }) const _helpers = { - getFetchCalls: (mockedFetchFn = fetcher) => - mockedFetchFn.mock.calls.map(([url, request]) => ({ + getFetchCalls: () => + sendSpy.mock.calls.map(([url, request]) => ({ url, method: request.method, headers: request.headers, @@ -89,6 +88,7 @@ describe('Ability for users to exit without losing events', () => { ajs = new Analytics({ writeKey: 'abc123', flushInterval, + httpClient: testClient, }) const closeAndFlushTimeout = ajs['_closeAndFlushDefaultTimeout'] expect(closeAndFlushTimeout).toBe(flushInterval * 1.25) @@ -190,6 +190,7 @@ describe('Ability for users to exit without losing events', () => { writeKey: 'foo', flushInterval: 10000, maxEventsInBatch: 15, + httpClient: testClient, }) _helpers.makeTrackCall(analytics) _helpers.makeTrackCall(analytics) @@ -220,6 +221,7 @@ describe('Ability for users to exit without losing events', () => { writeKey: 'foo', flushInterval: 10000, maxEventsInBatch: 15, + httpClient: testClient, }) await analytics.register(_testPlugin) _helpers.makeTrackCall(analytics) diff --git a/packages/node/src/__tests__/http-integration.test.ts b/packages/node/src/__tests__/http-integration.test.ts index 1599ba705..aba7d35be 100644 --- a/packages/node/src/__tests__/http-integration.test.ts +++ b/packages/node/src/__tests__/http-integration.test.ts @@ -33,7 +33,7 @@ describe('Method Smoke Tests', () => { let scope: nock.Scope let ajs: Analytics beforeEach(async () => { - ajs = createTestAnalytics() + ajs = createTestAnalytics({}, { useRealHTTPClient: true }) }) describe('Metadata', () => { @@ -333,10 +333,13 @@ describe('Client: requestTimeout', () => { }) it('should timeout immediately if request timeout is set to 0', async () => { jest.useRealTimers() - const ajs = createTestAnalytics({ - maxEventsInBatch: 1, - httpRequestTimeout: 0, - }) + const ajs = createTestAnalytics( + { + maxEventsInBatch: 1, + httpRequestTimeout: 0, + }, + { useRealHTTPClient: true } + ) ajs.track({ event: 'foo', userId: 'foo', properties: { hello: 'world' } }) try { await resolveCtx(ajs, 'track') diff --git a/packages/node/src/__tests__/integration.test.ts b/packages/node/src/__tests__/integration.test.ts index 0bf909bad..d1e4947e9 100644 --- a/packages/node/src/__tests__/integration.test.ts +++ b/packages/node/src/__tests__/integration.test.ts @@ -1,19 +1,16 @@ -const fetcher = jest.fn() -jest.mock('../lib/fetch', () => ({ fetch: fetcher })) - import { Plugin } from '../app/types' import { resolveCtx } from './test-helpers/resolve-ctx' import { testPlugin } from './test-helpers/test-plugin' -import { createSuccess, createError } from './test-helpers/factories' +import { createError } from './test-helpers/factories' import { createTestAnalytics } from './test-helpers/create-test-analytics' +import { TestFetchClient } from './test-helpers/test-fetch-client' const writeKey = 'foo' jest.setTimeout(10000) const timestamp = new Date() -beforeEach(() => { - fetcher.mockReturnValue(createSuccess()) -}) +const testClient = new TestFetchClient() +const sendSpy = jest.spyOn(testClient, 'send') describe('Settings / Configuration Init', () => { it('throws if no writeKey', () => { @@ -28,11 +25,12 @@ describe('Settings / Configuration Init', () => { const analytics = createTestAnalytics({ host: 'http://foo.com', path: '/bar', + httpClient: testClient, }) const track = resolveCtx(analytics, 'track') analytics.track({ event: 'foo', userId: 'sup' }) await track - expect(fetcher.mock.calls[0][0]).toBe('http://foo.com/bar') + expect(sendSpy.mock.calls[0][0]).toBe('http://foo.com/bar') }) it('throws if host / path is bad', async () => { @@ -53,10 +51,14 @@ describe('Error handling', () => { }) it('should emit on an error', async () => { - const analytics = createTestAnalytics({ maxRetries: 0 }) - fetcher.mockReturnValue( - createError({ statusText: 'Service Unavailable', status: 503 }) - ) + const err = createError({ + statusText: 'Service Unavailable', + status: 503, + }) + const analytics = createTestAnalytics({ + maxRetries: 0, + httpClient: new TestFetchClient({ response: err }), + }) try { const promise = resolveCtx(analytics, 'track') analytics.track({ event: 'foo', userId: 'sup' }) diff --git a/packages/node/src/__tests__/plugins.test.ts b/packages/node/src/__tests__/plugins.test.ts index 4efa6b95e..18a3ef8c9 100644 --- a/packages/node/src/__tests__/plugins.test.ts +++ b/packages/node/src/__tests__/plugins.test.ts @@ -1,14 +1,6 @@ -const fetcher = jest.fn() -jest.mock('../lib/fetch', () => ({ fetch: fetcher })) - -import { createSuccess } from './test-helpers/factories' import { createTestAnalytics } from './test-helpers/create-test-analytics' describe('Plugins', () => { - beforeEach(() => { - fetcher.mockReturnValue(createSuccess()) - }) - describe('Initialize', () => { it('loads analytics-node-next plugin', async () => { const analytics = createTestAnalytics() diff --git a/packages/node/src/__tests__/test-helpers/create-test-analytics.ts b/packages/node/src/__tests__/test-helpers/create-test-analytics.ts index e97e35f5a..fd512a50b 100644 --- a/packages/node/src/__tests__/test-helpers/create-test-analytics.ts +++ b/packages/node/src/__tests__/test-helpers/create-test-analytics.ts @@ -1,8 +1,20 @@ import { Analytics } from '../../app/analytics-node' import { AnalyticsSettings } from '../../app/settings' +import { TestFetchClient, TestFetchClientOptions } from './test-fetch-client' export const createTestAnalytics = ( - settings: Partial = {} + settings: Partial = {}, + { + withError, + useRealHTTPClient, + }: TestFetchClientOptions & { useRealHTTPClient?: boolean } = {} ) => { - return new Analytics({ writeKey: 'foo', flushInterval: 100, ...settings }) + return new Analytics({ + writeKey: 'foo', + flushInterval: 100, + ...(useRealHTTPClient + ? {} + : { httpClient: new TestFetchClient({ withError }) }), + ...settings, + }) } diff --git a/packages/node/src/__tests__/test-helpers/test-fetch-client.ts b/packages/node/src/__tests__/test-helpers/test-fetch-client.ts new file mode 100644 index 000000000..a012ee4ff --- /dev/null +++ b/packages/node/src/__tests__/test-helpers/test-fetch-client.ts @@ -0,0 +1,26 @@ +import { AnalyticsHTTPClient } from '../../lib/http-client' +import { createError, createSuccess } from './factories' + +export type TestFetchClientOptions = { + withError?: boolean + /** override response (if needed) */ + response?: Response | Promise +} +/** + * Test client. + * Try not to use this directly -- use createTestAnalytics instead. + */ +export class TestFetchClient implements AnalyticsHTTPClient { + private withError?: TestFetchClientOptions['withError'] + private response?: TestFetchClientOptions['response'] + constructor({ withError, response }: TestFetchClientOptions = {}) { + this.withError = withError + this.response = response + } + send(..._args: Parameters) { + if (this.response) { + return Promise.resolve(this.response) + } + return Promise.resolve(this.withError ? createError() : createSuccess()) + } +} diff --git a/packages/node/src/app/analytics-node.ts b/packages/node/src/app/analytics-node.ts index 25f7f4d42..c05f85cc2 100644 --- a/packages/node/src/app/analytics-node.ts +++ b/packages/node/src/app/analytics-node.ts @@ -16,6 +16,7 @@ import { } from './types' import { Context } from './context' import { NodeEventQueue } from './event-queue' +import { DefaultHTTPClient } from '../lib/http-client' export class Analytics extends NodeEmitter implements CoreAnalytics { private readonly _eventFactory: NodeEventFactory @@ -51,6 +52,7 @@ export class Analytics extends NodeEmitter implements CoreAnalytics { httpRequestTimeout: settings.httpRequestTimeout, disable: settings.disable, flushInterval, + httpClient: settings.httpClient ?? new DefaultHTTPClient(), }, this as NodeEmitter ) diff --git a/packages/node/src/app/settings.ts b/packages/node/src/app/settings.ts index 8088bc56e..3fb589a49 100644 --- a/packages/node/src/app/settings.ts +++ b/packages/node/src/app/settings.ts @@ -1,4 +1,5 @@ import { ValidationError } from '@segment/analytics-core' +import { AnalyticsHTTPClient } from '../lib/http-client' export interface AnalyticsSettings { /** @@ -34,6 +35,13 @@ export interface AnalyticsSettings { * Disable the analytics library. All calls will be a noop. Default: false. */ disable?: boolean + + /** + * Supply a default http client implementation (such as one supporting proxy) + * Default: an http client that uses that value of globalThis.fetch, or + * node-fetch if it doesn't exist + */ + httpClient?: AnalyticsHTTPClient } export const validateSettings = (settings: AnalyticsSettings) => { diff --git a/packages/node/src/lib/__tests__/abort.test.ts b/packages/node/src/lib/__tests__/abort.test.ts index ecc4af764..a4752eb1e 100644 --- a/packages/node/src/lib/__tests__/abort.test.ts +++ b/packages/node/src/lib/__tests__/abort.test.ts @@ -1,7 +1,7 @@ import { abortSignalAfterTimeout } from '../abort' import nock from 'nock' -import { fetch } from '../fetch' import { sleep } from '@segment/analytics-core' +import { DefaultHTTPClient } from '../http-client' describe(abortSignalAfterTimeout, () => { const HOST = 'https://foo.com' @@ -30,7 +30,8 @@ describe(abortSignalAfterTimeout, () => { try { const [signal] = abortSignalAfterTimeout(2000) jest.advanceTimersByTime(6000) - await fetch(HOST, { signal }) + const client = new DefaultHTTPClient() + await client.send(HOST, { signal }) throw Error('fail test.') } catch (err: any) { expect(err.name).toMatch('AbortError') @@ -42,7 +43,8 @@ describe(abortSignalAfterTimeout, () => { nock(HOST).get('/').reply(201) const [signal] = abortSignalAfterTimeout(0) try { - await fetch(HOST, { signal }) + const client = new DefaultHTTPClient() + await client.send(HOST, { signal }) throw Error('fail test.') } catch (err: any) { expect(err.name).toMatch('AbortError') diff --git a/packages/node/src/lib/fetch.ts b/packages/node/src/lib/fetch.ts deleted file mode 100644 index 35e65d69f..000000000 --- a/packages/node/src/lib/fetch.ts +++ /dev/null @@ -1,14 +0,0 @@ -export const fetch: typeof globalThis.fetch = async (...args) => { - if (globalThis.fetch) { - return globalThis.fetch(...args) - } // @ts-ignore - // This guard causes is important, as it causes dead-code elimination to be enabled inside this block. - else if (typeof EdgeRuntime !== 'string') { - // @ts-ignore - return (await import('node-fetch')).default(...args) as Response - } else { - throw new Error( - 'Invariant: an edge runtime that does not support fetch should not exist' - ) - } -} diff --git a/packages/node/src/lib/http-client.ts b/packages/node/src/lib/http-client.ts new file mode 100644 index 000000000..fdf6520e3 --- /dev/null +++ b/packages/node/src/lib/http-client.ts @@ -0,0 +1,20 @@ +export interface AnalyticsHTTPClient { + send: (resource: any, options: any) => Promise +} + +export class DefaultHTTPClient implements AnalyticsHTTPClient { + async send(resource: any, options: any): Promise { + if (globalThis.fetch) { + return globalThis.fetch(resource, options) + } // @ts-ignore + // This guard causes is important, as it causes dead-code elimination to be enabled inside this block. + else if (typeof EdgeRuntime !== 'string') { + // @ts-ignore + return (await import('node-fetch')).default(payload, options) as Response + } else { + throw new Error( + 'Invariant: an edge runtime that does not support fetch should not exist' + ) + } + } +} diff --git a/packages/node/src/plugins/segmentio/__tests__/methods.test.ts b/packages/node/src/plugins/segmentio/__tests__/methods.test.ts index beedd411f..d7429fb54 100644 --- a/packages/node/src/plugins/segmentio/__tests__/methods.test.ts +++ b/packages/node/src/plugins/segmentio/__tests__/methods.test.ts @@ -1,5 +1,3 @@ -const fetcher = jest.fn() -jest.mock('../../../lib/fetch', () => ({ fetch: fetcher })) import { NodeEventFactory } from '../../../app/event-factory' import { createSuccess } from '../../../__tests__/test-helpers/factories' import { createConfiguredNodePlugin } from '../index' @@ -10,10 +8,24 @@ import { bodyPropertyMatchers, assertSegmentApiBody, } from './test-helpers/segment-http-api' +import { TestFetchClient } from '../../../__tests__/test-helpers/test-fetch-client' let emitter: Emitter -const createTestNodePlugin = (props: PublisherProps) => - createConfiguredNodePlugin(props, emitter) +const testClient = new TestFetchClient() +const fetcher = jest.spyOn(testClient, 'send') + +const createTestNodePlugin = (props: Partial = {}) => + createConfiguredNodePlugin( + { + maxRetries: 3, + maxEventsInBatch: 1, + flushInterval: 1000, + writeKey: '', + httpClient: testClient, + ...props, + }, + emitter + ) const validateFetcherInputs = (...contexts: Context[]) => { const [url, request] = fetcher.mock.lastCall @@ -34,6 +46,7 @@ test('alias', async () => { maxEventsInBatch: 1, flushInterval: 1000, writeKey: '', + httpClient: testClient, }) const event = eventFactory.alias('to', 'from') @@ -58,12 +71,7 @@ test('alias', async () => { }) test('group', async () => { - const { plugin: segmentPlugin } = createTestNodePlugin({ - maxRetries: 3, - maxEventsInBatch: 1, - flushInterval: 1000, - writeKey: '', - }) + const { plugin: segmentPlugin } = createTestNodePlugin() const event = eventFactory.group( 'foo-group-id', @@ -96,12 +104,7 @@ test('group', async () => { }) test('identify', async () => { - const { plugin: segmentPlugin } = createTestNodePlugin({ - maxRetries: 3, - maxEventsInBatch: 1, - flushInterval: 1000, - writeKey: '', - }) + const { plugin: segmentPlugin } = createTestNodePlugin() const event = eventFactory.identify('foo-user-id', { name: 'Chris Radek', @@ -128,12 +131,7 @@ test('identify', async () => { }) test('page', async () => { - const { plugin: segmentPlugin } = createTestNodePlugin({ - maxRetries: 3, - maxEventsInBatch: 1, - flushInterval: 1000, - writeKey: '', - }) + const { plugin: segmentPlugin } = createTestNodePlugin() const event = eventFactory.page( 'Category', @@ -167,12 +165,7 @@ test('page', async () => { }) test('screen', async () => { - const { plugin: segmentPlugin } = createTestNodePlugin({ - maxRetries: 3, - maxEventsInBatch: 1, - flushInterval: 1000, - writeKey: '', - }) + const { plugin: segmentPlugin } = createTestNodePlugin() const event = eventFactory.screen( 'Category', @@ -205,12 +198,7 @@ test('screen', async () => { }) test('track', async () => { - const { plugin: segmentPlugin } = createTestNodePlugin({ - maxRetries: 3, - maxEventsInBatch: 1, - flushInterval: 1000, - writeKey: '', - }) + const { plugin: segmentPlugin } = createTestNodePlugin() const event = eventFactory.track( 'test event', diff --git a/packages/node/src/plugins/segmentio/__tests__/publisher.test.ts b/packages/node/src/plugins/segmentio/__tests__/publisher.test.ts index 149d6ccb3..7a2eb9fe3 100644 --- a/packages/node/src/plugins/segmentio/__tests__/publisher.test.ts +++ b/packages/node/src/plugins/segmentio/__tests__/publisher.test.ts @@ -1,5 +1,3 @@ -const fetcher = jest.fn() -jest.mock('../../../lib/fetch', () => ({ fetch: fetcher })) import { Emitter } from '@segment/analytics-core' import { range } from 'lodash' import { createConfiguredNodePlugin } from '..' @@ -10,12 +8,26 @@ import { createSuccess, createError, } from '../../../__tests__/test-helpers/factories' +import { TestFetchClient } from '../../../__tests__/test-helpers/test-fetch-client' import { PublisherProps } from '../publisher' import { assertSegmentApiBody } from './test-helpers/segment-http-api' let emitter: Emitter -const createTestNodePlugin = (props: PublisherProps) => - createConfiguredNodePlugin(props, emitter) +const testClient = new TestFetchClient() +const fetcher = jest.spyOn(testClient, 'send') + +const createTestNodePlugin = (props: Partial = {}) => + createConfiguredNodePlugin( + { + maxEventsInBatch: 1, + httpClient: testClient, + writeKey: '', + flushInterval: 1000, + maxRetries: 3, + ...props, + }, + emitter + ) const validateFetcherInputs = (...contexts: Context[]) => { const [url, request] = fetcher.mock.lastCall @@ -34,8 +46,6 @@ it('supports multiple events in a batch', async () => { const { plugin: segmentPlugin } = createTestNodePlugin({ maxRetries: 3, maxEventsInBatch: 3, - flushInterval: 1000, - writeKey: '', }) // Create 3 events of mixed types to send. @@ -62,8 +72,6 @@ it('supports waiting a max amount of time before sending', async () => { const { plugin: segmentPlugin } = createTestNodePlugin({ maxRetries: 3, maxEventsInBatch: 3, - flushInterval: 1000, - writeKey: '', }) const context = new Context(eventFactory.alias('to', 'from')) @@ -90,8 +98,6 @@ it('sends as soon as batch fills up or max time is reached', async () => { const { plugin: segmentPlugin } = createTestNodePlugin({ maxRetries: 3, maxEventsInBatch: 2, - flushInterval: 1000, - writeKey: '', }) const context = new Context(eventFactory.alias('to', 'from')) @@ -127,7 +133,6 @@ it('sends if batch will exceed max size in bytes when adding event', async () => maxRetries: 3, maxEventsInBatch: 20, flushInterval: 100, - writeKey: '', }) const contexts: Context[] = [] @@ -180,10 +185,7 @@ describe('flushAfterClose', () => { ) const { plugin: segmentPlugin, publisher } = createTestNodePlugin({ - maxRetries: 3, maxEventsInBatch: 20, - flushInterval: 1000, - writeKey: '', }) publisher.flushAfterClose(3) @@ -197,10 +199,7 @@ describe('flushAfterClose', () => { it('continues to flush on each event if batch size is 1', async () => { const { plugin: segmentPlugin, publisher } = createTestNodePlugin({ - maxRetries: 3, maxEventsInBatch: 1, - flushInterval: 1000, - writeKey: '', }) publisher.flushAfterClose(3) @@ -213,10 +212,7 @@ describe('flushAfterClose', () => { it('sends immediately once there are no pending items, even if pending events exceeds batch size', async () => { const { plugin: segmentPlugin, publisher } = createTestNodePlugin({ - maxRetries: 3, maxEventsInBatch: 3, - flushInterval: 1000, - writeKey: '', }) publisher.flushAfterClose(5) @@ -228,10 +224,7 @@ describe('flushAfterClose', () => { it('works if there are previous items in the batch', async () => { const { plugin: segmentPlugin, publisher } = createTestNodePlugin({ - maxRetries: 3, maxEventsInBatch: 7, - flushInterval: 1000, - writeKey: '', }) range(3).forEach(() => segmentPlugin.track(_createTrackCtx())) // should not flush @@ -244,10 +237,7 @@ describe('flushAfterClose', () => { it('works if there are previous items in the batch AND pending items > batch size', async () => { const { plugin: segmentPlugin, publisher } = createTestNodePlugin({ - maxRetries: 3, maxEventsInBatch: 7, - flushInterval: 1000, - writeKey: '', }) range(3).forEach(() => segmentPlugin.track(_createTrackCtx())) // should not flush @@ -266,10 +256,7 @@ describe('flushAfterClose', () => { describe('error handling', () => { it('excludes events that are too large', async () => { const { plugin: segmentPlugin } = createTestNodePlugin({ - maxRetries: 3, maxEventsInBatch: 1, - flushInterval: 1000, - writeKey: '', }) const context = new Context( @@ -302,10 +289,7 @@ describe('error handling', () => { ) const { plugin: segmentPlugin } = createTestNodePlugin({ - maxRetries: 3, maxEventsInBatch: 1, - flushInterval: 1000, - writeKey: '', }) const context = new Context(eventFactory.alias('to', 'from')) @@ -335,8 +319,6 @@ describe('error handling', () => { const { plugin: segmentPlugin } = createTestNodePlugin({ maxRetries: 2, maxEventsInBatch: 1, - flushInterval: 1000, - writeKey: '', }) const context = new Context(eventFactory.alias('to', 'from')) @@ -365,8 +347,6 @@ describe('error handling', () => { const { plugin: segmentPlugin } = createTestNodePlugin({ maxRetries: 2, maxEventsInBatch: 1, - flushInterval: 1000, - writeKey: '', }) const context = new Context(eventFactory.alias('my', 'from')) @@ -394,8 +374,6 @@ describe('error handling', () => { const { plugin: segmentPlugin } = createTestNodePlugin({ maxRetries: 0, maxEventsInBatch: 1, - flushInterval: 1000, - writeKey: '', }) const fn = jest.fn() @@ -417,10 +395,7 @@ describe('error handling', () => { describe('http_request emitter event', () => { it('should emit an http_request object', async () => { const { plugin: segmentPlugin } = createTestNodePlugin({ - maxRetries: 3, maxEventsInBatch: 1, - flushInterval: 1000, - writeKey: '', }) fetcher.mockReturnValueOnce(createSuccess()) diff --git a/packages/node/src/plugins/segmentio/publisher.ts b/packages/node/src/plugins/segmentio/publisher.ts index 72f359ce9..f6701bb3a 100644 --- a/packages/node/src/plugins/segmentio/publisher.ts +++ b/packages/node/src/plugins/segmentio/publisher.ts @@ -3,10 +3,10 @@ import { abortSignalAfterTimeout } from '../../lib/abort' import type { Context } from '../../app/context' import { tryCreateFormattedUrl } from '../../lib/create-url' import { extractPromiseParts } from '../../lib/extract-promise-parts' -import { fetch } from '../../lib/fetch' import { ContextBatch } from './context-batch' import { NodeEmitter } from '../../app/emitter' import { b64encode } from '../../lib/base-64-encode' +import { AnalyticsHTTPClient } from '../../lib/http-client' function sleep(timeoutInMs: number): Promise { return new Promise((resolve) => setTimeout(resolve, timeoutInMs)) @@ -28,6 +28,7 @@ export interface PublisherProps { writeKey: string httpRequestTimeout?: number disable?: boolean + httpClient: AnalyticsHTTPClient } /** @@ -46,6 +47,7 @@ export class Publisher { private _httpRequestTimeout: number private _emitter: NodeEmitter private _disable: boolean + public customclient: AnalyticsHTTPClient constructor( { host, @@ -55,6 +57,7 @@ export class Publisher { flushInterval, writeKey, httpRequestTimeout, + httpClient: client, disable, }: PublisherProps, emitter: NodeEmitter @@ -70,6 +73,7 @@ export class Publisher { ) this._httpRequestTimeout = httpRequestTimeout ?? 10000 this._disable = Boolean(disable) + this.customclient = client } private createBatch(): ContextBatch { @@ -218,7 +222,7 @@ export class Publisher { return batch.resolveEvents() } - const response = await fetch(this._url, requestInit) + const response = await this.customclient.send(this._url, requestInit) clearTimeout(timeoutId)