From 1a7d4b7908851be573440ef092dbd73b09c0550a Mon Sep 17 00:00:00 2001 From: "mergify[bot]" <37929162+mergify[bot]@users.noreply.github.com> Date: Wed, 28 Aug 2024 17:08:17 -0400 Subject: [PATCH] Update compliance roundtable batch size and add stale compliance metrics (backport #2131) (#2164) Co-authored-by: jerryfan01234 <44346807+jerryfan01234@users.noreply.github.com> --- .../tasks/update-compliance-data.test.ts | 61 ++++++++++++++--- indexer/services/roundtable/src/config.ts | 3 +- .../src/tasks/update-compliance-data.ts | 67 ++++++++++++------- 3 files changed, 98 insertions(+), 33 deletions(-) diff --git a/indexer/services/roundtable/__tests__/tasks/update-compliance-data.test.ts b/indexer/services/roundtable/__tests__/tasks/update-compliance-data.test.ts index 1c151645ba..7f394941ac 100644 --- a/indexer/services/roundtable/__tests__/tasks/update-compliance-data.test.ts +++ b/indexer/services/roundtable/__tests__/tasks/update-compliance-data.test.ts @@ -16,7 +16,7 @@ import { ComplianceReason, } from '@dydxprotocol-indexer/postgres'; import updateComplianceDataTask from '../../src/tasks/update-compliance-data'; -import { logger, stats } from '@dydxprotocol-indexer/base'; +import { STATS_NO_SAMPLING, logger, stats } from '@dydxprotocol-indexer/base'; import _ from 'lodash'; import config from '../../src/config'; import { ClientAndProvider } from '../../src/helpers/compliance-clients'; @@ -73,6 +73,8 @@ describe('update-compliance-data', () => { addressesScreened: 0, upserted: 0, statusUpserted: 0, + activeAddressesWithStaleCompliance: 0, + inactiveAddressesWithStaleCompliance: 0, }, mockProvider.provider, ); @@ -95,6 +97,8 @@ describe('update-compliance-data', () => { addressesScreened: 0, upserted: 0, statusUpserted: 0, + activeAddressesWithStaleCompliance: 0, + inactiveAddressesWithStaleCompliance: 0, }, mockProvider.provider, ); @@ -122,6 +126,8 @@ describe('update-compliance-data', () => { addressesScreened: 0, upserted: 0, statusUpserted: 0, + activeAddressesWithStaleCompliance: 0, + inactiveAddressesWithStaleCompliance: 0, }, mockProvider.provider, ); @@ -152,6 +158,8 @@ describe('update-compliance-data', () => { addressesScreened: 0, upserted: 0, statusUpserted: 0, + activeAddressesWithStaleCompliance: 0, + inactiveAddressesWithStaleCompliance: 0, }, mockProvider.provider, ); @@ -197,6 +205,8 @@ describe('update-compliance-data', () => { addressesScreened: 1, upserted: 1, statusUpserted: 1, + activeAddressesWithStaleCompliance: 0, + inactiveAddressesWithStaleCompliance: 0, }, mockProvider.provider, ); @@ -243,6 +253,8 @@ describe('update-compliance-data', () => { addressesScreened: 1, upserted: 1, statusUpserted: 1, + activeAddressesWithStaleCompliance: 1, + inactiveAddressesWithStaleCompliance: 0, }, mockProvider.provider, ); @@ -279,6 +291,8 @@ describe('update-compliance-data', () => { addressesScreened: 0, upserted: 0, statusUpserted: 0, + activeAddressesWithStaleCompliance: 1, + inactiveAddressesWithStaleCompliance: 0, }, mockProvider.provider, ); @@ -324,6 +338,8 @@ describe('update-compliance-data', () => { addressesScreened: 1, upserted: 1, statusUpserted: 1, + activeAddressesWithStaleCompliance: 1, + inactiveAddressesWithStaleCompliance: 0, }, mockProvider.provider, ); @@ -375,6 +391,8 @@ describe('update-compliance-data', () => { addressesScreened: 1, upserted: 1, statusUpserted: 1, + activeAddressesWithStaleCompliance: 0, + inactiveAddressesWithStaleCompliance: 1, }, mockProvider.provider, ); @@ -456,6 +474,8 @@ describe('update-compliance-data', () => { addressesScreened: 2, upserted: 2, statusUpserted: 2, + activeAddressesWithStaleCompliance: 0, + inactiveAddressesWithStaleCompliance: 1, }, mockProvider.provider, ); @@ -546,6 +566,8 @@ describe('update-compliance-data', () => { addressesScreened: 3, upserted: 2, statusUpserted: 2, + activeAddressesWithStaleCompliance: 0, + inactiveAddressesWithStaleCompliance: 1, }, mockProvider.provider, ); @@ -615,6 +637,9 @@ describe('update-compliance-data', () => { addressesScreened: 1, upserted: 1, statusUpserted: 1, + // no old address is added for updating, but there is an old address with stale compliance + activeAddressesWithStaleCompliance: 0, + inactiveAddressesWithStaleCompliance: 1, }, mockProvider.provider, ); @@ -644,6 +669,8 @@ describe('update-compliance-data', () => { addressesScreened: 1, upserted: 1, statusUpserted: 1, + activeAddressesWithStaleCompliance: 0, + inactiveAddressesWithStaleCompliance: 1, }, mockProvider.provider, ); @@ -739,6 +766,8 @@ function expectGaugeStats( addressesScreened, upserted, statusUpserted, + activeAddressesWithStaleCompliance, + inactiveAddressesWithStaleCompliance, }: { activeAddresses: number, newAddresses: number, @@ -746,37 +775,51 @@ function expectGaugeStats( addressesScreened: number, upserted: number, statusUpserted: number, + activeAddressesWithStaleCompliance: number, + inactiveAddressesWithStaleCompliance: number, }, provider: string, ): void { expect(stats.gauge).toHaveBeenCalledWith( 'roundtable.update_compliance_data.num_active_addresses', activeAddresses, - undefined, + STATS_NO_SAMPLING, { provider }, ); expect(stats.gauge).toHaveBeenCalledWith( 'roundtable.update_compliance_data.num_new_addresses', newAddresses, - undefined, + STATS_NO_SAMPLING, + { provider }, + ); + expect(stats.gauge).toHaveBeenCalledWith( + 'roundtable.update_compliance_data.num_active_addresses_with_stale_compliance', + activeAddressesWithStaleCompliance, + STATS_NO_SAMPLING, { provider }, ); expect(stats.gauge).toHaveBeenCalledWith( 'roundtable.update_compliance_data.num_old_addresses', oldAddresses, - undefined, + STATS_NO_SAMPLING, + { provider }, + ); + expect(stats.gauge).toHaveBeenCalledWith( + 'roundtable.update_compliance_data.num_inactive_addresses_with_stale_compliance', + inactiveAddressesWithStaleCompliance, + STATS_NO_SAMPLING, { provider }, ); expect(stats.gauge).toHaveBeenCalledWith( 'roundtable.update_compliance_data.num_addresses_to_screen', addressesScreened, - undefined, + STATS_NO_SAMPLING, { provider }, ); expect(stats.gauge).toHaveBeenCalledWith( 'roundtable.update_compliance_data.num_upserted', upserted, - undefined, + STATS_NO_SAMPLING, { provider }, ); expect(stats.gauge).toHaveBeenCalledWith( @@ -791,19 +834,19 @@ function expectTimingStats( expect(stats.timing).toHaveBeenCalledWith( 'roundtable.update_compliance_data.get_active_addresses', expect.any(Number), - undefined, + STATS_NO_SAMPLING, { provider }, ); expect(stats.timing).toHaveBeenCalledWith( 'roundtable.update_compliance_data.get_old_addresses', expect.any(Number), - undefined, + STATS_NO_SAMPLING, { provider }, ); expect(stats.timing).toHaveBeenCalledWith( 'roundtable.update_compliance_data.query_compliance_data', expect.any(Number), - undefined, + STATS_NO_SAMPLING, { provider }, ); } diff --git a/indexer/services/roundtable/src/config.ts b/indexer/services/roundtable/src/config.ts index 6dc659571d..6bf6334caf 100644 --- a/indexer/services/roundtable/src/config.ts +++ b/indexer/services/roundtable/src/config.ts @@ -156,7 +156,8 @@ export const configSchema = { MAX_COMPLIANCE_DATA_AGE_SECONDS: parseInteger({ default: 2_630_000 }), // 1 month MAX_ACTIVE_COMPLIANCE_DATA_AGE_SECONDS: parseInteger({ default: 86_400 }), // 1 day MAX_COMPLIANCE_DATA_QUERY_PER_LOOP: parseInteger({ default: 100 }), - COMPLIANCE_PROVIDER_QUERY_BATCH_SIZE: parseInteger({ default: 100 }), + // v2/wallet/synchronous rate limit is 15/s https://developers.elliptic.co/docs/configuration + COMPLIANCE_PROVIDER_QUERY_BATCH_SIZE: parseInteger({ default: 15 }), COMPLIANCE_PROVIDER_QUERY_DELAY_MS: parseInteger({ default: ONE_SECOND_IN_MILLISECONDS }), CLOSE_ONLY_TO_BLOCKED_DAYS: parseInteger({ default: 7 }), diff --git a/indexer/services/roundtable/src/tasks/update-compliance-data.ts b/indexer/services/roundtable/src/tasks/update-compliance-data.ts index 5ce54b8935..ba6be2e32b 100644 --- a/indexer/services/roundtable/src/tasks/update-compliance-data.ts +++ b/indexer/services/roundtable/src/tasks/update-compliance-data.ts @@ -1,4 +1,6 @@ -import { delay, logger, stats } from '@dydxprotocol-indexer/base'; +import { + STATS_NO_SAMPLING, delay, logger, stats, +} from '@dydxprotocol-indexer/base'; import { ComplianceClientResponse } from '@dydxprotocol-indexer/compliance'; import { ComplianceDataColumns, @@ -96,13 +98,10 @@ export default async function runTask( } // Add any address that has compliance data that's over the age threshold for active addresses - // and is not blocked + // and is not blocked. Count all such accounts. let activeAddressesToQuery: number = 0; + let activeAddressesWithStaleCompliance: number = 0; for (const addressCompliance of activeAddressCompliance) { - if (remainingQueries <= 0) { - break; - } - if (addressCompliance.blocked) { continue; } @@ -111,27 +110,37 @@ export default async function runTask( continue; } - addressesToQuery.push(addressCompliance.address); - remainingQueries -= 1; - activeAddressesToQuery += 1; + activeAddressesWithStaleCompliance += 1; + + if (remainingQueries > 0) { + addressesToQuery.push(addressCompliance.address); + remainingQueries -= 1; + activeAddressesToQuery += 1; + } } stats.timing( `${config.SERVICE_NAME}.${taskName}.get_active_addresses`, Date.now() - startActiveAddresses, - undefined, + STATS_NO_SAMPLING, { provider: complianceProvider.provider }, ); stats.gauge( `${config.SERVICE_NAME}.${taskName}.num_active_addresses`, activeAddressesToQuery, - undefined, + STATS_NO_SAMPLING, { provider: complianceProvider.provider }, ); stats.gauge( `${config.SERVICE_NAME}.${taskName}.num_new_addresses`, addressesWithoutCompliance.length, - undefined, + STATS_NO_SAMPLING, + { provider: complianceProvider.provider }, + ); + stats.gauge( + `${config.SERVICE_NAME}.${taskName}.num_active_addresses_with_stale_compliance`, + activeAddressesWithStaleCompliance, + STATS_NO_SAMPLING, { provider: complianceProvider.provider }, ); @@ -142,27 +151,39 @@ export default async function runTask( blocked: false, provider: complianceProvider.provider, updatedBeforeOrAt: ageThreshold, - limit: remainingQueries, }, [], { readReplica: true }, ); - addressesToQuery.push(...( - _.chain(oldAddressCompliance).map(ComplianceDataColumns.address).uniq().value() - )); + + const inactiveAddressesWithStaleCompliance = oldAddressCompliance.length; + const oldAddressesToAdd = _.chain(oldAddressCompliance) + .map(ComplianceDataColumns.address) + .uniq() + .take(remainingQueries) + .value(); + + addressesToQuery.push(...oldAddressesToAdd); + // Ensure all addresses to query are unique addressesToQuery = _.sortedUniq(addressesToQuery); stats.timing( `${config.SERVICE_NAME}.${taskName}.get_old_addresses`, Date.now() - startOldAddresses, - undefined, + STATS_NO_SAMPLING, { provider: complianceProvider.provider }, ); stats.gauge( `${config.SERVICE_NAME}.${taskName}.num_old_addresses`, - oldAddressCompliance.length, - undefined, + oldAddressesToAdd.length, + STATS_NO_SAMPLING, + { provider: complianceProvider.provider }, + ); + stats.gauge( + `${config.SERVICE_NAME}.${taskName}.num_inactive_addresses_with_stale_compliance`, + inactiveAddressesWithStaleCompliance, + STATS_NO_SAMPLING, { provider: complianceProvider.provider }, ); @@ -216,13 +237,13 @@ export default async function runTask( stats.timing( `${config.SERVICE_NAME}.${taskName}.query_compliance_data`, Date.now() - startQueryProvider, - undefined, + STATS_NO_SAMPLING, { provider: complianceProvider.provider }, ); stats.gauge( `${config.SERVICE_NAME}.${taskName}.num_addresses_to_screen`, addressesToQuery.length, - undefined, + STATS_NO_SAMPLING, { provider: complianceProvider.provider }, ); @@ -235,13 +256,13 @@ export default async function runTask( stats.timing( `${config.SERVICE_NAME}.${taskName}.upsert_compliance_data`, Date.now() - startUpsert, - undefined, + STATS_NO_SAMPLING, { provider: complianceProvider.provider }, ); stats.gauge( `${config.SERVICE_NAME}.${taskName}.num_upserted`, complianceCreateObjects.length, - undefined, + STATS_NO_SAMPLING, { provider: complianceProvider.provider }, );