From 11d2cb6d556af5bee6fa7222759f18efc96e5b66 Mon Sep 17 00:00:00 2001 From: Innovative-GauravKochar <117165746+Innovative-GauravKochar@users.noreply.github.com> Date: Wed, 28 Aug 2024 13:52:25 +0530 Subject: [PATCH] Refactored re-authentication flow for all execute blocks (#2294) * Improved reauthentication flow for all execute blocks and created unit test cases * Created a seperate PR for createAudience and getAudience Handler , removing it from here * Moved * removed extra spaces * Simplified code * nit: using consistent language across tests - should is generally used in this codebase * nit: using consistent language across tests - should is generally used in this codebase * remove unnecesary spacing changes * remove unnecesary spacing changes * remove unnecesary spacing changes * remove unnecesary spacing changes --------- Co-authored-by: Gaurav Kochar --- .../src/__tests__/destination-kit.test.ts | 479 +++++++++++++++++- packages/core/src/destination-kit/index.ts | 79 +-- 2 files changed, 476 insertions(+), 82 deletions(-) diff --git a/packages/core/src/__tests__/destination-kit.test.ts b/packages/core/src/__tests__/destination-kit.test.ts index 5e02033a2a..272a6278c7 100644 --- a/packages/core/src/__tests__/destination-kit.test.ts +++ b/packages/core/src/__tests__/destination-kit.test.ts @@ -8,11 +8,13 @@ import { StatsClient, StatsContext, TransactionContext, - AudienceDestinationDefinition, - AuthenticationScheme + AuthenticationScheme, + RefreshAccessTokenResult, + AudienceDestinationDefinition } from '../destination-kit' import { JSONObject } from '../json-object' import { SegmentEvent } from '../segment-event' +const WRONG_ADVERTISER_ID = '12861247612' const WRONG_AUDIENCE_ID = '1234567890' const destinationCustomAuth: DestinationDefinition = { @@ -81,16 +83,91 @@ const destinationOAuth2: DestinationDefinition = { } } } - const authentication: AuthenticationScheme = { scheme: 'oauth2', - fields: {}, + fields: { + apiSecret: { + label: 'API secret', + description: 'Api key', + type: 'string', + required: true + } + }, refreshAccessToken: (_request) => { return new Promise((resolve, _reject) => { - resolve({ - accessToken: 'fresh-token' + resolve({ accessToken: 'fresh-token' }) + }) + } +} + +const destinationOAuth3: DestinationDefinition = { + name: 'Actions Google Analytic 5', + mode: 'cloud', + authentication: authentication, + onDelete: async (_request, { auth, payload }) => { + if (auth?.accessToken == 'invalid-access-token') { + return new Promise((_resolve, reject) => { + reject(new IntegrationError('Unauthorized', 'UNAUTHORIZED', 401)) }) + } + + // it could be due to invalid input or Bad Request + if (!payload?.userId) { + return new Promise((_resolve, reject) => { + reject(new IntegrationError('Wrong AdvertiserId Value', 'BAD REQUEST', 400)) + }) + } + return new Promise((resolve, _reject) => { + resolve({ output: 'Deleted' }) }) + }, + actions: { + customEvent: { + title: 'Send a Custom Event', + description: 'Send events to a custom event in API', + defaultSubscription: 'type = "track"', + fields: { + advertiserId: { + label: 'Advertiser ID', + description: 'Advertiser Id', + type: 'string', + required: true + } + }, + perform: (_request: any, { auth, payload }) => { + if (auth?.accessToken == 'invalid-access-token') { + return new Promise((_resolve, reject) => { + reject(new IntegrationError('Unauthorized', 'UNAUTHORIZED', 401)) + }) + } + + // it could be due to invalid input or Bad Request + if (!payload?.advertiserId) + throw new IntegrationError('Missing advertiserId Value', 'MISSING_REQUIRED_FIELD', 400) + + return new Promise((resolve, _reject) => { + resolve('this is a test') + }) + }, + performBatch: (_request, { auth, payload }) => { + if (auth?.accessToken == 'invalid-access-token') { + return new Promise((_resolve, reject) => { + reject(new IntegrationError('Unauthorized', 'UNAUTHORIZED', 401)) + }) + } + + // it could be due to invalid input in Batch API Response, Entire Batch Failed ! + if (payload[0]?.advertiserId == WRONG_ADVERTISER_ID) { + return new Promise((_resolve, reject) => { + reject(new IntegrationError('Wrong AdvertiserId Value', 'BAD REQUEST', 400)) + }) + } + + return new Promise((resolve, _reject) => { + resolve('this is a test') + }) + } + } } } @@ -450,7 +527,6 @@ describe('destination kit', () => { } } } - const res = await destinationTest.onEvent(testEvent, testSettings) expect(res).toEqual([ { output: 'Mappings resolved' }, @@ -568,7 +644,6 @@ describe('destination kit', () => { } ]) }) - test('should inject the matchingKey value in the perform handler', async () => { const destinationTest = new Destination(destinationWithIdentifier) const testEvent: SegmentEvent = { type: 'track' } @@ -583,9 +658,7 @@ describe('destination kit', () => { } } } - const res = await destinationTest.onEvent(testEvent, testSettings) - expect(res).toEqual([ { output: 'Mappings resolved' }, { output: 'Payload validated' }, @@ -595,7 +668,6 @@ describe('destination kit', () => { } ]) }) - test('should inject the matchingKey value in the performBatch handler', async () => { const destinationTest = new Destination(destinationWithIdentifier) const testEvent: SegmentEvent = { type: 'track' } @@ -838,6 +910,7 @@ describe('destination kit', () => { ]) }) }) + describe('transactionContext', () => { test('should not crash when transactionContext is passed to the perform handler', async () => { const destinationTest = new Destination(destinationWithOptions) @@ -1032,11 +1105,382 @@ describe('destination kit', () => { }) }) }) + describe('Reauthentication Flow', () => { beforeEach(async () => { jest.restoreAllMocks() jest.resetAllMocks() }) + describe('onDelete', () => { + test('should refresh the access-token in case of Unauthorized(401) and update it in Cache', async () => { + const destinationTest = new Destination(destinationOAuth3) + const testEvent: SegmentEvent = { + traits: { a: 'foo' }, + userId: '3456fff', + type: 'identify' + } + const testSettings = { + apiSecret: 'test_key', + subscription: { + subscribe: 'type = "identify"', + partnerAction: 'customEvent', + mapping: { + name: 'fancy_event123', + advertiserId: '1231241241' + } + }, + oauth: { + access_token: 'invalid-access-token', + refresh_token: 'refresh-token' + } + } + const eventOptions = { + onTokenRefresh: async (_tokens: RefreshAccessTokenResult) => { + jest.fn(() => Promise.resolve()) + }, + synchronizeRefreshAccessToken: async () => { + jest.fn(() => Promise.resolve()) + } + } + const refreshTokenSpy = jest.spyOn(authentication, 'refreshAccessToken') + const UpdateTokenSpy = jest.spyOn(eventOptions, 'onTokenRefresh') + const synchronizeRefreshAccessTokenSpy = jest.spyOn(eventOptions, 'synchronizeRefreshAccessToken') + const res = await destinationTest.onDelete?.(testEvent, testSettings, eventOptions) + expect(res).toEqual({ output: 'Deleted' }) + expect(refreshTokenSpy).toHaveBeenCalledTimes(1) + expect(UpdateTokenSpy).toHaveBeenCalledTimes(1) + expect(synchronizeRefreshAccessTokenSpy).toHaveBeenCalledTimes(1) + }) + + test('should not refresh access-token in case of any non 401 error', async () => { + const destinationTest = new Destination(destinationOAuth3) + const testEvent: SegmentEvent = { + properties: { a: 'foo', field_one: 'test input' }, + traits: { + b: 'foo' + }, + type: 'identify' + } + const testSettings = { + apiSecret: 'test_key', + subscription: { + subscribe: 'type = "identify" and properties.a = "foo"', + partnerAction: 'customEvent', + mapping: { + clientId: '23455343467', + name: 'fancy_event' + } + }, + oauth: { + access_token: 'valid-access-token', + refresh_token: 'refresh-token' + } + } + const eventOptions = { + onTokenRefresh: async (_tokens: RefreshAccessTokenResult) => { + jest.fn(() => Promise.resolve()) + }, + synchronizeRefreshAccessToken: async () => { + jest.fn(() => Promise.resolve()) + } + } + const spy = jest.spyOn(authentication, 'refreshAccessToken') + const UpdateTokenSpy = jest.spyOn(eventOptions, 'onTokenRefresh') + const synchronizeRefreshAccessTokenSpy = jest.spyOn(eventOptions, 'synchronizeRefreshAccessToken') + await expect(destinationTest.onDelete?.(testEvent, testSettings)).rejects.toThrowError() + expect(spy).toHaveBeenCalledTimes(0) + expect(UpdateTokenSpy).toHaveBeenCalledTimes(0) + expect(synchronizeRefreshAccessTokenSpy).toHaveBeenCalledTimes(0) + }) + test('should not refresh access-token if token is already valid', async () => { + const destinationTest = new Destination(destinationOAuth3) + const testEvent: SegmentEvent = { + properties: { a: 'foo', field_one: 'test input' }, + traits: { + b: 'foo' + }, + userId: '3456fff', + type: 'identify' + } + const testSettings = { + apiSecret: 'test_key', + subscription: { + subscribe: 'type = "identify" and properties.a = "foo"', + partnerAction: 'customEvent', + mapping: { + name: 'fancy_event', + advertiserId: '1231241241' + } + }, + oauth: { + access_token: 'valid-access-token', + refresh_token: 'refresh-token' + } + } + const spy = jest.spyOn(authentication, 'refreshAccessToken') + const res = await destinationTest.onDelete?.(testEvent, testSettings) + expect(res).toEqual({ output: 'Deleted' }) + expect(spy).toHaveBeenCalledTimes(0) + }) + }) + describe('onEvent', () => { + test('should refresh the access-token in case of Unauthorized(401) and update it in Cache', async () => { + const destinationTest = new Destination(destinationOAuth3) + const testEvent: SegmentEvent = { + traits: { a: 'foo' }, + userId: '3456fff', + type: 'identify' + } + const testSettings = { + apiSecret: 'test_key', + subscription: { + subscribe: 'type = "identify"', + partnerAction: 'customEvent', + mapping: { + name: 'fancy_event123', + advertiserId: '1231241241' + } + }, + oauth: { + access_token: 'invalid-access-token', + refresh_token: 'refresh-token' + } + } + + const eventOptions = { + onTokenRefresh: async (_tokens: RefreshAccessTokenResult) => { + jest.fn(() => Promise.resolve()) + }, + synchronizeRefreshAccessToken: async () => { + jest.fn(() => Promise.resolve()) + } + } + + const refreshTokenSpy = jest.spyOn(authentication, 'refreshAccessToken') + const UpdateTokenSpy = jest.spyOn(eventOptions, 'onTokenRefresh') + const synchronizeRefreshAccessTokenSpy = jest.spyOn(eventOptions, 'synchronizeRefreshAccessToken') + + const res = await destinationTest.onEvent(testEvent, testSettings, eventOptions) + expect(res).toEqual([ + { output: 'Mappings resolved' }, + { output: 'Payload validated' }, + { data: 'this is a test', output: 'Action Executed' } + ]) + expect(refreshTokenSpy).toHaveBeenCalledTimes(1) + expect(UpdateTokenSpy).toHaveBeenCalledTimes(1) + expect(synchronizeRefreshAccessTokenSpy).toHaveBeenCalledTimes(1) + }) + test('should not refresh access-token in case of any non 401 error', async () => { + const destinationTest = new Destination(destinationOAuth3) + const testEvent: SegmentEvent = { + properties: { a: 'foo', field_one: 'test input' }, + traits: { + b: 'foo' + }, + userId: '3456fff', + type: 'identify' + } + const testSettings = { + apiSecret: 'test_key', + subscription: { + subscribe: 'type = "identify" and properties.a = "foo"', + partnerAction: 'customEvent', + mapping: { + clientId: '23455343467', + name: 'fancy_event' + } + }, + oauth: { + access_token: 'valid-access-token', + refresh_token: 'refresh-token' + } + } + const eventOptions = { + onTokenRefresh: async (_tokens: RefreshAccessTokenResult) => { + jest.fn(() => Promise.resolve()) + }, + synchronizeRefreshAccessToken: async () => { + jest.fn(() => Promise.resolve()) + } + } + + const spy = jest.spyOn(authentication, 'refreshAccessToken') + const UpdateTokenSpy = jest.spyOn(eventOptions, 'onTokenRefresh') + const synchronizeRefreshAccessTokenSpy = jest.spyOn(eventOptions, 'synchronizeRefreshAccessToken') + await expect(destinationTest.onEvent(testEvent, testSettings)).rejects.toThrowError() + expect(spy).toHaveBeenCalledTimes(0) + expect(UpdateTokenSpy).toHaveBeenCalledTimes(0) + expect(synchronizeRefreshAccessTokenSpy).toHaveBeenCalledTimes(0) + }) + + test('should not refresh access-token if token is already valid', async () => { + const destinationTest = new Destination(destinationOAuth3) + const testEvent: SegmentEvent = { + properties: { a: 'foo', field_one: 'test input' }, + traits: { + b: 'foo' + }, + userId: '3456fff', + type: 'identify' + } + const testSettings = { + apiSecret: 'test_key', + subscription: { + subscribe: 'type = "identify" and properties.a = "foo"', + partnerAction: 'customEvent', + mapping: { + name: 'fancy_event', + advertiserId: '1231241241' + } + }, + oauth: { + access_token: 'valid-access-token', + refresh_token: 'refresh-token' + } + } + const spy = jest.spyOn(authentication, 'refreshAccessToken') + const res = await destinationTest.onEvent(testEvent, testSettings) + expect(res).toEqual([ + { output: 'Mappings resolved' }, + { output: 'Payload validated' }, + { data: 'this is a test', output: 'Action Executed' } + ]) + expect(spy).toHaveBeenCalledTimes(0) + }) + }) + describe('onBatch', () => { + test('should refresh the access-token in case of Unauthorized(401)', async () => { + const destinationTest = new Destination(destinationOAuth3) + const testEvents: SegmentEvent[] = [ + { + properties: { a: 'foo', advertiserId: 123456789 }, + userId: '3456fff', + type: 'track' + }, + { + properties: { a: 'foo', advertiserId: 987654321 }, + userId: '3456fff', + type: 'track' + } + ] + const testSettings = { + apiSecret: 'test_key', + subscription: { + subscribe: 'type = "track"', + partnerAction: 'customEvent', + mapping: { + name: 'fancy_event123', + advertiserId: { '@path': '$.properties.advertiserId' } + } + }, + oauth: { + access_token: 'invalid-access-token', + refresh_token: 'refresh-token' + } + } + const eventOptions = { + onTokenRefresh: async (_tokens: RefreshAccessTokenResult) => { + jest.fn(() => Promise.resolve()) + }, + synchronizeRefreshAccessToken: async () => { + jest.fn(() => Promise.resolve()) + } + } + + const refreshTokenSpy = jest.spyOn(authentication, 'refreshAccessToken') + const UpdateTokenSpy = jest.spyOn(eventOptions, 'onTokenRefresh') + const synchronizeRefreshAccessTokenSpy = jest.spyOn(eventOptions, 'synchronizeRefreshAccessToken') + const res = await destinationTest.onBatch(testEvents, testSettings, eventOptions) + expect(res).toEqual([ + { + output: 'successfully processed batch of events' + } + ]) + expect(refreshTokenSpy).toHaveBeenCalledTimes(1) + expect(UpdateTokenSpy).toHaveBeenCalledTimes(1) + expect(synchronizeRefreshAccessTokenSpy).toHaveBeenCalledTimes(1) + }) + + test('should not refresh access-token in case of any non 401 error', async () => { + const destinationTest = new Destination(destinationOAuth3) + const testEvents: SegmentEvent[] = [ + { + properties: { a: 'foo', advertiserId: WRONG_ADVERTISER_ID }, + userId: '3456fff', + type: 'track' + } + ] + const testSettings = { + apiSecret: 'test_key', + subscription: { + subscribe: 'type = "track"', + partnerAction: 'customEvent', + mapping: { + name: 'fancy_event123', + advertiserId: { '@path': '$.properties.advertiserId' } + } + }, + oauth: { + access_token: 'valid-access-token', + refresh_token: 'refresh-token' + } + } + const eventOptions = { + onTokenRefresh: async (_tokens: RefreshAccessTokenResult) => { + jest.fn(() => Promise.resolve()) + }, + synchronizeRefreshAccessToken: async () => { + jest.fn(() => Promise.resolve()) + } + } + + const refreshTokenSpy = jest.spyOn(authentication, 'refreshAccessToken') + const UpdateTokenSpy = jest.spyOn(eventOptions, 'onTokenRefresh') + const synchronizeRefreshAccessTokenSpy = jest.spyOn(eventOptions, 'synchronizeRefreshAccessToken') + await expect(destinationTest.onBatch(testEvents, testSettings)).rejects.toThrowError() + expect(refreshTokenSpy).toHaveBeenCalledTimes(0) + expect(UpdateTokenSpy).toHaveBeenCalledTimes(0) + expect(synchronizeRefreshAccessTokenSpy).toHaveBeenCalledTimes(0) + }) + test('should not refresh access-token if token is already valid', async () => { + const destinationTest = new Destination(destinationOAuth3) + const testEvents: SegmentEvent[] = [ + { + properties: { a: 'foo', advertiserId: 123456789 }, + userId: '3456fff', + type: 'track' + }, + { + properties: { a: 'foo', advertiserId: 987654321 }, + userId: '3456fff', + type: 'track' + } + ] + const testSettings = { + apiSecret: 'test_key', + subscription: { + subscribe: 'type = "track"', + partnerAction: 'customEvent', + mapping: { + name: 'fancy_event123', + advertiserId: { '@path': '$.properties.advertiserId' } + } + }, + oauth: { + access_token: 'valid-access-token', + refresh_token: 'refresh-token' + } + } + const spy = jest.spyOn(authentication, 'refreshAccessToken') + const res = await destinationTest.onBatch(testEvents, testSettings) + expect(res).toEqual([ + { + output: 'successfully processed batch of events' + } + ]) + expect(spy).toHaveBeenCalledTimes(0) + }) + }) describe('createAudience', () => { test('Refreshes the access-token in case of Unauthorized(401)', async () => { const createAudienceInput = { @@ -1061,7 +1505,7 @@ describe('destination kit', () => { expect(spy).toHaveBeenCalledTimes(1) }) - test('Will not refresh access-token in case of any non 401 error', async () => { + test('should not refresh access-token in case of any non 401 error', async () => { const createAudienceInput = { audienceName: 'Test Audience', settings: { @@ -1081,7 +1525,7 @@ describe('destination kit', () => { expect(spy).not.toHaveBeenCalled() }) - test('Will not refresh access-token if token is already valid', async () => { + test('should not refresh access-token if token is already valid', async () => { const createAudienceInput = { audienceName: 'Test Audience', settings: { @@ -1105,7 +1549,7 @@ describe('destination kit', () => { expect(spy).not.toHaveBeenCalled() }) - test('Will not refresh the access-token for non-Oauth authentication scheme', async () => { + test('should not refresh the access-token for non-Oauth authentication scheme', async () => { const createAudienceInput = { audienceName: 'Test Audience', settings: { @@ -1129,7 +1573,6 @@ describe('destination kit', () => { expect(spy).not.toHaveBeenCalled() }) }) - describe('getAudience', () => { test('Refreshes the access-token in case of Unauthorized(401)', async () => { const getAudienceInput = { @@ -1152,7 +1595,7 @@ describe('destination kit', () => { expect(spy).toHaveBeenCalledTimes(1) }) - test('Will not refresh access-token in case of any non 401 error', async () => { + test('should not refresh access-token in case of any non 401 error', async () => { const getAudienceInput = { externalId: WRONG_AUDIENCE_ID, settings: { @@ -1171,7 +1614,7 @@ describe('destination kit', () => { expect(spy).not.toHaveBeenCalled() }) - test('Will not refresh access-token if token is already valid', async () => { + test('should not refresh access-token if token is already valid', async () => { const getAudienceInput = { externalId: '366170701270726115', settings: { @@ -1191,7 +1634,7 @@ describe('destination kit', () => { expect(spy).not.toHaveBeenCalled() }) - test('Will not refresh the access-token for non-Oauth authentication scheme', async () => { + test('should not refresh the access-token for non-Oauth authentication scheme', async () => { const getAudienceInput = { externalId: '366170701270726115', settings: { diff --git a/packages/core/src/destination-kit/index.ts b/packages/core/src/destination-kit/index.ts index f3dea83d35..3055d8e580 100644 --- a/packages/core/src/destination-kit/index.ts +++ b/packages/core/src/destination-kit/index.ts @@ -747,22 +747,21 @@ export class Destination { const payload = { userId, anonymousId } const destinationSettings = this.getDestinationSettings(settings as unknown as JSONObject) this.validateSettings(destinationSettings) - const auth = getAuthData(settings as unknown as JSONObject) - const data: ExecuteInput = { - payload, - settings: destinationSettings, - auth - } - const context: ExecuteInput = { - settings: destinationSettings, - payload, - auth - } - - const opts = this.extendRequest?.(context) ?? {} - const requestClient = createRequestClient({ ...opts, statsContext: context.statsContext }) const run = async () => { + const auth = getAuthData(settings as unknown as JSONObject) + const data: ExecuteInput = { + payload, + settings: destinationSettings, + auth + } + const context: ExecuteInput = { + settings: destinationSettings, + payload, + auth + } + const opts = this.extendRequest?.(context) ?? {} + const requestClient = createRequestClient({ ...opts, statsContext: context.statsContext }) const deleteResult = await this.definition.onDelete?.(requestClient, data) const result: Result = deleteResult ?? { output: 'no onDelete defined' } @@ -770,31 +769,7 @@ export class Destination { } const onFailedAttempt = async (error: ResponseError & HTTPError) => { - const statusCode = error?.status ?? error?.response?.status ?? 500 - - // Throw original error if it is unrelated to invalid access tokens and not an oauth2 scheme - if ( - !( - statusCode === 401 && - (this.authentication?.scheme === 'oauth2' || this.authentication?.scheme === 'oauth-managed') - ) - ) { - throw error - } - - const oauthSettings = getOAuth2Data(settings) - const newTokens = await this.refreshAccessToken( - destinationSettings, - oauthSettings, - options?.synchronizeRefreshAccessToken - ) - if (!newTokens) { - throw new InvalidAuthenticationError('Failed to refresh access token', ErrorCodes.OAUTH_REFRESH_FAILED) - } - - // Update `settings` with new tokens - settings = updateOAuthSettings(settings, newTokens) - await options?.onTokenRefresh?.(newTokens) + settings = await this.handleAuthError(error, settings, options) } return await retry(run, { retries: 2, onFailedAttempt }) @@ -822,31 +797,7 @@ export class Destination { // eslint-disable-next-line @typescript-eslint/no-explicit-any const onFailedAttempt = async (error: any) => { - const statusCode = error?.status ?? error?.response?.status ?? 500 - - // Throw original error if it is unrelated to invalid access tokens and not an oauth2 scheme - if ( - !( - statusCode === 401 && - (this.authentication?.scheme === 'oauth2' || this.authentication?.scheme === 'oauth-managed') - ) - ) { - throw error - } - - const oauthSettings = getOAuth2Data(settings) - const newTokens = await this.refreshAccessToken( - destinationSettings, - oauthSettings, - options?.synchronizeRefreshAccessToken - ) - if (!newTokens) { - throw new InvalidAuthenticationError('Failed to refresh access token', ErrorCodes.OAUTH_REFRESH_FAILED) - } - - // Update `settings` with new tokens - settings = updateOAuthSettings(settings, newTokens) - await options?.onTokenRefresh?.(newTokens) + settings = await this.handleAuthError(error, settings, options) } return await retry(run, { retries: 2, onFailedAttempt })