Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[IND-526] Block subscribing to subaccounts from restricted regions for read-only mode. #896

Merged
merged 3 commits into from
Dec 19, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions indexer/packages/compliance/src/constants.ts
Original file line number Diff line number Diff line change
@@ -1,2 +1,4 @@
export const INDEXER_GEOBLOCKED_PAYLOAD = 'Because you appear to be a resident of, or trading from, a jurisdiction that violates our terms of use, or have engaged in activity that violates our terms of use, you have been blocked. You may withdraw your funds from the protocol at any time.';
export const INDEXER_COMPLIANCE_BLOCKED_PAYLOAD = 'Because this address appears to be a resident of, or trading from, a jurisdiction that violates our terms of use, or has engaged in activity that violates our terms of use, this address has been blocked.';
export const INDEXER_GEOBLOCKED_PAYLOAD: string = 'Because you appear to be a resident of, or trading from, a jurisdiction that violates our terms of use, or have engaged in activity that violates our terms of use, you have been blocked. You may withdraw your funds from the protocol at any time.';
export const INDEXER_COMPLIANCE_BLOCKED_PAYLOAD: string = 'Because this address appears to be a resident of, or trading from, a jurisdiction that violates our terms of use, or has engaged in activity that violates our terms of use, this address has been blocked.';

export const COUNTRY_HEADER_KEY: string = 'cf-ipcountry';
52 changes: 52 additions & 0 deletions indexer/services/socks/__tests__/lib/subscriptions.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,12 @@ import { btcTicker, invalidChannel, invalidTicker } from '../constants';
import { axiosRequest } from '../../src/lib/axios';
import { AxiosSafeServerError, makeAxiosSafeServerError } from '@dydxprotocol-indexer/base';
import { BlockedError } from '../../src/lib/errors';
import { isRestrictedCountry } from '@dydxprotocol-indexer/compliance';

jest.mock('ws');
jest.mock('../../src/helpers/wss');
jest.mock('../../src/lib/axios');
jest.mock('@dydxprotocol-indexer/compliance');

describe('Subscriptions', () => {
let subscriptions: Subscriptions;
Expand Down Expand Up @@ -56,6 +58,8 @@ describe('Subscriptions', () => {
[Channel.V4_TRADES]: ['/v4/trades/perpetualMarket/.+'],
};
const initialMessage: Object = { a: 'b' };
const restrictedCountry: string = 'US';
const nonRestrictedCountry: string = 'AR';

beforeAll(async () => {
await dbHelpers.migrate();
Expand All @@ -79,6 +83,9 @@ describe('Subscriptions', () => {
axiosRequestMock = (axiosRequest as jest.Mock);
axiosRequestMock.mockClear();
axiosRequestMock.mockImplementation(() => (JSON.stringify(initialMessage)));
(isRestrictedCountry as jest.Mock).mockImplementation((country: string): boolean => {
return country === restrictedCountry;
});
});

describe('subscribe', () => {
Expand All @@ -98,6 +105,8 @@ describe('Subscriptions', () => {
connectionId,
initialMsgId,
id,
false,
nonRestrictedCountry,
);

expect(sendMessageStringMock).toHaveBeenCalledTimes(1);
Expand Down Expand Up @@ -140,6 +149,8 @@ describe('Subscriptions', () => {
connectionId,
initialMsgId,
id,
false,
nonRestrictedCountry,
);

expect(sendMessageMock).toHaveBeenCalledTimes(1);
Expand Down Expand Up @@ -167,6 +178,8 @@ describe('Subscriptions', () => {
connectionId,
initialMsgId,
defaultId,
false,
nonRestrictedCountry,
);
},
).rejects.toEqual(new Error(`Invalid channel: ${invalidChannel}`));
Expand All @@ -180,6 +193,8 @@ describe('Subscriptions', () => {
connectionId,
initialMsgId,
mockSubaccountId,
false,
nonRestrictedCountry,
);

expect(sendMessageMock).toHaveBeenCalledTimes(1);
Expand All @@ -201,6 +216,8 @@ describe('Subscriptions', () => {
connectionId,
initialMsgId,
mockSubaccountId,
false,
nonRestrictedCountry,
);

expect(sendMessageMock).toHaveBeenCalledTimes(1);
Expand Down Expand Up @@ -235,6 +252,33 @@ describe('Subscriptions', () => {
connectionId,
initialMsgId,
mockSubaccountId,
false,
nonRestrictedCountry,
);

expect(sendMessageMock).toHaveBeenCalledTimes(1);
expect(sendMessageMock).toHaveBeenCalledWith(
mockWs,
connectionId,
expect.objectContaining({
connection_id: connectionId,
type: 'error',
message: expectedError.message,
}));
expect(subscriptions.subscriptions[Channel.V4_ACCOUNTS]).toBeUndefined();
expect(subscriptions.subscriptionLists[connectionId]).toBeUndefined();
});

it('sends blocked error if subscribing to subaccount from restricted country', async () => {
const expectedError: BlockedError = new BlockedError();
await subscriptions.subscribe(
mockWs,
Channel.V4_ACCOUNTS,
connectionId,
initialMsgId,
mockSubaccountId,
false,
restrictedCountry,
);

expect(sendMessageMock).toHaveBeenCalledTimes(1);
Expand All @@ -260,6 +304,8 @@ describe('Subscriptions', () => {
connectionId,
initialMsgId,
mockSubaccountId,
false,
nonRestrictedCountry,
);

expect(sendMessageStringMock).toHaveBeenCalledTimes(1);
Expand Down Expand Up @@ -295,6 +341,8 @@ describe('Subscriptions', () => {
connectionId,
initialMsgId,
id,
false,
nonRestrictedCountry,
);
subscriptions.unsubscribe(
connectionId,
Expand All @@ -313,6 +361,8 @@ describe('Subscriptions', () => {
connectionId,
initialMsgId,
mockSubaccountId,
false,
nonRestrictedCountry,
);
subscriptions.unsubscribe(
connectionId,
Expand All @@ -335,6 +385,8 @@ describe('Subscriptions', () => {
connectionId,
initialMsgId,
validIds[channel],
false,
nonRestrictedCountry,
);
}));

Expand Down
8 changes: 6 additions & 2 deletions indexer/services/socks/__tests__/websocket/index.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import {
import { InvalidMessageHandler } from '../../src/lib/invalid-message';
import { PingHandler } from '../../src/lib/ping';
import config from '../../src/config';
import { isRestrictedCountryHeaders } from '@dydxprotocol-indexer/compliance';
import { isRestrictedCountryHeaders, COUNTRY_HEADER_KEY } from '@dydxprotocol-indexer/compliance';

jest.mock('uuid');
jest.mock('../../src/helpers/wss');
Expand All @@ -38,6 +38,7 @@ describe('Index', () => {

const connectionId: string = 'conId';
const defaultGeoblockingEnabled: boolean = config.INDEXER_LEVEL_GEOBLOCKING_ENABLED;
const countryCode: string = 'AR';

beforeAll(() => {
jest.useFakeTimers();
Expand Down Expand Up @@ -142,7 +143,9 @@ describe('Index', () => {
beforeEach(() => {
// Connect to the index before starting each test.
(v4 as unknown as jest.Mock).mockReturnValueOnce(connectionId);
mockConnect(websocket, new IncomingMessage(new Socket()));
const incomingMessage: IncomingMessage = new IncomingMessage(new Socket());
incomingMessage.headers[COUNTRY_HEADER_KEY] = countryCode;
mockConnect(websocket, incomingMessage);
});

describe('message', () => {
Expand Down Expand Up @@ -257,6 +260,7 @@ describe('Index', () => {
index.connections[connectionId].messageId,
id,
isBatched,
countryCode,
);
});

Expand Down
24 changes: 20 additions & 4 deletions indexer/services/socks/src/lib/subscription.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import {
logger,
stats,
} from '@dydxprotocol-indexer/base';
import { isRestrictedCountry } from '@dydxprotocol-indexer/compliance';
import { CandleResolution, perpetualMarketRefresher } from '@dydxprotocol-indexer/postgres';
import WebSocket from 'ws';

Expand Down Expand Up @@ -77,6 +78,7 @@ export class Subscriptions {
messageId: number,
id?: string,
batched?: boolean,
country?: string,
vincentwschau marked this conversation as resolved.
Show resolved Hide resolved
): Promise<void> {
if (this.forwardMessage === undefined) {
throw new Error('Unexpected error, subscription object is uninitialized.');
Expand Down Expand Up @@ -129,7 +131,7 @@ export class Subscriptions {
let initialResponse: string;
const startGetInitialResponse: number = Date.now();
try {
initialResponse = await this.getInitialResponsesForChannels(channel, id);
initialResponse = await this.getInitialResponsesForChannels(channel, id, country);
} catch (error) {
logger.info({
at: 'Subscription#subscribe',
Expand Down Expand Up @@ -481,11 +483,21 @@ export class Subscriptions {
}
}

private async getInitialResponseForSubaccountSubscription(id?: string): Promise<string> {
private async getInitialResponseForSubaccountSubscription(
id?: string,
country?: string,
): Promise<string> {
if (id === undefined) {
throw new Error('Invalid undefined id');
}

// TODO(IND-508): Change this to match technical spec for persistent geo-blocking. This may
// either have to replicate any blocking logic added on comlink, or re-direct to comlink to
// determine if subscribing to a specific subaccount is blocked.
vincentwschau marked this conversation as resolved.
Show resolved Hide resolved
if (country !== undefined && isRestrictedCountry(country)) {
throw new BlockedError();
}

try {
const {
address,
Expand Down Expand Up @@ -567,9 +579,13 @@ export class Subscriptions {
* @param id Id fo the subscription to get the initial response for.
* @returns The initial response for the channel.
*/
private async getInitialResponsesForChannels(channel: Channel, id?: string): Promise<string> {
private async getInitialResponsesForChannels(
channel: Channel,
id?: string,
country?: string,
): Promise<string> {
if (channel === Channel.V4_ACCOUNTS) {
return this.getInitialResponseForSubaccountSubscription(id);
return this.getInitialResponseForSubaccountSubscription(id, country);
}
const endpoint: string | undefined = this.getInitialEndpointForSubscription(channel, id);
// If no endpoint exists, return an empty initial response.
Expand Down
1 change: 1 addition & 0 deletions indexer/services/socks/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ export interface Connection {
messageId: number;
heartbeat?: NodeJS.Timeout;
disconnect?: NodeJS.Timeout;
countryCode?: string;
}

export interface MessageToForward {
Expand Down
2 changes: 2 additions & 0 deletions indexer/services/socks/src/websocket/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ export class Index {
this.connections[connectionId] = {
ws,
messageId: 0,
countryCode: this.countryRestrictor.getCountry(req),
vincentwschau marked this conversation as resolved.
Show resolved Hide resolved
};

const numConcurrentConnections: number = Object.keys(this.connections).length;
Expand Down Expand Up @@ -287,6 +288,7 @@ export class Index {
this.connections[connectionId].messageId,
subscribeMessage.id,
subscribeMessage.batched,
this.connections[connectionId].countryCode,
).catch((error: Error) => logger.error({
at: 'Subscription#subscribe',
message: `Subscribing threw error: ${error.message}`,
Expand Down
5 changes: 5 additions & 0 deletions indexer/services/socks/src/websocket/restrict-countries.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,9 @@ export class CountryRestrictor {

return false;
}

public getCountry(req: IncomingMessage): string | undefined {
const countryHeaders: CountryHeaders = req.headers as CountryHeaders;
return countryHeaders['cf-ipcountry'];
vincentwschau marked this conversation as resolved.
Show resolved Hide resolved
}
}
Loading