[IND-552] add roundtable task to take fast sync Postgres snapshots every 4 hours #912

Merged · 15 commits · Jan 5, 2024
1 change: 1 addition & 0 deletions indexer/packages/base/src/constants.ts
@@ -12,3 +12,4 @@ export const ONE_MINUTE_IN_MILLISECONDS: number = 60 * ONE_SECOND_IN_MILLISECOND
export const FIVE_MINUTES_IN_MILLISECONDS: number = 5 * ONE_MINUTE_IN_MILLISECONDS;
export const TEN_MINUTES_IN_MILLISECONDS: number = 10 * ONE_MINUTE_IN_MILLISECONDS;
export const ONE_HOUR_IN_MILLISECONDS: number = 60 * ONE_MINUTE_IN_MILLISECONDS;
export const FOUR_HOURS_IN_MILLISECONDS: number = 4 * ONE_HOUR_IN_MILLISECONDS;
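As a quick sanity check, the constant chain from this file multiplies out as expected (re-derived here outside the package, without the `export` keywords):

```typescript
// Recomputing the base-package constant chain to show the final value.
const ONE_SECOND_IN_MILLISECONDS = 1000;
const ONE_MINUTE_IN_MILLISECONDS = 60 * ONE_SECOND_IN_MILLISECONDS;
const ONE_HOUR_IN_MILLISECONDS = 60 * ONE_MINUTE_IN_MILLISECONDS;
const FOUR_HOURS_IN_MILLISECONDS = 4 * ONE_HOUR_IN_MILLISECONDS;
// 4 * 60 * 60 * 1000 = 14,400,000 ms
```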
3 changes: 2 additions & 1 deletion indexer/services/roundtable/.env.test
@@ -6,7 +6,8 @@ DB_PASSWORD=dydxserver123
DB_PORT=5436
AWS_ACCOUNT_ID=invalid-test-account-id
AWS_REGION=invalid-test-region
S3_BUCKET_ARN=invalid-test-bucket
RESEARCH_SNAPSHOT_S3_BUCKET_ARN=invalid-research-test-bucket
FAST_SYNC_SNAPSHOT_S3_BUCKET_ARN=invalid-snapshot-test-bucket
ECS_TASK_ROLE_ARN=invalid-test-arn
KMS_KEY_ARN=invalid-kms-key-arn
RDS_INSTANCE_NAME=invalid-rds-instance-name
@@ -0,0 +1,48 @@
import config from '../../src/config';
import { asMock } from '@dydxprotocol-indexer/dev';
import {
checkIfExportJobToS3IsOngoing,
checkIfS3ObjectExists,
getMostRecentDBSnapshotIdentifier,
startExportTask,
} from '../../src/helpers/aws';
import takeFastSyncSnapshotTask from '../../src/tasks/take-fast-sync-snapshot';

jest.mock('../../src/helpers/aws');

describe('fast-sync-export-db-snapshot', () => {
beforeAll(() => {
config.RDS_INSTANCE_NAME = 'postgres-main-staging';
});

beforeEach(() => {
jest.resetAllMocks();
asMock(getMostRecentDBSnapshotIdentifier).mockImplementation(async () => Promise.resolve('postgres-main-staging-2022-05-03-04-16'));
});

afterAll(jest.resetAllMocks);

it('s3Object exists', async () => {
asMock(checkIfS3ObjectExists).mockImplementation(async () => Promise.resolve(true));

await takeFastSyncSnapshotTask();

expect(checkIfExportJobToS3IsOngoing).not.toHaveBeenCalled();
expect(startExportTask).not.toHaveBeenCalled();
});

it('export job in progress', async () => {
asMock(checkIfExportJobToS3IsOngoing).mockImplementation(
async () => Promise.resolve(true));

await takeFastSyncSnapshotTask();

expect(startExportTask).not.toHaveBeenCalled();
});

it('start export job', async () => {
await takeFastSyncSnapshotTask();

expect(startExportTask).toHaveBeenCalled();
});
});
8 changes: 7 additions & 1 deletion indexer/services/roundtable/src/config.ts
@@ -15,6 +15,7 @@ import {
ONE_HOUR_IN_MILLISECONDS,
ONE_SECOND_IN_MILLISECONDS,
TEN_SECONDS_IN_MILLISECONDS,
FOUR_HOURS_IN_MILLISECONDS,
} from '@dydxprotocol-indexer/base';
import {
kafkaConfigSchema,
@@ -40,6 +41,7 @@ export const configSchema = {
LOOPS_ORDERBOOK_INSTRUMENTATION: parseBoolean({ default: true }),
LOOPS_CANCEL_STALE_ORDERS: parseBoolean({ default: true }),
LOOPS_ENABLED_UPDATE_RESEARCH_ENVIRONMENT: parseBoolean({ default: false }),
LOOPS_ENABLED_TAKE_FAST_SYNC_SNAPSHOTS: parseBoolean({ default: false }),
Review comment (Contributor):
The take-fast-sync-snapshot.ts file exists in the codebase, but the LOOPS_ENABLED_TAKE_FAST_SYNC_SNAPSHOTS flag is not referenced within it. The flag may therefore not gate the snapshot task as the PR intends; verify how the flag is wired into the snapshot task logic.

Analysis chain

The addition of the LOOPS_ENABLED_TAKE_FAST_SYNC_SNAPSHOTS configuration flag is correct and aligns with the PR objectives. Ensure that this flag is used correctly throughout the codebase.

Scripts Executed

The following scripts were executed for the analysis:


Script:

#!/bin/bash
# Verify that the new configuration flag is correctly used throughout the codebase.
rg --type js 'LOOPS_ENABLED_TAKE_FAST_SYNC_SNAPSHOTS'

Length of output: 53



Script:

#!/bin/bash
# Locate the take-fast-sync-snapshot.ts file and check for the usage of the configuration flag.
fd 'take-fast-sync-snapshot.ts' --exec rg 'LOOPS_ENABLED_TAKE_FAST_SYNC_SNAPSHOTS' {}

Length of output: 85



Script:

#!/bin/bash
# Verify the existence of the take-fast-sync-snapshot.ts file in the codebase.
fd 'take-fast-sync-snapshot.ts'

Length of output: 96

LOOPS_ENABLED_TRACK_LAG: parseBoolean({ default: false }),
LOOPS_ENABLED_REMOVE_OLD_ORDER_UPDATES: parseBoolean({ default: true }),

@@ -65,6 +67,9 @@ export const configSchema = {
LOOPS_INTERVAL_MS_UPDATE_RESEARCH_ENVIRONMENT: parseInteger({
default: ONE_HOUR_IN_MILLISECONDS,
}),
LOOPS_INTERVAL_MS_TAKE_FAST_SYNC_SNAPSHOTS: parseInteger({
default: FOUR_HOURS_IN_MILLISECONDS,
}),
LOOPS_INTERVAL_MS_UPDATE_COMPLIANCE_DATA: parseInteger({
default: FIVE_MINUTES_IN_MILLISECONDS,
}),
@@ -107,7 +112,8 @@ export const configSchema = {
// Update research environment
AWS_ACCOUNT_ID: parseString(),
AWS_REGION: parseString(),
S3_BUCKET_ARN: parseString(),
RESEARCH_SNAPSHOT_S3_BUCKET_ARN: parseString(),
FAST_SYNC_SNAPSHOT_S3_BUCKET_ARN: parseString(),
ECS_TASK_ROLE_ARN: parseString(),
KMS_KEY_ARN: parseString(),
RDS_INSTANCE_NAME: parseString(),
17 changes: 11 additions & 6 deletions indexer/services/roundtable/src/helpers/aws.ts
@@ -16,8 +16,9 @@ enum ExportTaskStatus {
COMPLETE = 'complete',
}

const S3_BUCKET_NAME = config.S3_BUCKET_ARN.split(':::')[1];
export const S3_LOCATION_PREFIX = `s3://${S3_BUCKET_NAME}`;
export const RESEARCH_SNAPSHOT_S3_BUCKET_NAME = config.RESEARCH_SNAPSHOT_S3_BUCKET_ARN.split(':::')[1];
export const RESEARCH_SNAPSHOT_S3_LOCATION_PREFIX = `s3://${RESEARCH_SNAPSHOT_S3_BUCKET_NAME}`;
export const FAST_SYNC_SNAPSHOT_S3_BUCKET_NAME = config.FAST_SYNC_SNAPSHOT_S3_BUCKET_ARN.split(':::')[1];

/**
* @description Get most recent snapshot identifier for an RDS database.
@@ -45,9 +46,12 @@ export async function getMostRecentDBSnapshotIdentifier(rds: RDS): Promise<strin
/**
* @description Check if an S3 Object already exists.
*/
export async function checkIfS3ObjectExists(s3: S3, s3Date: string): Promise<boolean> {
export async function checkIfS3ObjectExists(
s3: S3,
s3Date: string,
bucket: string,
): Promise<boolean> {
const at: string = `${atStart}checkIfS3ObjectExists`;
const bucket: string = S3_BUCKET_NAME;
const key: string = `${config.RDS_INSTANCE_NAME}-${s3Date}/export_info_${config.RDS_INSTANCE_NAME}-${s3Date}.json`;

logger.info({
@@ -143,12 +147,13 @@ export async function checkIfExportJobToS3IsOngoing(
export async function startExportTask(
rds: RDS,
rdsExportIdentifier: string,
bucket: string,
): Promise<RDS.ExportTask> {
// TODO: Add validation
const sourceArnPrefix = `arn:aws:rds:${config.AWS_REGION}:${config.AWS_ACCOUNT_ID}:snapshot:rds:`;
const awsResponse: RDS.ExportTask = await rds.startExportTask({
ExportTaskIdentifier: rdsExportIdentifier,
S3BucketName: S3_BUCKET_NAME,
S3BucketName: bucket,
KmsKeyId: config.KMS_KEY_ARN,
IamRoleArn: config.ECS_TASK_ROLE_ARN,
SourceArn: `${sourceArnPrefix}${rdsExportIdentifier}`,
@@ -216,7 +221,7 @@ export async function startAthenaQuery(
Database: config.ATHENA_DATABASE_NAME,
},
ResultConfiguration: {
OutputLocation: `${S3_LOCATION_PREFIX}/output/${timestamp}`,
OutputLocation: `${RESEARCH_SNAPSHOT_S3_LOCATION_PREFIX}/output/${timestamp}`,
},
WorkGroup: config.ATHENA_WORKING_GROUP,
}).promise();
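The bucket-name constants in aws.ts derive the name by splitting the ARN on `:::`, since S3 bucket ARNs have the form `arn:aws:s3:::bucket-name`. A minimal standalone sketch of that parsing (the helper name and error handling are ours, not from the diff):

```typescript
// S3 bucket ARNs look like "arn:aws:s3:::bucket-name"; everything after
// ":::" is the bucket name. This mirrors the split(':::')[1] in aws.ts,
// with an added guard for malformed input.
function bucketNameFromArn(arn: string): string {
  const name = arn.split(':::')[1];
  if (!name) {
    throw new Error(`Not an S3 bucket ARN: ${arn}`);
  }
  return name;
}
```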
4 changes: 2 additions & 2 deletions indexer/services/roundtable/src/helpers/sql.ts
@@ -1,4 +1,4 @@
import { S3_LOCATION_PREFIX } from './aws';
import { RESEARCH_SNAPSHOT_S3_LOCATION_PREFIX } from './aws';

export function castToTimestamp(column: string): string {
return `CAST("${column}" AS timestamp) as "${column}"`;
@@ -23,7 +23,7 @@ export function getExternalAthenaTableCreationStatement(
'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
OUTPUTFORMAT
'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
LOCATION '${S3_LOCATION_PREFIX}/${rdsExportIdentifier}/dydx/public.${tableName}'
LOCATION '${RESEARCH_SNAPSHOT_S3_LOCATION_PREFIX}/${rdsExportIdentifier}/dydx/public.${tableName}'
TBLPROPERTIES ('has_encrypted_data'='false');
`;
}
9 changes: 9 additions & 0 deletions indexer/services/roundtable/src/index.ts
@@ -11,6 +11,7 @@ import {
import cancelStaleOrdersTask from './tasks/cancel-stale-orders';
import createPnlTicksTask from './tasks/create-pnl-ticks';
import deleteZeroPriceLevelsTask from './tasks/delete-zero-price-levels';
import takeFastSyncSnapshotTask from './tasks/take-fast-sync-snapshot';
import marketUpdaterTask from './tasks/market-updater';
import orderbookInstrumentationTask from './tasks/orderbook-instrumentation';
import removeExpiredOrdersTask from './tasks/remove-expired-orders';
@@ -99,6 +100,14 @@ async function start(): Promise<void> {
);
}

if (config.LOOPS_ENABLED_TAKE_FAST_SYNC_SNAPSHOTS) {
startLoop(
takeFastSyncSnapshotTask,
'take_fast_sync_snapshot',
config.LOOPS_INTERVAL_MS_TAKE_FAST_SYNC_SNAPSHOTS,
);
}

startLoop(
() => updateComplianceDataTask(complianceProvider),
'update_compliance_data',
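The index.ts change follows the service's pattern of gating each loop on a boolean flag and scheduling it at a configured interval. The decision logic can be sketched in isolation; `enabledLoops` and the trimmed config shape here are our assumptions for illustration, not the real roundtable API:

```typescript
// Minimal sketch of flag-gated loop registration (assumed shapes).
type LoopConfig = {
  LOOPS_ENABLED_TAKE_FAST_SYNC_SNAPSHOTS: boolean;
  LOOPS_INTERVAL_MS_TAKE_FAST_SYNC_SNAPSHOTS: number;
};

type Loop = { name: string; intervalMs: number; task: () => Promise<void> };

// Collect the loops that should run under a given config; the caller would
// then schedule each entry, e.g. with setInterval or a recursive setTimeout.
function enabledLoops(config: LoopConfig, task: () => Promise<void>): Loop[] {
  const loops: Loop[] = [];
  if (config.LOOPS_ENABLED_TAKE_FAST_SYNC_SNAPSHOTS) {
    loops.push({
      name: 'take_fast_sync_snapshot',
      intervalMs: config.LOOPS_INTERVAL_MS_TAKE_FAST_SYNC_SNAPSHOTS,
      task,
    });
  }
  return loops;
}
```

With the default config above, the loop is off (`default: false`) and only runs in environments that opt in.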
@@ -0,0 +1,98 @@
import { InfoObject, logger, stats } from '@dydxprotocol-indexer/base';
import RDS from 'aws-sdk/clients/rds';
import S3 from 'aws-sdk/clients/s3';
import { DateTime } from 'luxon';

import config from '../config';
import {
checkIfExportJobToS3IsOngoing,
checkIfS3ObjectExists,
FAST_SYNC_SNAPSHOT_S3_BUCKET_NAME,
getMostRecentDBSnapshotIdentifier,
startExportTask,
} from '../helpers/aws';

const statStart: string = `${config.SERVICE_NAME}.fast_sync_export_db_snapshot`;

export default async function runTask(): Promise<void> {
const at: string = 'fast-sync-export-db-snapshot#runTask';

const rds: RDS = new RDS();

// get most recent rds snapshot
const startDescribe: number = Date.now();
const dateString: string = DateTime.utc().toFormat('yyyy-MM-dd');
const mostRecentSnapshot: string = await getMostRecentDBSnapshotIdentifier(rds);
stats.timing(`${statStart}.describe_rds_snapshots`, Date.now() - startDescribe);

// dev example: rds:dev-indexer-apne1-db-2023-06-25-18-34
const s3Date: string = mostRecentSnapshot.split(config.RDS_INSTANCE_NAME)[1].slice(1);
const s3: S3 = new S3();

// check if s3 object exists
const startS3Check: number = Date.now();
const s3ObjectExists: boolean = await checkIfS3ObjectExists(
s3,
s3Date,
FAST_SYNC_SNAPSHOT_S3_BUCKET_NAME,
);
stats.timing(`${statStart}.checkS3Object`, Date.now() - startS3Check);

const rdsExportIdentifier: string = `${config.RDS_INSTANCE_NAME}-fast-sync-${s3Date}`;

// If the s3 object exists, return
if (s3ObjectExists) {
logger.info({
at,
dateString,
message: 'S3 object exists.',
});
return;
}

// if we haven't created the object, check if it is being created
const rdsExportCheck: number = Date.now();
const exportJobOngoing: boolean = await checkIfExportJobToS3IsOngoing(rds, rdsExportIdentifier);
stats.timing(`${statStart}.checkRdsExport`, Date.now() - rdsExportCheck);

if (exportJobOngoing) {
logger.info({
at,
dateString,
message: 'Will wait for export job to finish',
});
return;
}
// start Export Job if S3 Object does not exist
const startExport: number = Date.now();
try {
const exportData: RDS.ExportTask = await startExportTask(
rds,
rdsExportIdentifier,
FAST_SYNC_SNAPSHOT_S3_BUCKET_NAME,
);

logger.info({
at,
message: 'Started an export task',
exportData,
});
} catch (error) { // TODO handle this by finding the most recent snapshot earlier
const message: InfoObject = {
at,
message: 'export to S3 failed',
error,
};

if (error.name === 'DBSnapshotNotFound') {
stats.increment(`${statStart}.no_s3_snapshot`, 1);

logger.info(message);
return;
}

logger.error(message);
} finally {
stats.timing(`${statStart}.rdsSnapshotExport`, Date.now() - startExport);
}
}
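The `s3Date` extraction above strips the instance-name prefix from a snapshot identifier shaped like the `rds:dev-indexer-apne1-db-2023-06-25-18-34` example in the code comment: split on the instance name, take the remainder, and drop the leading `-`. Sketched in isolation (the helper name is ours):

```typescript
// Given "rds:<instance>-YYYY-MM-dd-HH-mm", split on the instance name and
// drop the leading "-" to recover the timestamp portion, as the task does
// with mostRecentSnapshot.split(config.RDS_INSTANCE_NAME)[1].slice(1).
function snapshotDateSuffix(snapshotId: string, instanceName: string): string {
  return snapshotId.split(instanceName)[1].slice(1);
}
```

Note this assumes the identifier actually contains the instance name; like the task itself, the sketch would throw on an identifier for a different instance.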
@@ -15,7 +15,7 @@ import {
checkIfS3ObjectExists,
getMostRecentDBSnapshotIdentifier,
startExportTask,
startAthenaQuery,
startAthenaQuery,
RESEARCH_SNAPSHOT_S3_BUCKET_NAME,
} from '../helpers/aws';
import { AthenaTableDDLQueries } from '../helpers/types';
import * as athenaAssetPositions from '../lib/athena-ddl-tables/asset_positions';
@@ -75,10 +75,14 @@ export default async function runTask(): Promise<void> {

// check if s3 object exists
const startS3Check: number = Date.now();
const s3ObjectExists: boolean = await checkIfS3ObjectExists(s3, s3Date);
const s3ObjectExists: boolean = await checkIfS3ObjectExists(
s3,
s3Date,
RESEARCH_SNAPSHOT_S3_BUCKET_NAME,
);
stats.timing(`${statStart}.checkS3Object`, Date.now() - startS3Check);

const rdsExportIdentifier: string = `${config.RDS_INSTANCE_NAME}-${s3Date}`;
const rdsExportIdentifier: string = `${config.RDS_INSTANCE_NAME}-research-${s3Date}`;

// If the s3 object exists, attempt to add Athena tables or if we are skipping for test purposes
if (s3ObjectExists || config.SKIP_TO_ATHENA_TABLE_WRITING) {
@@ -110,7 +114,11 @@ export default async function runTask(): Promise<void> {
// start Export Job if S3 Object does not exist
const startExport: number = Date.now();
try {
const exportData: RDS.ExportTask = await startExportTask(rds, rdsExportIdentifier);
const exportData: RDS.ExportTask = await startExportTask(
rds,
rdsExportIdentifier,
RESEARCH_SNAPSHOT_S3_BUCKET_NAME,
);

logger.info({
at,