Skip to content

Commit

Permalink
[IND-526] Block subscribing to subaccounts from restricted regions fo…
Browse files Browse the repository at this point in the history
…r read-only mode. (#896)
  • Loading branch information
vincentwschau authored Dec 19, 2023
1 parent a795a81 commit 4a0aad8
Show file tree
Hide file tree
Showing 7 changed files with 92 additions and 8 deletions.
8 changes: 6 additions & 2 deletions indexer/packages/compliance/src/constants.ts
Original file line number Diff line number Diff line change
@@ -1,2 +1,6 @@
export const INDEXER_GEOBLOCKED_PAYLOAD = 'Because you appear to be a resident of, or trading from, a jurisdiction that violates our terms of use, or have engaged in activity that violates our terms of use, you have been blocked. You may withdraw your funds from the protocol at any time.';
export const INDEXER_COMPLIANCE_BLOCKED_PAYLOAD = 'Because this address appears to be a resident of, or trading from, a jurisdiction that violates our terms of use, or has engaged in activity that violates our terms of use, this address has been blocked.';
export const INDEXER_GEOBLOCKED_PAYLOAD: string = 'Because you appear to be a resident of, or trading from, a jurisdiction that violates our terms of use, or have engaged in activity that violates our terms of use, you have been blocked. You may withdraw your funds from the protocol at any time.';
export const INDEXER_COMPLIANCE_BLOCKED_PAYLOAD: string = 'Because this address appears to be a resident of, or trading from, a jurisdiction that violates our terms of use, or has engaged in activity that violates our terms of use, this address has been blocked.';

// For use by other services packages, can't be used to index on the actual requests
// object as that needs to be a string literal.
export const COUNTRY_HEADER_KEY: string = 'cf-ipcountry';
52 changes: 52 additions & 0 deletions indexer/services/socks/__tests__/lib/subscriptions.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,12 @@ import { btcTicker, invalidChannel, invalidTicker } from '../constants';
import { axiosRequest } from '../../src/lib/axios';
import { AxiosSafeServerError, makeAxiosSafeServerError } from '@dydxprotocol-indexer/base';
import { BlockedError } from '../../src/lib/errors';
import { isRestrictedCountry } from '@dydxprotocol-indexer/compliance';

jest.mock('ws');
jest.mock('../../src/helpers/wss');
jest.mock('../../src/lib/axios');
jest.mock('@dydxprotocol-indexer/compliance');

describe('Subscriptions', () => {
let subscriptions: Subscriptions;
Expand Down Expand Up @@ -56,6 +58,8 @@ describe('Subscriptions', () => {
[Channel.V4_TRADES]: ['/v4/trades/perpetualMarket/.+'],
};
const initialMessage: Object = { a: 'b' };
const restrictedCountry: string = 'US';
const nonRestrictedCountry: string = 'AR';

beforeAll(async () => {
await dbHelpers.migrate();
Expand All @@ -79,6 +83,9 @@ describe('Subscriptions', () => {
axiosRequestMock = (axiosRequest as jest.Mock);
axiosRequestMock.mockClear();
axiosRequestMock.mockImplementation(() => (JSON.stringify(initialMessage)));
(isRestrictedCountry as jest.Mock).mockImplementation((country: string): boolean => {
return country === restrictedCountry;
});
});

describe('subscribe', () => {
Expand All @@ -98,6 +105,8 @@ describe('Subscriptions', () => {
connectionId,
initialMsgId,
id,
false,
nonRestrictedCountry,
);

expect(sendMessageStringMock).toHaveBeenCalledTimes(1);
Expand Down Expand Up @@ -140,6 +149,8 @@ describe('Subscriptions', () => {
connectionId,
initialMsgId,
id,
false,
nonRestrictedCountry,
);

expect(sendMessageMock).toHaveBeenCalledTimes(1);
Expand Down Expand Up @@ -167,6 +178,8 @@ describe('Subscriptions', () => {
connectionId,
initialMsgId,
defaultId,
false,
nonRestrictedCountry,
);
},
).rejects.toEqual(new Error(`Invalid channel: ${invalidChannel}`));
Expand All @@ -180,6 +193,8 @@ describe('Subscriptions', () => {
connectionId,
initialMsgId,
mockSubaccountId,
false,
nonRestrictedCountry,
);

expect(sendMessageMock).toHaveBeenCalledTimes(1);
Expand All @@ -201,6 +216,8 @@ describe('Subscriptions', () => {
connectionId,
initialMsgId,
mockSubaccountId,
false,
nonRestrictedCountry,
);

expect(sendMessageMock).toHaveBeenCalledTimes(1);
Expand Down Expand Up @@ -235,6 +252,33 @@ describe('Subscriptions', () => {
connectionId,
initialMsgId,
mockSubaccountId,
false,
nonRestrictedCountry,
);

expect(sendMessageMock).toHaveBeenCalledTimes(1);
expect(sendMessageMock).toHaveBeenCalledWith(
mockWs,
connectionId,
expect.objectContaining({
connection_id: connectionId,
type: 'error',
message: expectedError.message,
}));
expect(subscriptions.subscriptions[Channel.V4_ACCOUNTS]).toBeUndefined();
expect(subscriptions.subscriptionLists[connectionId]).toBeUndefined();
});

it('sends blocked error if subscribing to subaccount from restricted country', async () => {
const expectedError: BlockedError = new BlockedError();
await subscriptions.subscribe(
mockWs,
Channel.V4_ACCOUNTS,
connectionId,
initialMsgId,
mockSubaccountId,
false,
restrictedCountry,
);

expect(sendMessageMock).toHaveBeenCalledTimes(1);
Expand All @@ -260,6 +304,8 @@ describe('Subscriptions', () => {
connectionId,
initialMsgId,
mockSubaccountId,
false,
nonRestrictedCountry,
);

expect(sendMessageStringMock).toHaveBeenCalledTimes(1);
Expand Down Expand Up @@ -295,6 +341,8 @@ describe('Subscriptions', () => {
connectionId,
initialMsgId,
id,
false,
nonRestrictedCountry,
);
subscriptions.unsubscribe(
connectionId,
Expand All @@ -313,6 +361,8 @@ describe('Subscriptions', () => {
connectionId,
initialMsgId,
mockSubaccountId,
false,
nonRestrictedCountry,
);
subscriptions.unsubscribe(
connectionId,
Expand All @@ -335,6 +385,8 @@ describe('Subscriptions', () => {
connectionId,
initialMsgId,
validIds[channel],
false,
nonRestrictedCountry,
);
}));

Expand Down
8 changes: 6 additions & 2 deletions indexer/services/socks/__tests__/websocket/index.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import {
import { InvalidMessageHandler } from '../../src/lib/invalid-message';
import { PingHandler } from '../../src/lib/ping';
import config from '../../src/config';
import { isRestrictedCountryHeaders } from '@dydxprotocol-indexer/compliance';
import { isRestrictedCountryHeaders, COUNTRY_HEADER_KEY } from '@dydxprotocol-indexer/compliance';

jest.mock('uuid');
jest.mock('../../src/helpers/wss');
Expand All @@ -38,6 +38,7 @@ describe('Index', () => {

const connectionId: string = 'conId';
const defaultGeoblockingEnabled: boolean = config.INDEXER_LEVEL_GEOBLOCKING_ENABLED;
const countryCode: string = 'AR';

beforeAll(() => {
jest.useFakeTimers();
Expand Down Expand Up @@ -142,7 +143,9 @@ describe('Index', () => {
beforeEach(() => {
// Connect to the index before starting each test.
(v4 as unknown as jest.Mock).mockReturnValueOnce(connectionId);
mockConnect(websocket, new IncomingMessage(new Socket()));
const incomingMessage: IncomingMessage = new IncomingMessage(new Socket());
incomingMessage.headers[COUNTRY_HEADER_KEY] = countryCode;
mockConnect(websocket, incomingMessage);
});

describe('message', () => {
Expand Down Expand Up @@ -257,6 +260,7 @@ describe('Index', () => {
index.connections[connectionId].messageId,
id,
isBatched,
countryCode,
);
});

Expand Down
24 changes: 20 additions & 4 deletions indexer/services/socks/src/lib/subscription.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import {
logger,
stats,
} from '@dydxprotocol-indexer/base';
import { isRestrictedCountry } from '@dydxprotocol-indexer/compliance';
import { CandleResolution, perpetualMarketRefresher } from '@dydxprotocol-indexer/postgres';
import WebSocket from 'ws';

Expand Down Expand Up @@ -77,6 +78,7 @@ export class Subscriptions {
messageId: number,
id?: string,
batched?: boolean,
country?: string,
): Promise<void> {
if (this.forwardMessage === undefined) {
throw new Error('Unexpected error, subscription object is uninitialized.');
Expand Down Expand Up @@ -129,7 +131,7 @@ export class Subscriptions {
let initialResponse: string;
const startGetInitialResponse: number = Date.now();
try {
initialResponse = await this.getInitialResponsesForChannels(channel, id);
initialResponse = await this.getInitialResponsesForChannels(channel, id, country);
} catch (error) {
logger.info({
at: 'Subscription#subscribe',
Expand Down Expand Up @@ -481,11 +483,21 @@ export class Subscriptions {
}
}

private async getInitialResponseForSubaccountSubscription(id?: string): Promise<string> {
private async getInitialResponseForSubaccountSubscription(
id?: string,
country?: string,
): Promise<string> {
if (id === undefined) {
throw new Error('Invalid undefined id');
}

// TODO(IND-508): Change this to match technical spec for persistent geo-blocking. This may
// either have to replicate any blocking logic added on comlink, or re-direct to comlink to
// determine if subscribing to a specific subaccount is blocked.
if (country !== undefined && isRestrictedCountry(country)) {
throw new BlockedError();
}

try {
const {
address,
Expand Down Expand Up @@ -567,9 +579,13 @@ export class Subscriptions {
* @param id Id fo the subscription to get the initial response for.
* @returns The initial response for the channel.
*/
private async getInitialResponsesForChannels(channel: Channel, id?: string): Promise<string> {
private async getInitialResponsesForChannels(
channel: Channel,
id?: string,
country?: string,
): Promise<string> {
if (channel === Channel.V4_ACCOUNTS) {
return this.getInitialResponseForSubaccountSubscription(id);
return this.getInitialResponseForSubaccountSubscription(id, country);
}
const endpoint: string | undefined = this.getInitialEndpointForSubscription(channel, id);
// If no endpoint exists, return an empty initial response.
Expand Down
1 change: 1 addition & 0 deletions indexer/services/socks/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ export interface Connection {
messageId: number;
heartbeat?: NodeJS.Timeout;
disconnect?: NodeJS.Timeout;
countryCode?: string;
}

export interface MessageToForward {
Expand Down
2 changes: 2 additions & 0 deletions indexer/services/socks/src/websocket/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ export class Index {
this.connections[connectionId] = {
ws,
messageId: 0,
countryCode: this.countryRestrictor.getCountry(req),
};

const numConcurrentConnections: number = Object.keys(this.connections).length;
Expand Down Expand Up @@ -287,6 +288,7 @@ export class Index {
this.connections[connectionId].messageId,
subscribeMessage.id,
subscribeMessage.batched,
this.connections[connectionId].countryCode,
).catch((error: Error) => logger.error({
at: 'Subscription#subscribe',
message: `Subscribing threw error: ${error.message}`,
Expand Down
5 changes: 5 additions & 0 deletions indexer/services/socks/src/websocket/restrict-countries.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,9 @@ export class CountryRestrictor {

return false;
}

public getCountry(req: IncomingMessage): string | undefined {
const countryHeaders: CountryHeaders = req.headers as CountryHeaders;
return countryHeaders['cf-ipcountry'];
}
}

0 comments on commit 4a0aad8

Please sign in to comment.