[IND-552] add roundtable task to take fast sync Postgres snapshots every 4 hours #912

Merged on Jan 5, 2024 (15 commits)
1 change: 1 addition & 0 deletions indexer/packages/base/src/constants.ts
@@ -12,3 +12,4 @@ export const ONE_MINUTE_IN_MILLISECONDS: number = 60 * ONE_SECOND_IN_MILLISECOND
export const FIVE_MINUTES_IN_MILLISECONDS: number = 5 * ONE_MINUTE_IN_MILLISECONDS;
export const TEN_MINUTES_IN_MILLISECONDS: number = 10 * ONE_MINUTE_IN_MILLISECONDS;
export const ONE_HOUR_IN_MILLISECONDS: number = 60 * ONE_MINUTE_IN_MILLISECONDS;
export const FOUR_HOURS_IN_MILLISECONDS: number = 4 * ONE_HOUR_IN_MILLISECONDS;
3 changes: 2 additions & 1 deletion indexer/services/roundtable/.env.test
@@ -6,7 +6,8 @@ DB_PASSWORD=dydxserver123
DB_PORT=5436
AWS_ACCOUNT_ID=invalid-test-account-id
AWS_REGION=invalid-test-region
S3_BUCKET_ARN=invalid-test-bucket
RESEARCH_SNAPSHOT_S3_BUCKET_ARN=invalid-research-test-bucket
FAST_SYNC_SNAPSHOT_S3_BUCKET_ARN=invalid-snapshot-test-bucket
ECS_TASK_ROLE_ARN=invalid-test-arn
KMS_KEY_ARN=invalid-kms-key-arn
RDS_INSTANCE_NAME=invalid-rds-instance-name
@@ -0,0 +1,53 @@
import config from '../../src/config';
import { asMock } from '@dydxprotocol-indexer/dev';
import { createDBSnapshot, getMostRecentDBSnapshotIdentifier, startExportTask } from '../../src/helpers/aws';
import takeFastSyncSnapshotTask from '../../src/tasks/take-fast-sync-snapshot';
import { DateTime } from 'luxon';

jest.mock('../../src/helpers/aws');

describe('fast-sync-export-db-snapshot', () => {
  const snapshotIdentifier: string = `${config.FAST_SYNC_SNAPSHOT_IDENTIFIER_PREFIX}-postgres-main-staging-2022-05-03-04-16`;
  beforeAll(() => {
    config.RDS_INSTANCE_NAME = 'postgres-main-staging';
  });

  beforeEach(() => {
    jest.resetAllMocks();
    asMock(getMostRecentDBSnapshotIdentifier).mockImplementation(
      async () => Promise.resolve(snapshotIdentifier),
    );
  });

  afterAll(jest.resetAllMocks);

  it('Last snapshot was taken more than interval ago', async () => {
    await takeFastSyncSnapshotTask();

    expect(createDBSnapshot).toHaveBeenCalled();
    expect(startExportTask).toHaveBeenCalled();
  });

  it('Last snapshot was taken less than interval ago', async () => {
    const timestamp: string = DateTime.utc().minus({ minutes: 1 }).toFormat('yyyy-MM-dd-HH-mm');
    asMock(getMostRecentDBSnapshotIdentifier).mockImplementation(
      async () => Promise.resolve(`${config.FAST_SYNC_SNAPSHOT_IDENTIFIER_PREFIX}-postgres-main-staging-${timestamp}`),
    );

    await takeFastSyncSnapshotTask();

    expect(createDBSnapshot).not.toHaveBeenCalled();
    expect(startExportTask).not.toHaveBeenCalled();
  });

  it('No existing snapshot', async () => {
    asMock(getMostRecentDBSnapshotIdentifier).mockImplementation(
      async () => Promise.resolve(undefined),
    );

    await takeFastSyncSnapshotTask();

    expect(createDBSnapshot).toHaveBeenCalled();
    expect(startExportTask).toHaveBeenCalled();
  });
});
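The three cases above suggest that the task decides whether to snapshot by reading the timestamp at the end of the most recent snapshot identifier. The task file itself does not appear in this diff, so the following is only a minimal sketch of that decision. The names `parseSnapshotTimestamp` and `shouldTakeSnapshot` are hypothetical, the identifier is assumed to end in a UTC `yyyy-MM-dd-HH-mm` timestamp (as in the test fixtures), and plain `Date` is used instead of luxon to keep the example self-contained.

```typescript
const FOUR_HOURS_IN_MILLISECONDS: number = 4 * 60 * 60 * 1000;

// Hypothetical helper: extracts the trailing `yyyy-MM-dd-HH-mm` (UTC) timestamp
// from a snapshot identifier. Returns undefined when no such suffix is present.
function parseSnapshotTimestamp(identifier: string): Date | undefined {
  const match: RegExpMatchArray | null = identifier.match(
    /(\d{4})-(\d{2})-(\d{2})-(\d{2})-(\d{2})$/,
  );
  if (match === null) {
    return undefined;
  }
  const [year, month, day, hour, minute] = match.slice(1).map(Number);
  // Date.UTC expects a zero-based month.
  return new Date(Date.UTC(year, month - 1, day, hour, minute));
}

// Hypothetical decision function: true when there is no previous snapshot,
// or when the previous one is at least `intervalMs` old.
function shouldTakeSnapshot(
  lastIdentifier: string | undefined,
  now: Date,
  intervalMs: number = FOUR_HOURS_IN_MILLISECONDS,
): boolean {
  if (lastIdentifier === undefined) {
    return true;
  }
  const last: Date | undefined = parseSnapshotTimestamp(lastIdentifier);
  if (last === undefined) {
    // Assumption: an identifier without a parseable timestamp is treated as stale.
    return true;
  }
  return now.getTime() - last.getTime() >= intervalMs;
}
```

Passing `now` as a parameter keeps the check deterministic in tests, which avoids computing a "one minute ago" timestamp at run time as the second test case does.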
10 changes: 9 additions & 1 deletion indexer/services/roundtable/src/config.ts
@@ -15,6 +15,7 @@ import {
ONE_HOUR_IN_MILLISECONDS,
ONE_SECOND_IN_MILLISECONDS,
TEN_SECONDS_IN_MILLISECONDS,
FOUR_HOURS_IN_MILLISECONDS,
} from '@dydxprotocol-indexer/base';
import {
kafkaConfigSchema,
@@ -40,6 +41,7 @@ export const configSchema = {
LOOPS_ORDERBOOK_INSTRUMENTATION: parseBoolean({ default: true }),
LOOPS_CANCEL_STALE_ORDERS: parseBoolean({ default: true }),
LOOPS_ENABLED_UPDATE_RESEARCH_ENVIRONMENT: parseBoolean({ default: false }),
LOOPS_ENABLED_TAKE_FAST_SYNC_SNAPSHOTS: parseBoolean({ default: false }),
Contributor

The take-fast-sync-snapshot.ts file exists in the codebase, but the LOOPS_ENABLED_TAKE_FAST_SYNC_SNAPSHOTS flag is not being used within it. This suggests that the flag may not be implemented correctly to control the execution of the snapshot task as intended by the PR objectives. It is recommended to verify the implementation of this flag in the snapshot task logic.

Analysis chain

The addition of the LOOPS_ENABLED_TAKE_FAST_SYNC_SNAPSHOTS configuration flag is correct and aligns with the PR objectives. Ensure that this flag is used correctly throughout the codebase.

Scripts Executed

The following scripts were executed for the analysis:


Script:

#!/bin/bash
# Verify that the new configuration flag is correctly used throughout the codebase.
rg --type js 'LOOPS_ENABLED_TAKE_FAST_SYNC_SNAPSHOTS'

Length of output: 53



Script:

#!/bin/bash
# Locate the take-fast-sync-snapshot.ts file and check for the usage of the configuration flag.
fd 'take-fast-sync-snapshot.ts' --exec rg 'LOOPS_ENABLED_TAKE_FAST_SYNC_SNAPSHOTS' {}

Length of output: 85



Script:

#!/bin/bash
# Verify the existence of the take-fast-sync-snapshot.ts file in the codebase.
fd 'take-fast-sync-snapshot.ts'

Length of output: 96

LOOPS_ENABLED_TRACK_LAG: parseBoolean({ default: false }),
LOOPS_ENABLED_REMOVE_OLD_ORDER_UPDATES: parseBoolean({ default: true }),
LOOPS_ENABLED_AGGREGATE_TRADING_REWARDS: parseBoolean({ default: true }),
@@ -66,6 +68,9 @@ export const configSchema = {
LOOPS_INTERVAL_MS_UPDATE_RESEARCH_ENVIRONMENT: parseInteger({
default: ONE_HOUR_IN_MILLISECONDS,
}),
LOOPS_INTERVAL_MS_TAKE_FAST_SYNC_SNAPSHOTS: parseInteger({
default: FOUR_HOURS_IN_MILLISECONDS,
}),
LOOPS_INTERVAL_MS_UPDATE_COMPLIANCE_DATA: parseInteger({
default: FIVE_MINUTES_IN_MILLISECONDS,
}),
@@ -111,7 +116,10 @@ export const configSchema = {
// Update research environment
AWS_ACCOUNT_ID: parseString(),
AWS_REGION: parseString(),
S3_BUCKET_ARN: parseString(),
RESEARCH_SNAPSHOT_S3_BUCKET_ARN: parseString(),
FAST_SYNC_SNAPSHOT_S3_BUCKET_ARN: parseString(),
FAST_SYNC_SNAPSHOT_IDENTIFIER_PREFIX: parseString({ default: 'fast-sync' }),
EXPORT_FAST_SYNC_SNAPSHOTS_TO_S3: parseBoolean({ default: false }),
ECS_TASK_ROLE_ARN: parseString(),
KMS_KEY_ARN: parseString(),
RDS_INSTANCE_NAME: parseString(),
100 changes: 90 additions & 10 deletions indexer/services/roundtable/src/helpers/aws.ts
@@ -16,14 +16,24 @@ enum ExportTaskStatus {
COMPLETE = 'complete',
}

const S3_BUCKET_NAME = config.S3_BUCKET_ARN.split(':::')[1];
export const S3_LOCATION_PREFIX = `s3://${S3_BUCKET_NAME}`;
export const RESEARCH_SNAPSHOT_S3_BUCKET_NAME = config.RESEARCH_SNAPSHOT_S3_BUCKET_ARN.split(':::')[1];
export const RESEARCH_SNAPSHOT_S3_LOCATION_PREFIX = `s3://${RESEARCH_SNAPSHOT_S3_BUCKET_NAME}`;
export const FAST_SYNC_SNAPSHOT_S3_BUCKET_NAME = config.FAST_SYNC_SNAPSHOT_S3_BUCKET_ARN.split(':::')[1];
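Note that `split(':::')[1]` evaluates to `undefined` when the value contains no `:::`, which is the case for the placeholder values in `.env.test`; the derived location prefix then becomes `s3://undefined`. A stricter parse could look like the sketch below. The helper name `bucketNameFromArn` is hypothetical, and failing fast is an assumption about the desired behaviour (it would reject the current test placeholders).

```typescript
// Hypothetical helper: returns the bucket name from an ARN of the form
// `arn:aws:s3:::bucket-name`, and throws when the separator or name is missing.
function bucketNameFromArn(arn: string): string {
  const separator: string = ':::';
  const index: number = arn.indexOf(separator);
  const name: string = index === -1 ? '' : arn.slice(index + separator.length);
  if (name.length === 0) {
    throw new Error(`Not an S3 bucket ARN: ${arn}`);
  }
  return name;
}
```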

/**
* @description Get most recent snapshot identifier for an RDS database.
* @param rds - RDS client
* @param snapshotIdentifierPrefixInclude - Only include snapshots with snapshot identifier
* that starts with prefixInclude
* @param snapshotIdentifierPrefixExclude - Only include snapshots with snapshot identifier
* that does not start with prefixExclude
*/
// TODO(CLOB-672): Verify this function returns the most recent DB snapshot.
export async function getMostRecentDBSnapshotIdentifier(rds: RDS): Promise<string> {
export async function getMostRecentDBSnapshotIdentifier(
rds: RDS,
snapshotIdentifierPrefixInclude?: string,
snapshotIdentifierPrefixExclude?: string,
): Promise<string | undefined> {
const awsResponse: RDS.DBSnapshotMessage = await rds.describeDBSnapshots({
DBInstanceIdentifier: config.RDS_INSTANCE_NAME,
MaxRecords: 20, // this is the minimum
Contributor

There is a bug (noted in TODO(CLOB-672)): if the RDS instance has more than 20 DB snapshots, this method won't return the latest one. The fix (which hasn't been implemented yet) is to page through all the snapshots and then pick the latest.
As such, I don't think this will work correctly on any of the long-running networks (public testnet / mainnet).

Contributor Author

fixed
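How the author fixed this is not visible in this excerpt. As an illustration of the paging approach the reviewer describes, here is a minimal sketch that follows the `Marker` field returned by DescribeDBSnapshots until no more pages remain. The types `SnapshotSummary`, `SnapshotPage`, `FetchSnapshotPage` and the function `listAllSnapshots` are hypothetical; the page fetcher is injected so that the example does not depend on the AWS SDK.

```typescript
// Minimal shapes mirroring the fields used from a DescribeDBSnapshots response.
interface SnapshotSummary {
  DBSnapshotIdentifier?: string;
  SnapshotCreateTime?: Date;
}

interface SnapshotPage {
  DBSnapshots?: SnapshotSummary[];
  // Present only when more records remain to be fetched.
  Marker?: string;
}

// Hypothetical page fetcher. In the task it would wrap rds.describeDBSnapshots,
// passing the marker from the previous response.
type FetchSnapshotPage = (marker?: string) => Promise<SnapshotPage>;

// Collects the snapshots from every page by following Marker until it is absent.
async function listAllSnapshots(fetchPage: FetchSnapshotPage): Promise<SnapshotSummary[]> {
  const all: SnapshotSummary[] = [];
  let marker: string | undefined;
  do {
    const page: SnapshotPage = await fetchPage(marker);
    all.push(...(page.DBSnapshots ?? []));
    marker = page.Marker;
  } while (marker !== undefined && marker !== '');
  return all;
}
```

With the full list in hand, the most recent snapshot can be chosen by creation time rather than by position in a single 20-record page.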

@@ -33,21 +43,86 @@ export async function getMostRecentDBSnapshotIdentifier(rds: RDS): Promise<strin
throw Error(`No DB snapshots found with identifier: ${config.RDS_INSTANCE_NAME}`);
}

let snapshots: RDS.DBSnapshotList = awsResponse.DBSnapshots;
// Only include snapshots with snapshot identifier that starts with prefixInclude
if (snapshotIdentifierPrefixInclude !== undefined) {
snapshots = snapshots
.filter((snapshot) => snapshot.DBSnapshotIdentifier &&
snapshot.DBSnapshotIdentifier.startsWith(snapshotIdentifierPrefixInclude),
);
}
if (snapshotIdentifierPrefixExclude !== undefined) {
snapshots = snapshots
.filter((snapshot) => snapshot.DBSnapshotIdentifier &&
!snapshot.DBSnapshotIdentifier.startsWith(snapshotIdentifierPrefixExclude),
);
}

logger.info({
at: `${atStart}getMostRecentDBSnapshotIdentifier`,
message: 'Described snapshots for database',
mostRecentSnapshot: awsResponse.DBSnapshots[awsResponse.DBSnapshots.length - 1],
mostRecentSnapshot: snapshots[snapshots.length - 1],
});

return awsResponse.DBSnapshots[awsResponse.DBSnapshots.length - 1].DBSnapshotIdentifier!;
return snapshots[snapshots.length - 1]?.DBSnapshotIdentifier;
}
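The function above returns the last element of the filtered list, which is only the most recent snapshot if the API response happens to be ordered by creation time (the open TODO questions exactly this). A sketch of the same include/exclude filtering with an explicit sort on `SnapshotCreateTime` follows. The names `SnapshotInfo` and `mostRecentSnapshotIdentifier` are hypothetical, and unlike the diff this version always drops snapshots that have no identifier.

```typescript
interface SnapshotInfo {
  DBSnapshotIdentifier?: string;
  SnapshotCreateTime?: Date;
}

// Hypothetical variant: applies the prefix filters, then picks the snapshot
// with the latest creation time instead of relying on response order.
function mostRecentSnapshotIdentifier(
  snapshots: SnapshotInfo[],
  prefixInclude?: string,
  prefixExclude?: string,
): string | undefined {
  const candidates: SnapshotInfo[] = snapshots.filter((snapshot: SnapshotInfo) => {
    const id: string | undefined = snapshot.DBSnapshotIdentifier;
    if (id === undefined) {
      return false;
    }
    if (prefixInclude !== undefined && !id.startsWith(prefixInclude)) {
      return false;
    }
    if (prefixExclude !== undefined && id.startsWith(prefixExclude)) {
      return false;
    }
    return true;
  });
  // filter() returned a new array, so sorting in place leaves the input untouched.
  // Snapshots without a creation time sort first.
  candidates.sort(
    (a: SnapshotInfo, b: SnapshotInfo) => (a.SnapshotCreateTime?.getTime() ?? 0)
      - (b.SnapshotCreateTime?.getTime() ?? 0),
  );
  return candidates[candidates.length - 1]?.DBSnapshotIdentifier;
}
```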

/**
* @description Create DB snapshot for an RDS database. Only returns when the
* snapshot is available.
*/
export async function createDBSnapshot(
rds: RDS,
snapshotIdentifier: string,
dbInstanceIdentifier: string,
): Promise<string> {
const params = {
DBInstanceIdentifier: dbInstanceIdentifier,
DBSnapshotIdentifier: snapshotIdentifier,
};

try {
await rds.createDBSnapshot(params).promise();
// Polling function to check snapshot status. Only return when the snapshot is available.
const waitForSnapshot = async () => {
Contributor

nit: I think you can use this waiter method/object that is part of the AWS SDK here rather than writing your own. Non-blocking, though: if it's too complicated to use, no need to use it.

Contributor Author

done

// eslint-disable-next-line no-constant-condition
while (true) {
Contributor

nit: Add some max wait time after which the task breaks out of this loop, and some logs at some X interval indicating how long the task has been waiting for the snapshot, to help with debugging if this loop somehow never terminates.

Contributor Author

Changed to use the waiter in the AWS SDK, which sets a max time limit (retries * delay).

const statusResponse = await rds.describeDBSnapshots(
{ DBSnapshotIdentifier: snapshotIdentifier },
).promise();
const snapshot = statusResponse.DBSnapshots![0];
if (snapshot.Status === 'available') {
return snapshot.DBSnapshotIdentifier!;
} else if (snapshot.Status === 'failed') {
throw Error(`Snapshot creation failed for identifier: ${snapshotIdentifier}`);
}

// Wait for 1 minute before checking again
await new Promise((resolve) => setTimeout(resolve, 60000));
}
};

return await waitForSnapshot();
} catch (error) {
logger.error({
at: `${atStart}createDBSnapshot`,
message: 'Failed to create DB snapshot',
error,
snapshotIdentifier,
});
throw error;
}
}
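The review thread above asks for a bounded wait, and the author reports switching to the AWS SDK waiter, which caps the wait at retries * delay. The revised code is not shown here, so the sketch below only illustrates the bounded-wait idea by hand. The function `waitUntilAvailable` and its parameters are hypothetical, and the status check is injected so that the example runs without the SDK.

```typescript
// Hypothetical bounded poller: checks the status up to `maxAttempts` times,
// sleeping `delayMs` between checks, so the total wait is capped at roughly
// maxAttempts * delayMs.
async function waitUntilAvailable(
  getStatus: () => Promise<string>,
  maxAttempts: number,
  delayMs: number,
): Promise<void> {
  for (let attempt: number = 1; attempt <= maxAttempts; attempt += 1) {
    const status: string = await getStatus();
    if (status === 'available') {
      return;
    }
    if (status === 'failed') {
      throw new Error('Snapshot creation failed');
    }
    // Skip the sleep after the final attempt.
    if (attempt < maxAttempts) {
      await new Promise<void>((resolve) => setTimeout(resolve, delayMs));
    }
  }
  throw new Error(`Snapshot not available after ${maxAttempts} attempts`);
}
```

Compared with the `while (true)` loop in the diff, the attempt counter guarantees termination and gives a natural place to log how long the task has been waiting.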

/**
* @description Check if an S3 Object already exists.
*/
export async function checkIfS3ObjectExists(s3: S3, s3Date: string): Promise<boolean> {
export async function checkIfS3ObjectExists(
s3: S3,
s3Date: string,
bucket: string,
): Promise<boolean> {
const at: string = `${atStart}checkIfS3ObjectExists`;
const bucket: string = S3_BUCKET_NAME;
const key: string = `${config.RDS_INSTANCE_NAME}-${s3Date}/export_info_${config.RDS_INSTANCE_NAME}-${s3Date}.json`;

logger.info({
@@ -143,12 +218,17 @@ export async function checkIfExportJobToS3IsOngoing(
export async function startExportTask(
rds: RDS,
rdsExportIdentifier: string,
bucket: string,
isAutomatedSnapshot: boolean,
): Promise<RDS.ExportTask> {
// TODO: Add validation
const sourceArnPrefix = `arn:aws:rds:${config.AWS_REGION}:${config.AWS_ACCOUNT_ID}:snapshot:rds:`;
let sourceArnPrefix: string = `arn:aws:rds:${config.AWS_REGION}:${config.AWS_ACCOUNT_ID}:snapshot:`;
if (isAutomatedSnapshot) {
sourceArnPrefix = sourceArnPrefix.concat('rds:');
}
const awsResponse: RDS.ExportTask = await rds.startExportTask({
ExportTaskIdentifier: rdsExportIdentifier,
S3BucketName: S3_BUCKET_NAME,
S3BucketName: bucket,
KmsKeyId: config.KMS_KEY_ARN,
IamRoleArn: config.ECS_TASK_ROLE_ARN,
SourceArn: `${sourceArnPrefix}${rdsExportIdentifier}`,
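The change to `sourceArnPrefix` distinguishes automated snapshots, whose ARN carries an `rds:` segment before the identifier, from manually created ones such as the fast sync snapshots. The same construction as a pure function might look like this sketch. The name `snapshotSourceArn` is hypothetical, and it assumes, as the diff does, that the identifier is passed without the `rds:` prefix.

```typescript
// Hypothetical helper mirroring the ARN construction in startExportTask.
// Assumes `snapshotIdentifier` does not already include the `rds:` prefix.
function snapshotSourceArn(
  region: string,
  accountId: string,
  snapshotIdentifier: string,
  isAutomatedSnapshot: boolean,
): string {
  const prefix: string = `arn:aws:rds:${region}:${accountId}:snapshot:`;
  return isAutomatedSnapshot
    ? `${prefix}rds:${snapshotIdentifier}`
    : `${prefix}${snapshotIdentifier}`;
}
```

Keeping this logic in a pure function makes it easy to unit test both branches without mocking the RDS client.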
@@ -216,7 +296,7 @@ export async function startAthenaQuery(
Database: config.ATHENA_DATABASE_NAME,
},
ResultConfiguration: {
OutputLocation: `${S3_LOCATION_PREFIX}/output/${timestamp}`,
OutputLocation: `${RESEARCH_SNAPSHOT_S3_LOCATION_PREFIX}/output/${timestamp}`,
},
WorkGroup: config.ATHENA_WORKING_GROUP,
}).promise();
4 changes: 2 additions & 2 deletions indexer/services/roundtable/src/helpers/sql.ts
@@ -1,4 +1,4 @@
import { S3_LOCATION_PREFIX } from './aws';
import { RESEARCH_SNAPSHOT_S3_LOCATION_PREFIX } from './aws';

export function castToTimestamp(column: string): string {
return `CAST("${column}" AS timestamp) as "${column}"`;
@@ -23,7 +23,7 @@ export function getExternalAthenaTableCreationStatement(
'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
OUTPUTFORMAT
'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
LOCATION '${S3_LOCATION_PREFIX}/${rdsExportIdentifier}/dydx/public.${tableName}'
LOCATION '${RESEARCH_SNAPSHOT_S3_LOCATION_PREFIX}/${rdsExportIdentifier}/dydx/public.${tableName}'
TBLPROPERTIES ('has_encrypted_data'='false');
`;
}
9 changes: 9 additions & 0 deletions indexer/services/roundtable/src/index.ts
@@ -16,6 +16,7 @@ import marketUpdaterTask from './tasks/market-updater';
import orderbookInstrumentationTask from './tasks/orderbook-instrumentation';
import removeExpiredOrdersTask from './tasks/remove-expired-orders';
import removeOldOrderUpdatesTask from './tasks/remove-old-order-updates';
import takeFastSyncSnapshotTask from './tasks/take-fast-sync-snapshot';
import trackLag from './tasks/track-lag';
import updateComplianceDataTask from './tasks/update-compliance-data';
import updateResearchEnvironmentTask from './tasks/update-research-environment';
@@ -100,6 +101,14 @@ async function start(): Promise<void> {
);
}

if (config.LOOPS_ENABLED_TAKE_FAST_SYNC_SNAPSHOTS) {
startLoop(
takeFastSyncSnapshotTask,
'take_fast_sync_snapshot',
config.LOOPS_INTERVAL_MS_TAKE_FAST_SYNC_SNAPSHOTS,
);
}

startLoop(
() => updateComplianceDataTask(complianceProvider),
'update_compliance_data',