Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[IND-552] add roundtable task to take fast sync Postgres snapshots every 4 hours #912

Merged
merged 15 commits into from
Jan 5, 2024
1 change: 1 addition & 0 deletions indexer/packages/base/src/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,3 +12,4 @@ export const ONE_MINUTE_IN_MILLISECONDS: number = 60 * ONE_SECOND_IN_MILLISECOND
export const FIVE_MINUTES_IN_MILLISECONDS: number = 5 * ONE_MINUTE_IN_MILLISECONDS;
export const TEN_MINUTES_IN_MILLISECONDS: number = 10 * ONE_MINUTE_IN_MILLISECONDS;
export const ONE_HOUR_IN_MILLISECONDS: number = 60 * ONE_MINUTE_IN_MILLISECONDS;
export const FOUR_HOURS_IN_MILLISECONDS: number = 4 * ONE_HOUR_IN_MILLISECONDS;
3 changes: 2 additions & 1 deletion indexer/services/roundtable/.env.test
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@ DB_PASSWORD=dydxserver123
DB_PORT=5436
AWS_ACCOUNT_ID=invalid-test-account-id
AWS_REGION=invalid-test-region
S3_BUCKET_ARN=invalid-test-bucket
RESEARCH_SNAPSHOT_S3_BUCKET_ARN=invalid-research-test-bucket
FAST_SYNC_SNAPSHOT_S3_BUCKET_ARN=invalid-snapshot-test-bucket
ECS_TASK_ROLE_ARN=invalid-test-arn
KMS_KEY_ARN=invalid-kms-key-arn
RDS_INSTANCE_NAME=invalid-rds-instance-name
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
import config from '../../src/config';
import { asMock } from '@dydxprotocol-indexer/dev';
import { createDBSnapshot, getMostRecentDBSnapshotIdentifier, startExportTask } from '../../src/helpers/aws';
import takeFastSyncSnapshotTask from '../../src/tasks/take-fast-sync-snapshot';
import { DateTime } from 'luxon';

jest.mock('../../src/helpers/aws');

describe('fast-sync-export-db-snapshot', () => {
const snapshotIdentifier: string = `${config.FAST_SYNC_SNAPSHOT_IDENTIFIER_PREFIX}-postgres-main-staging-2022-05-03-04-16`;
beforeAll(() => {
config.RDS_INSTANCE_NAME = 'postgres-main-staging';
});

beforeEach(() => {
jest.resetAllMocks();
asMock(getMostRecentDBSnapshotIdentifier).mockImplementation(
async () => Promise.resolve(snapshotIdentifier),
);
});

afterAll(jest.resetAllMocks);

it('Last snapshot was taken more than interval ago', async () => {
await takeFastSyncSnapshotTask();

expect(createDBSnapshot).toHaveBeenCalled();
expect(startExportTask).toHaveBeenCalled();
});

it('Last snapshot was taken less than interval ago', async () => {
const timestamp: string = DateTime.utc().minus({ minutes: 1 }).toFormat('yyyy-MM-dd-HH-mm');
asMock(getMostRecentDBSnapshotIdentifier).mockImplementation(
async () => Promise.resolve(`${config.FAST_SYNC_SNAPSHOT_IDENTIFIER_PREFIX}-postgres-main-staging-${timestamp}`),
);

await takeFastSyncSnapshotTask();

expect(createDBSnapshot).not.toHaveBeenCalled();
expect(startExportTask).not.toHaveBeenCalled();
});
});
9 changes: 8 additions & 1 deletion indexer/services/roundtable/src/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import {
ONE_HOUR_IN_MILLISECONDS,
ONE_SECOND_IN_MILLISECONDS,
TEN_SECONDS_IN_MILLISECONDS,
FOUR_HOURS_IN_MILLISECONDS,
} from '@dydxprotocol-indexer/base';
import {
kafkaConfigSchema,
Expand All @@ -40,6 +41,7 @@ export const configSchema = {
LOOPS_ORDERBOOK_INSTRUMENTATION: parseBoolean({ default: true }),
LOOPS_CANCEL_STALE_ORDERS: parseBoolean({ default: true }),
LOOPS_ENABLED_UPDATE_RESEARCH_ENVIRONMENT: parseBoolean({ default: false }),
LOOPS_ENABLED_TAKE_FAST_SYNC_SNAPSHOTS: parseBoolean({ default: false }),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The take-fast-sync-snapshot.ts file exists in the codebase, but the LOOPS_ENABLED_TAKE_FAST_SYNC_SNAPSHOTS flag is not being used within it. This suggests that the flag may not be implemented correctly to control the execution of the snapshot task as intended by the PR objectives. It is recommended to verify the implementation of this flag in the snapshot task logic.

Analysis chain

The addition of the LOOPS_ENABLED_TAKE_FAST_SYNC_SNAPSHOTS configuration flag is correct and aligns with the PR objectives. Ensure that this flag is used correctly throughout the codebase.

Scripts Executed

The following scripts were executed for the analysis:


Script:

#!/bin/bash
# Verify that the new configuration flag is correctly used throughout the codebase.
rg --type js 'LOOPS_ENABLED_TAKE_FAST_SYNC_SNAPSHOTS'

Length of output: 53



Script:

#!/bin/bash
# Locate the take-fast-sync-snapshot.ts file and check for the usage of the configuration flag.
fd 'take-fast-sync-snapshot.ts' --exec rg 'LOOPS_ENABLED_TAKE_FAST_SYNC_SNAPSHOTS' {}

Length of output: 85



Script:

#!/bin/bash
# Verify the existence of the take-fast-sync-snapshot.ts file in the codebase.
fd 'take-fast-sync-snapshot.ts'

Length of output: 96

LOOPS_ENABLED_TRACK_LAG: parseBoolean({ default: false }),
LOOPS_ENABLED_REMOVE_OLD_ORDER_UPDATES: parseBoolean({ default: true }),
LOOPS_ENABLED_AGGREGATE_TRADING_REWARDS: parseBoolean({ default: true }),
Expand All @@ -66,6 +68,9 @@ export const configSchema = {
LOOPS_INTERVAL_MS_UPDATE_RESEARCH_ENVIRONMENT: parseInteger({
default: ONE_HOUR_IN_MILLISECONDS,
}),
LOOPS_INTERVAL_MS_TAKE_FAST_SYNC_SNAPSHOTS: parseInteger({
default: FOUR_HOURS_IN_MILLISECONDS,
}),
LOOPS_INTERVAL_MS_UPDATE_COMPLIANCE_DATA: parseInteger({
default: FIVE_MINUTES_IN_MILLISECONDS,
}),
Expand Down Expand Up @@ -111,7 +116,9 @@ export const configSchema = {
// Update research environment
AWS_ACCOUNT_ID: parseString(),
AWS_REGION: parseString(),
S3_BUCKET_ARN: parseString(),
RESEARCH_SNAPSHOT_S3_BUCKET_ARN: parseString(),
FAST_SYNC_SNAPSHOT_S3_BUCKET_ARN: parseString(),
FAST_SYNC_SNAPSHOT_IDENTIFIER_PREFIX: parseString({ default: 'fast-sync' }),
ECS_TASK_ROLE_ARN: parseString(),
KMS_KEY_ARN: parseString(),
RDS_INSTANCE_NAME: parseString(),
Expand Down
67 changes: 58 additions & 9 deletions indexer/services/roundtable/src/helpers/aws.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,24 @@ enum ExportTaskStatus {
COMPLETE = 'complete',
}

const S3_BUCKET_NAME = config.S3_BUCKET_ARN.split(':::')[1];
export const S3_LOCATION_PREFIX = `s3://${S3_BUCKET_NAME}`;
export const RESEARCH_SNAPSHOT_S3_BUCKET_NAME = config.RESEARCH_SNAPSHOT_S3_BUCKET_ARN.split(':::')[1];
export const RESEARCH_SNAPSHOT_S3_LOCATION_PREFIX = `s3://${RESEARCH_SNAPSHOT_S3_BUCKET_NAME}`;
export const FAST_SYNC_SNAPSHOT_S3_BUCKET_NAME = config.FAST_SYNC_SNAPSHOT_S3_BUCKET_ARN.split(':::')[1];

/**
* @description Get most recent snapshot identifier for an RDS database.
* @param rds - RDS client
* @param snapshotIdentifierPrefixInclude - Only include snapshots with snapshot identifier
* that starts with prefixInclude
* @param snapshotIdentifierPrefixExclude - Only include snapshots with snapshot identifier
* that does not start with prefixExclude
*/
// TODO(CLOB-672): Verify this function returns the most recent DB snapshot.
export async function getMostRecentDBSnapshotIdentifier(rds: RDS): Promise<string> {
export async function getMostRecentDBSnapshotIdentifier(
rds: RDS,
snapshotIdentifierPrefixInclude?: string,
snapshotIdentifierPrefixExclude?: string,
): Promise<string> {
const awsResponse: RDS.DBSnapshotMessage = await rds.describeDBSnapshots({
DBInstanceIdentifier: config.RDS_INSTANCE_NAME,
MaxRecords: 20, // this is the minimum
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is a bug (noted in TODO(CLOB-672)) where if there are more than 20 DB snapshots for the RDS instance, this method won't return the latest DB snapshot. The fix (which wasn't implemented yet) is to page through all the snapshots and then get the latest one.
As such I don't think this will work correctly in any of the long-running test-nets (public testnet / mainnet).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed

Expand All @@ -33,21 +43,59 @@ export async function getMostRecentDBSnapshotIdentifier(rds: RDS): Promise<strin
throw Error(`No DB snapshots found with identifier: ${config.RDS_INSTANCE_NAME}`);
}

let snapshots: RDS.DBSnapshotList = awsResponse.DBSnapshots;
// Only include snapshots with snapshot identifier that starts with prefixInclude
if (snapshotIdentifierPrefixInclude !== undefined) {
snapshots = awsResponse.DBSnapshots
.filter((snapshot) => snapshot.DBSnapshotIdentifier &&
snapshot.DBSnapshotIdentifier.startsWith(snapshotIdentifierPrefixInclude),
);
}
if (snapshotIdentifierPrefixExclude !== undefined) {
snapshots = awsResponse.DBSnapshots
.filter((snapshot) => snapshot.DBSnapshotIdentifier &&
!snapshot.DBSnapshotIdentifier.startsWith(snapshotIdentifierPrefixExclude),
);
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The filtering logic for snapshotIdentifierPrefixInclude and snapshotIdentifierPrefixExclude uses the same awsResponse.DBSnapshots list, which could lead to incorrect filtering if both includes and excludes are provided. This should be a sequential filter, not parallel.

- snapshots = awsResponse.DBSnapshots
+ snapshots = snapshots

Committable suggestion

IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation.

Suggested change
let snapshots: RDS.DBSnapshotList = awsResponse.DBSnapshots;
// Only include snapshots with snapshot identifier that starts with prefixInclude
if (snapshotIdentifierPrefixInclude !== undefined) {
snapshots = awsResponse.DBSnapshots
.filter((snapshot) => snapshot.DBSnapshotIdentifier &&
snapshot.DBSnapshotIdentifier.startsWith(snapshotIdentifierPrefixInclude),
);
}
if (snapshotIdentifierPrefixExclude !== undefined) {
snapshots = awsResponse.DBSnapshots
.filter((snapshot) => snapshot.DBSnapshotIdentifier &&
!snapshot.DBSnapshotIdentifier.startsWith(snapshotIdentifierPrefixExclude),
);
}
let snapshots: RDS.DBSnapshotList = awsResponse.DBSnapshots;
// Only include snapshots with snapshot identifier that starts with prefixInclude
if (snapshotIdentifierPrefixInclude !== undefined) {
snapshots = snapshots
.filter((snapshot) => snapshot.DBSnapshotIdentifier &&
snapshot.DBSnapshotIdentifier.startsWith(snapshotIdentifierPrefixInclude),
);
}
if (snapshotIdentifierPrefixExclude !== undefined) {
snapshots = snapshots
.filter((snapshot) => snapshot.DBSnapshotIdentifier &&
!snapshot.DBSnapshotIdentifier.startsWith(snapshotIdentifierPrefixExclude),
);
}


logger.info({
at: `${atStart}getMostRecentDBSnapshotIdentifier`,
message: 'Described snapshots for database',
mostRecentSnapshot: awsResponse.DBSnapshots[awsResponse.DBSnapshots.length - 1],
mostRecentSnapshot: snapshots[snapshots.length - 1],
});

return awsResponse.DBSnapshots[awsResponse.DBSnapshots.length - 1].DBSnapshotIdentifier!;
return snapshots[snapshots.length - 1].DBSnapshotIdentifier!;
}

/**
* @description Create DB snapshot for an RDS database.
*/
export async function createDBSnapshot(
rds: RDS,
snapshotIdentifier: string,
dbInstanceIdentifier: string,
): Promise<string> {
const params = {
DBInstanceIdentifier: dbInstanceIdentifier,
DBSnapshotIdentifier: snapshotIdentifier,
};

const awsResponse: RDS.CreateDBSnapshotResult = await rds.createDBSnapshot(params).promise();
if (awsResponse.DBSnapshot === undefined) {
throw Error(`No DB snapshot was created with identifier: ${snapshotIdentifier}`);
}
return awsResponse.DBSnapshot.DBSnapshotIdentifier!;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The createDBSnapshot function should have error handling for the createDBSnapshot call to RDS, similar to the error handling in the checkIfS3ObjectExists function.

+ try {
    const awsResponse: RDS.CreateDBSnapshotResult = await rds.createDBSnapshot(params).promise();
+ } catch (error) {
+   logger.error({
+     at: `${atStart}createDBSnapshot`,
+     message: 'Failed to create DB snapshot',
+     error,
+     snapshotIdentifier,
+   });
+   throw error;
+ }

Committable suggestion

IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation.

Suggested change
export async function createDBSnapshot(
rds: RDS,
snapshotIdentifier: string,
dbInstanceIdentifier: string,
): Promise<string> {
const params = {
DBInstanceIdentifier: dbInstanceIdentifier,
DBSnapshotIdentifier: snapshotIdentifier,
};
const awsResponse: RDS.CreateDBSnapshotResult = await rds.createDBSnapshot(params).promise();
if (awsResponse.DBSnapshot === undefined) {
throw Error(`No DB snapshot was created with identifier: ${snapshotIdentifier}`);
}
return awsResponse.DBSnapshot.DBSnapshotIdentifier!;
export async function createDBSnapshot(
rds: RDS,
snapshotIdentifier: string,
dbInstanceIdentifier: string,
): Promise<string> {
const params = {
DBInstanceIdentifier: dbInstanceIdentifier,
DBSnapshotIdentifier: snapshotIdentifier,
};
try {
const awsResponse: RDS.CreateDBSnapshotResult = await rds.createDBSnapshot(params).promise();
} catch (error) {
logger.error({
at: `${atStart}createDBSnapshot`,
message: 'Failed to create DB snapshot',
error,
snapshotIdentifier,
});
throw error;
}
if (awsResponse.DBSnapshot === undefined) {
throw Error(`No DB snapshot was created with identifier: ${snapshotIdentifier}`);
}
return awsResponse.DBSnapshot.DBSnapshotIdentifier!;

}

/**
* @description Check if an S3 Object already exists.
*/
export async function checkIfS3ObjectExists(s3: S3, s3Date: string): Promise<boolean> {
export async function checkIfS3ObjectExists(
s3: S3,
s3Date: string,
bucket: string,
): Promise<boolean> {
const at: string = `${atStart}checkIfS3ObjectExists`;
const bucket: string = S3_BUCKET_NAME;
const key: string = `${config.RDS_INSTANCE_NAME}-${s3Date}/export_info_${config.RDS_INSTANCE_NAME}-${s3Date}.json`;

logger.info({
Expand Down Expand Up @@ -143,12 +191,13 @@ export async function checkIfExportJobToS3IsOngoing(
export async function startExportTask(
rds: RDS,
rdsExportIdentifier: string,
bucket: string,
): Promise<RDS.ExportTask> {
// TODO: Add validation
const sourceArnPrefix = `arn:aws:rds:${config.AWS_REGION}:${config.AWS_ACCOUNT_ID}:snapshot:rds:`;
const awsResponse: RDS.ExportTask = await rds.startExportTask({
ExportTaskIdentifier: rdsExportIdentifier,
S3BucketName: S3_BUCKET_NAME,
S3BucketName: bucket,
KmsKeyId: config.KMS_KEY_ARN,
IamRoleArn: config.ECS_TASK_ROLE_ARN,
SourceArn: `${sourceArnPrefix}${rdsExportIdentifier}`,
Expand Down Expand Up @@ -216,7 +265,7 @@ export async function startAthenaQuery(
Database: config.ATHENA_DATABASE_NAME,
},
ResultConfiguration: {
OutputLocation: `${S3_LOCATION_PREFIX}/output/${timestamp}`,
OutputLocation: `${RESEARCH_SNAPSHOT_S3_LOCATION_PREFIX}/output/${timestamp}`,
},
WorkGroup: config.ATHENA_WORKING_GROUP,
}).promise();
Expand Down
4 changes: 2 additions & 2 deletions indexer/services/roundtable/src/helpers/sql.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { S3_LOCATION_PREFIX } from './aws';
import { RESEARCH_SNAPSHOT_S3_LOCATION_PREFIX } from './aws';

export function castToTimestamp(column: string): string {
return `CAST("${column}" AS timestamp) as "${column}"`;
Expand All @@ -23,7 +23,7 @@ export function getExternalAthenaTableCreationStatement(
'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
OUTPUTFORMAT
'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
LOCATION '${S3_LOCATION_PREFIX}/${rdsExportIdentifier}/dydx/public.${tableName}'
LOCATION '${RESEARCH_SNAPSHOT_S3_LOCATION_PREFIX}/${rdsExportIdentifier}/dydx/public.${tableName}'
TBLPROPERTIES ('has_encrypted_data'='false');
`;
}
Expand Down
9 changes: 9 additions & 0 deletions indexer/services/roundtable/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import marketUpdaterTask from './tasks/market-updater';
import orderbookInstrumentationTask from './tasks/orderbook-instrumentation';
import removeExpiredOrdersTask from './tasks/remove-expired-orders';
import removeOldOrderUpdatesTask from './tasks/remove-old-order-updates';
import takeFastSyncSnapshotTask from './tasks/take-fast-sync-snapshot';
import trackLag from './tasks/track-lag';
import updateComplianceDataTask from './tasks/update-compliance-data';
import updateResearchEnvironmentTask from './tasks/update-research-environment';
Expand Down Expand Up @@ -100,6 +101,14 @@ async function start(): Promise<void> {
);
}

if (config.LOOPS_ENABLED_TAKE_FAST_SYNC_SNAPSHOTS) {
startLoop(
takeFastSyncSnapshotTask,
'take_fast_sync_snapshot',
config.LOOPS_INTERVAL_MS_TAKE_FAST_SYNC_SNAPSHOTS,
);
}

startLoop(
() => updateComplianceDataTask(complianceProvider),
'update_compliance_data',
Expand Down
106 changes: 106 additions & 0 deletions indexer/services/roundtable/src/tasks/take-fast-sync-snapshot.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
import { InfoObject, logger, stats } from '@dydxprotocol-indexer/base';
import RDS from 'aws-sdk/clients/rds';
import { DateTime } from 'luxon';

import config from '../config';
import {
createDBSnapshot,
FAST_SYNC_SNAPSHOT_S3_BUCKET_NAME,
getMostRecentDBSnapshotIdentifier,
startExportTask,
} from '../helpers/aws';

const statStart: string = `${config.SERVICE_NAME}.fast_sync_export_db_snapshot`;

/**
* Checks if the difference between two dates is less than a given interval.
*
* @param startDate
* @param endDate
* @param intervalMs
*/
function isDifferenceLessThanInterval(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: move this under runTask so that the top level function is at the top of the file

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

startDate: string,
endDate: string,
intervalMs: number,
): boolean {
const parseDateString = (dateStr: string): Date => {
const [year, month, day, hour, minute] = dateStr.split('-').map(Number);
return new Date(year, month, day, hour, minute);
};

// Parse the date strings
const parsedDate1 = parseDateString(startDate);
const parsedDate2 = parseDateString(endDate);

// Calculate the difference in milliseconds
const differenceInMilliseconds = Math.abs(parsedDate1.getTime() - parsedDate2.getTime());

// Compare with the interval
return differenceInMilliseconds < intervalMs;
}
Comment on lines +54 to +80
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider using a well-tested date parsing library like luxon for the isDifferenceLessThanInterval function to reduce the risk of date parsing errors.

- const parseDateString = (dateStr: string): Date => {
-   const [year, month, day, hour, minute] = dateStr.split('-').map(Number);
-   return new Date(year, month, day, hour, minute);
- };
+ const parseDateString = (dateStr: string): Date => DateTime.fromFormat(dateStr, 'yyyy-MM-dd-HH-mm').toJSDate();

Committable suggestion

IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation.

Suggested change
/**
* Checks if the difference between two dates is less than a given interval.
*
* @param startDate
* @param endDate
* @param intervalMs
*/
function isDifferenceLessThanInterval(
startDate: string,
endDate: string,
intervalMs: number,
): boolean {
const parseDateString = (dateStr: string): Date => {
const [year, month, day, hour, minute] = dateStr.split('-').map(Number);
return new Date(year, month, day, hour, minute);
};
// Parse the date strings
const parsedDate1 = parseDateString(startDate);
const parsedDate2 = parseDateString(endDate);
// Calculate the difference in milliseconds
const differenceInMilliseconds = Math.abs(parsedDate1.getTime() - parsedDate2.getTime());
// Compare with the interval
return differenceInMilliseconds < intervalMs;
}
/**
* Checks if the difference between two dates is less than a given interval.
*
* @param startDate
* @param endDate
* @param intervalMs
*/
function isDifferenceLessThanInterval(
startDate: string,
endDate: string,
intervalMs: number,
): boolean {
const parseDateString = (dateStr: string): Date => DateTime.fromFormat(dateStr, 'yyyy-MM-dd-HH-mm').toJSDate();
// Parse the date strings
const parsedDate1 = parseDateString(startDate);
const parsedDate2 = parseDateString(endDate);
// Calculate the difference in milliseconds
const differenceInMilliseconds = Math.abs(parsedDate1.getTime() - parsedDate2.getTime());
// Compare with the interval
return differenceInMilliseconds < intervalMs;
}


export default async function runTask(): Promise<void> {
const at: string = 'fast-sync-export-db-snapshot#runTask';
logger.info({ at, message: 'Starting task.' });

const rds: RDS = new RDS();

const dateString: string = DateTime.utc().toFormat('yyyy-MM-dd-HH-mm');
const rdsExportIdentifier: string = `${config.FAST_SYNC_SNAPSHOT_IDENTIFIER_PREFIX}-${config.RDS_INSTANCE_NAME}-${dateString}`;
// check the time of the last snapshot
const lastSnapshotIdentifier: string = await getMostRecentDBSnapshotIdentifier(
rds,
config.FAST_SYNC_SNAPSHOT_IDENTIFIER_PREFIX,
);
const s3Date: string = lastSnapshotIdentifier.split(config.RDS_INSTANCE_NAME)[1].slice(1);
if (
isDifferenceLessThanInterval(
s3Date,
dateString,
config.LOOPS_INTERVAL_MS_TAKE_FAST_SYNC_SNAPSHOTS,
)
) {
logger.info({
at,
message: 'Last fast sync db snapshot was taken less than the interval ago',
interval: config.LOOPS_INTERVAL_MS_TAKE_FAST_SYNC_SNAPSHOTS,
});
return;
}
// Create the DB snapshot
await createDBSnapshot(rds, rdsExportIdentifier, config.RDS_INSTANCE_NAME);

// start S3 Export Job.
const startExport: number = Date.now();
try {
const exportData: RDS.ExportTask = await startExportTask(
rds,
rdsExportIdentifier,
FAST_SYNC_SNAPSHOT_S3_BUCKET_NAME,
);

logger.info({
at,
message: 'Started an export task',
exportData,
});
} catch (error) { // TODO handle this by finding the most recent snapshot earlier
const message: InfoObject = {
at,
message: 'export to S3 failed',
error,
};

if (error.name === 'DBSnapshotNotFound') {
stats.increment(`${statStart}.no_s3_snapshot`, 1);

logger.info(message);
return;
}

logger.error(message);
} finally {
stats.timing(`${statStart}.rdsSnapshotExport`, Date.now() - startExport);
}
}
Loading