Skip to content

Commit

Permalink
Update compliance roundtable batch size and add stale compliance metr…
Browse files Browse the repository at this point in the history
…ics (#2131)
  • Loading branch information
jerryfan01234 authored Aug 22, 2024
1 parent 5f0d372 commit 372f645
Show file tree
Hide file tree
Showing 3 changed files with 98 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import {
ComplianceReason,
} from '@dydxprotocol-indexer/postgres';
import updateComplianceDataTask from '../../src/tasks/update-compliance-data';
import { logger, stats } from '@dydxprotocol-indexer/base';
import { STATS_NO_SAMPLING, logger, stats } from '@dydxprotocol-indexer/base';
import _ from 'lodash';
import config from '../../src/config';
import { ClientAndProvider } from '../../src/helpers/compliance-clients';
Expand Down Expand Up @@ -73,6 +73,8 @@ describe('update-compliance-data', () => {
addressesScreened: 0,
upserted: 0,
statusUpserted: 0,
activeAddressesWithStaleCompliance: 0,
inactiveAddressesWithStaleCompliance: 0,
},
mockProvider.provider,
);
Expand All @@ -95,6 +97,8 @@ describe('update-compliance-data', () => {
addressesScreened: 0,
upserted: 0,
statusUpserted: 0,
activeAddressesWithStaleCompliance: 0,
inactiveAddressesWithStaleCompliance: 0,
},
mockProvider.provider,
);
Expand Down Expand Up @@ -122,6 +126,8 @@ describe('update-compliance-data', () => {
addressesScreened: 0,
upserted: 0,
statusUpserted: 0,
activeAddressesWithStaleCompliance: 0,
inactiveAddressesWithStaleCompliance: 0,
},
mockProvider.provider,
);
Expand Down Expand Up @@ -152,6 +158,8 @@ describe('update-compliance-data', () => {
addressesScreened: 0,
upserted: 0,
statusUpserted: 0,
activeAddressesWithStaleCompliance: 0,
inactiveAddressesWithStaleCompliance: 0,
},
mockProvider.provider,
);
Expand Down Expand Up @@ -197,6 +205,8 @@ describe('update-compliance-data', () => {
addressesScreened: 1,
upserted: 1,
statusUpserted: 1,
activeAddressesWithStaleCompliance: 0,
inactiveAddressesWithStaleCompliance: 0,
},
mockProvider.provider,
);
Expand Down Expand Up @@ -243,6 +253,8 @@ describe('update-compliance-data', () => {
addressesScreened: 1,
upserted: 1,
statusUpserted: 1,
activeAddressesWithStaleCompliance: 1,
inactiveAddressesWithStaleCompliance: 0,
},
mockProvider.provider,
);
Expand Down Expand Up @@ -279,6 +291,8 @@ describe('update-compliance-data', () => {
addressesScreened: 0,
upserted: 0,
statusUpserted: 0,
activeAddressesWithStaleCompliance: 1,
inactiveAddressesWithStaleCompliance: 0,
},
mockProvider.provider,
);
Expand Down Expand Up @@ -324,6 +338,8 @@ describe('update-compliance-data', () => {
addressesScreened: 1,
upserted: 1,
statusUpserted: 1,
activeAddressesWithStaleCompliance: 1,
inactiveAddressesWithStaleCompliance: 0,
},
mockProvider.provider,
);
Expand Down Expand Up @@ -375,6 +391,8 @@ describe('update-compliance-data', () => {
addressesScreened: 1,
upserted: 1,
statusUpserted: 1,
activeAddressesWithStaleCompliance: 0,
inactiveAddressesWithStaleCompliance: 1,
},
mockProvider.provider,
);
Expand Down Expand Up @@ -456,6 +474,8 @@ describe('update-compliance-data', () => {
addressesScreened: 2,
upserted: 2,
statusUpserted: 2,
activeAddressesWithStaleCompliance: 0,
inactiveAddressesWithStaleCompliance: 1,
},
mockProvider.provider,
);
Expand Down Expand Up @@ -546,6 +566,8 @@ describe('update-compliance-data', () => {
addressesScreened: 3,
upserted: 2,
statusUpserted: 2,
activeAddressesWithStaleCompliance: 0,
inactiveAddressesWithStaleCompliance: 1,
},
mockProvider.provider,
);
Expand Down Expand Up @@ -615,6 +637,9 @@ describe('update-compliance-data', () => {
addressesScreened: 1,
upserted: 1,
statusUpserted: 1,
// no old address is added for updating, but there is an old address with stale compliance
activeAddressesWithStaleCompliance: 0,
inactiveAddressesWithStaleCompliance: 1,
},
mockProvider.provider,
);
Expand Down Expand Up @@ -644,6 +669,8 @@ describe('update-compliance-data', () => {
addressesScreened: 1,
upserted: 1,
statusUpserted: 1,
activeAddressesWithStaleCompliance: 0,
inactiveAddressesWithStaleCompliance: 1,
},
mockProvider.provider,
);
Expand Down Expand Up @@ -739,44 +766,60 @@ function expectGaugeStats(
addressesScreened,
upserted,
statusUpserted,
activeAddressesWithStaleCompliance,
inactiveAddressesWithStaleCompliance,
}: {
activeAddresses: number,
newAddresses: number,
oldAddresses: number,
addressesScreened: number,
upserted: number,
statusUpserted: number,
activeAddressesWithStaleCompliance: number,
inactiveAddressesWithStaleCompliance: number,
},
provider: string,
): void {
expect(stats.gauge).toHaveBeenCalledWith(
'roundtable.update_compliance_data.num_active_addresses',
activeAddresses,
undefined,
STATS_NO_SAMPLING,
{ provider },
);
expect(stats.gauge).toHaveBeenCalledWith(
'roundtable.update_compliance_data.num_new_addresses',
newAddresses,
undefined,
STATS_NO_SAMPLING,
{ provider },
);
expect(stats.gauge).toHaveBeenCalledWith(
'roundtable.update_compliance_data.num_active_addresses_with_stale_compliance',
activeAddressesWithStaleCompliance,
STATS_NO_SAMPLING,
{ provider },
);
expect(stats.gauge).toHaveBeenCalledWith(
'roundtable.update_compliance_data.num_old_addresses',
oldAddresses,
undefined,
STATS_NO_SAMPLING,
{ provider },
);
expect(stats.gauge).toHaveBeenCalledWith(
'roundtable.update_compliance_data.num_inactive_addresses_with_stale_compliance',
inactiveAddressesWithStaleCompliance,
STATS_NO_SAMPLING,
{ provider },
);
expect(stats.gauge).toHaveBeenCalledWith(
'roundtable.update_compliance_data.num_addresses_to_screen',
addressesScreened,
undefined,
STATS_NO_SAMPLING,
{ provider },
);
expect(stats.gauge).toHaveBeenCalledWith(
'roundtable.update_compliance_data.num_upserted',
upserted,
undefined,
STATS_NO_SAMPLING,
{ provider },
);
expect(stats.gauge).toHaveBeenCalledWith(
Expand All @@ -791,19 +834,19 @@ function expectTimingStats(
expect(stats.timing).toHaveBeenCalledWith(
'roundtable.update_compliance_data.get_active_addresses',
expect.any(Number),
undefined,
STATS_NO_SAMPLING,
{ provider },
);
expect(stats.timing).toHaveBeenCalledWith(
'roundtable.update_compliance_data.get_old_addresses',
expect.any(Number),
undefined,
STATS_NO_SAMPLING,
{ provider },
);
expect(stats.timing).toHaveBeenCalledWith(
'roundtable.update_compliance_data.query_compliance_data',
expect.any(Number),
undefined,
STATS_NO_SAMPLING,
{ provider },
);
}
3 changes: 2 additions & 1 deletion indexer/services/roundtable/src/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,8 @@ export const configSchema = {
MAX_COMPLIANCE_DATA_AGE_SECONDS: parseInteger({ default: 2_630_000 }), // 1 month
MAX_ACTIVE_COMPLIANCE_DATA_AGE_SECONDS: parseInteger({ default: 86_400 }), // 1 day
MAX_COMPLIANCE_DATA_QUERY_PER_LOOP: parseInteger({ default: 100 }),
COMPLIANCE_PROVIDER_QUERY_BATCH_SIZE: parseInteger({ default: 100 }),
// v2/wallet/synchronous rate limit is 15/s https://developers.elliptic.co/docs/configuration
COMPLIANCE_PROVIDER_QUERY_BATCH_SIZE: parseInteger({ default: 15 }),
COMPLIANCE_PROVIDER_QUERY_DELAY_MS: parseInteger({ default: ONE_SECOND_IN_MILLISECONDS }),
CLOSE_ONLY_TO_BLOCKED_DAYS: parseInteger({ default: 7 }),

Expand Down
67 changes: 44 additions & 23 deletions indexer/services/roundtable/src/tasks/update-compliance-data.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
import { delay, logger, stats } from '@dydxprotocol-indexer/base';
import {
STATS_NO_SAMPLING, delay, logger, stats,
} from '@dydxprotocol-indexer/base';
import { ComplianceClientResponse } from '@dydxprotocol-indexer/compliance';
import {
ComplianceDataColumns,
Expand Down Expand Up @@ -96,13 +98,10 @@ export default async function runTask(
}

// Add any address that has compliance data that's over the age threshold for active addresses
// and is not blocked
// and is not blocked. Count all such accounts.
let activeAddressesToQuery: number = 0;
let activeAddressesWithStaleCompliance: number = 0;
for (const addressCompliance of activeAddressCompliance) {
if (remainingQueries <= 0) {
break;
}

if (addressCompliance.blocked) {
continue;
}
Expand All @@ -111,27 +110,37 @@ export default async function runTask(
continue;
}

addressesToQuery.push(addressCompliance.address);
remainingQueries -= 1;
activeAddressesToQuery += 1;
activeAddressesWithStaleCompliance += 1;

if (remainingQueries > 0) {
addressesToQuery.push(addressCompliance.address);
remainingQueries -= 1;
activeAddressesToQuery += 1;
}
}

stats.timing(
`${config.SERVICE_NAME}.${taskName}.get_active_addresses`,
Date.now() - startActiveAddresses,
undefined,
STATS_NO_SAMPLING,
{ provider: complianceProvider.provider },
);
stats.gauge(
`${config.SERVICE_NAME}.${taskName}.num_active_addresses`,
activeAddressesToQuery,
undefined,
STATS_NO_SAMPLING,
{ provider: complianceProvider.provider },
);
stats.gauge(
`${config.SERVICE_NAME}.${taskName}.num_new_addresses`,
addressesWithoutCompliance.length,
undefined,
STATS_NO_SAMPLING,
{ provider: complianceProvider.provider },
);
stats.gauge(
`${config.SERVICE_NAME}.${taskName}.num_active_addresses_with_stale_compliance`,
activeAddressesWithStaleCompliance,
STATS_NO_SAMPLING,
{ provider: complianceProvider.provider },
);

Expand All @@ -142,27 +151,39 @@ export default async function runTask(
blocked: false,
provider: complianceProvider.provider,
updatedBeforeOrAt: ageThreshold,
limit: remainingQueries,
},
[],
{ readReplica: true },
);
addressesToQuery.push(...(
_.chain(oldAddressCompliance).map(ComplianceDataColumns.address).uniq().value()
));

const inactiveAddressesWithStaleCompliance = oldAddressCompliance.length;
const oldAddressesToAdd = _.chain(oldAddressCompliance)
.map(ComplianceDataColumns.address)
.uniq()
.take(remainingQueries)
.value();

addressesToQuery.push(...oldAddressesToAdd);

// Ensure all addresses to query are unique
addressesToQuery = _.sortedUniq(addressesToQuery);

stats.timing(
`${config.SERVICE_NAME}.${taskName}.get_old_addresses`,
Date.now() - startOldAddresses,
undefined,
STATS_NO_SAMPLING,
{ provider: complianceProvider.provider },
);
stats.gauge(
`${config.SERVICE_NAME}.${taskName}.num_old_addresses`,
oldAddressCompliance.length,
undefined,
oldAddressesToAdd.length,
STATS_NO_SAMPLING,
{ provider: complianceProvider.provider },
);
stats.gauge(
`${config.SERVICE_NAME}.${taskName}.num_inactive_addresses_with_stale_compliance`,
inactiveAddressesWithStaleCompliance,
STATS_NO_SAMPLING,
{ provider: complianceProvider.provider },
);

Expand Down Expand Up @@ -216,13 +237,13 @@ export default async function runTask(
stats.timing(
`${config.SERVICE_NAME}.${taskName}.query_compliance_data`,
Date.now() - startQueryProvider,
undefined,
STATS_NO_SAMPLING,
{ provider: complianceProvider.provider },
);
stats.gauge(
`${config.SERVICE_NAME}.${taskName}.num_addresses_to_screen`,
addressesToQuery.length,
undefined,
STATS_NO_SAMPLING,
{ provider: complianceProvider.provider },
);

Expand All @@ -235,13 +256,13 @@ export default async function runTask(
stats.timing(
`${config.SERVICE_NAME}.${taskName}.upsert_compliance_data`,
Date.now() - startUpsert,
undefined,
STATS_NO_SAMPLING,
{ provider: complianceProvider.provider },
);
stats.gauge(
`${config.SERVICE_NAME}.${taskName}.num_upserted`,
complianceCreateObjects.length,
undefined,
STATS_NO_SAMPLING,
{ provider: complianceProvider.provider },
);

Expand Down

0 comments on commit 372f645

Please sign in to comment.