From 5a02618c9b4a65279509febef3e24fcc5e17634d Mon Sep 17 00:00:00 2001 From: jerryfan01234 <44346807+jerryfan01234@users.noreply.github.com> Date: Fri, 20 Sep 2024 11:19:23 -0400 Subject: [PATCH] [OTE-757] add update affiliate info roundtable (#2233) --- .../postgres/__tests__/helpers/constants.ts | 2 +- .../stores/affiliate-info-table.test.ts | 257 +++++++++++++++- .../__tests__/stores/wallet-table.test.ts | 104 ++++--- ...410_change_fills_affiliaterevshare_type.ts | 16 + ...157_change_affiliate_info_default_value.ts | 17 ++ .../src/stores/affiliate-info-table.ts | 185 +++++++++++- .../postgres/src/stores/wallet-table.ts | 7 +- .../src/types/persistent-cache-types.ts | 5 + .../tasks/update-affiliate-info.test.ts | 277 ++++++++++++++++++ .../tasks/update-wallet-total-volume.test.ts | 42 +-- indexer/services/roundtable/src/config.ts | 4 + indexer/services/roundtable/src/index.ts | 9 +- .../src/tasks/update-affiliate-info.ts | 64 ++++ .../src/tasks/update-wallet-total-volume.ts | 31 +- 14 files changed, 919 insertions(+), 101 deletions(-) create mode 100644 indexer/packages/postgres/src/db/migrations/migration_files/20240910101410_change_fills_affiliaterevshare_type.ts create mode 100644 indexer/packages/postgres/src/db/migrations/migration_files/20240919142157_change_affiliate_info_default_value.ts create mode 100644 indexer/services/roundtable/__tests__/tasks/update-affiliate-info.test.ts create mode 100644 indexer/services/roundtable/src/tasks/update-affiliate-info.ts diff --git a/indexer/packages/postgres/__tests__/helpers/constants.ts b/indexer/packages/postgres/__tests__/helpers/constants.ts index 5d314e413a..56e2865227 100644 --- a/indexer/packages/postgres/__tests__/helpers/constants.ts +++ b/indexer/packages/postgres/__tests__/helpers/constants.ts @@ -565,7 +565,7 @@ export const defaultFill: FillCreateObject = { createdAtHeight: createdHeight, clientMetadata: '0', fee: '1.1', - affiliateRevShare: '1.1', + affiliateRevShare: '1.10', }; export const isolatedMarketFill: FillCreateObject = { diff --git a/indexer/packages/postgres/__tests__/stores/affiliate-info-table.test.ts b/indexer/packages/postgres/__tests__/stores/affiliate-info-table.test.ts index 206c634652..04790001c8 100644 --- a/indexer/packages/postgres/__tests__/stores/affiliate-info-table.test.ts +++ b/indexer/packages/postgres/__tests__/stores/affiliate-info-table.test.ts @@ -1,7 +1,26 @@ -import { AffiliateInfoFromDatabase } from '../../src/types'; +import { + AffiliateInfoFromDatabase, Liquidity, +} from '../../src/types'; import { clearData, migrate, teardown } from '../../src/helpers/db-helpers'; -import { defaultAffiliateInfo, defaultAffiliateInfo2 } from '../helpers/constants'; +import { + defaultOrder, + defaultWallet, + defaultFill, + defaultWallet2, + defaultAffiliateInfo, + defaultAffiliateInfo2, + defaultTendermintEventId, + defaultTendermintEventId2, + defaultTendermintEventId3, + defaultTendermintEventId4, + vaultAddress, +} from '../helpers/constants'; import * as AffiliateInfoTable from '../../src/stores/affiliate-info-table'; +import * as OrderTable from '../../src/stores/order-table'; +import * as AffiliateReferredUsersTable from '../../src/stores/affiliate-referred-users-table'; +import * as FillTable from '../../src/stores/fill-table'; +import { seedData } from '../helpers/mock-generators'; +import { DateTime } from 'luxon'; describe('Affiliate info store', () => { beforeAll(async () => { @@ -27,7 +46,7 @@ describe('Affiliate info store', () => { it('Can upsert affiliate info multiple times', async () => { await AffiliateInfoTable.upsert(defaultAffiliateInfo); - let info: AffiliateInfoFromDatabase = await AffiliateInfoTable.findById( + let info: AffiliateInfoFromDatabase | undefined = await AffiliateInfoTable.findById( defaultAffiliateInfo.address, ); expect(info).toEqual(expect.objectContaining(defaultAffiliateInfo)); @@ -56,16 +75,177 @@ describe('Affiliate info store', () => { ])); }); - it('Successfully finds an affiliate info', async () => { + it('Successfully finds affiliate info by Id', async () => { await AffiliateInfoTable.create(defaultAffiliateInfo); - const info: AffiliateInfoFromDatabase = await AffiliateInfoTable.findById( + const info: AffiliateInfoFromDatabase | undefined = await AffiliateInfoTable.findById( defaultAffiliateInfo.address, ); - expect(info).toEqual(expect.objectContaining(defaultAffiliateInfo)); }); + it('Returns undefined if affiliate info not found by Id', async () => { + await AffiliateInfoTable.create(defaultAffiliateInfo); + + const info: AffiliateInfoFromDatabase | undefined = await AffiliateInfoTable.findById( + 'non_existent_address', + ); + expect(info).toBeUndefined(); + }); + + describe('updateInfo', () => { + it('Successfully creates new affiliate info', async () => { + const referenceDt: DateTime = await populateFillsAndReferrals(); + + // Perform update + await AffiliateInfoTable.updateInfo( + referenceDt.minus({ minutes: 2 }).toISO(), + referenceDt.toISO(), + ); + + // Get affiliate info (wallet2 is affiliate) + const updatedInfo: AffiliateInfoFromDatabase | undefined = await AffiliateInfoTable.findById( + defaultWallet2.address, + ); + + const expectedAffiliateInfo: AffiliateInfoFromDatabase = { + address: defaultWallet2.address, + affiliateEarnings: '1000', + referredMakerTrades: 1, + referredTakerTrades: 1, + totalReferredFees: '2000', + totalReferredUsers: 1, + referredNetProtocolEarnings: '1000', + firstReferralBlockHeight: '1', + referredTotalVolume: '2', + }; + + expect(updatedInfo).toEqual(expectedAffiliateInfo); + }); + + it('Successfully updates/increments affiliate info for stats and new referrals', async () => { + const referenceDt: DateTime = await populateFillsAndReferrals(); + + // Perform update: catches first 2 fills + await AffiliateInfoTable.updateInfo( + referenceDt.minus({ minutes: 3 }).toISO(), + referenceDt.minus({ minutes: 2 }).toISO(), + ); + + const updatedInfo1: AffiliateInfoFromDatabase | undefined = await AffiliateInfoTable.findById( + defaultWallet2.address, + ); + const expectedAffiliateInfo1: AffiliateInfoFromDatabase = { + address: defaultWallet2.address, + affiliateEarnings: '1000', + referredMakerTrades: 2, + referredTakerTrades: 0, + totalReferredFees: '2000', + totalReferredUsers: 1, + referredNetProtocolEarnings: '1000', + firstReferralBlockHeight: '1', + referredTotalVolume: '2', + }; + expect(updatedInfo1).toEqual(expectedAffiliateInfo1); + + // Perform update: catches next 2 fills + await AffiliateInfoTable.updateInfo( + referenceDt.minus({ minutes: 2 }).toISO(), + referenceDt.minus({ minutes: 1 }).toISO(), + ); + + const updatedInfo2 = await AffiliateInfoTable.findById( + defaultWallet2.address, + ); + const expectedAffiliateInfo2 = { + address: defaultWallet2.address, + affiliateEarnings: '2000', + referredMakerTrades: 3, + referredTakerTrades: 1, + totalReferredFees: '4000', + totalReferredUsers: 1, + referredNetProtocolEarnings: '2000', + firstReferralBlockHeight: '1', + referredTotalVolume: '4', + }; + expect(updatedInfo2).toEqual(expectedAffiliateInfo2); + + // Perform update: catches no fills but new affiliate referral + await AffiliateReferredUsersTable.create({ + affiliateAddress: defaultWallet2.address, + refereeAddress: vaultAddress, + referredAtBlock: '2', + }); + await AffiliateInfoTable.updateInfo( + referenceDt.minus({ minutes: 1 }).toISO(), + referenceDt.toISO(), + ); + const updatedInfo3 = await AffiliateInfoTable.findById( + defaultWallet2.address, + ); + const expectedAffiliateInfo3 = { + address: defaultWallet2.address, + affiliateEarnings: '2000', + referredMakerTrades: 3, + referredTakerTrades: 1, + totalReferredFees: '4000', + totalReferredUsers: 2, + referredNetProtocolEarnings: '2000', + firstReferralBlockHeight: '1', + referredTotalVolume: '4', + }; + expect(updatedInfo3).toEqual(expectedAffiliateInfo3); + }); + + it('Does not use fills from before referal block height', async () => { + const referenceDt: DateTime = DateTime.utc(); + + await seedData(); + await OrderTable.create(defaultOrder); + + // Referal at block 2 but fill is at block 1 + await AffiliateReferredUsersTable.create({ + affiliateAddress: defaultWallet2.address, + refereeAddress: defaultWallet.address, + referredAtBlock: '2', + }); + await FillTable.create({ + ...defaultFill, + liquidity: Liquidity.TAKER, + subaccountId: defaultOrder.subaccountId, + createdAt: referenceDt.toISO(), + createdAtHeight: '1', + eventId: defaultTendermintEventId, + price: '1', + size: '1', + fee: '1000', + affiliateRevShare: '500', + }); + + await AffiliateInfoTable.updateInfo( + referenceDt.minus({ minutes: 1 }).toISO(), + referenceDt.toISO(), + ); + + const updatedInfo: AffiliateInfoFromDatabase | undefined = await AffiliateInfoTable.findById( + defaultWallet2.address, + ); + // expect one referred user but no fill stats + const expectedAffiliateInfo: AffiliateInfoFromDatabase = { + address: defaultWallet2.address, + affiliateEarnings: '0', + referredMakerTrades: 0, + referredTakerTrades: 0, + totalReferredFees: '0', + totalReferredUsers: 1, + referredNetProtocolEarnings: '0', + firstReferralBlockHeight: '2', + referredTotalVolume: '0', + }; + expect(updatedInfo).toEqual(expectedAffiliateInfo); + }); + }); + describe('paginatedFindWithAddressFilter', () => { beforeEach(async () => { await migrate(); @@ -175,3 +355,68 @@ describe('Affiliate info store', () => { }); }); }); + +async function populateFillsAndReferrals(): Promise { + const referenceDt = DateTime.utc(); + + await seedData(); + + // defaultWallet2 will be affiliate and defaultWallet will be referee + await AffiliateReferredUsersTable.create({ + affiliateAddress: defaultWallet2.address, + refereeAddress: defaultWallet.address, + referredAtBlock: '1', + }); + + // Create order and fils for defaultWallet (referee) + await OrderTable.create(defaultOrder); + + await Promise.all([ + FillTable.create({ + ...defaultFill, + liquidity: Liquidity.TAKER, + subaccountId: defaultOrder.subaccountId, + createdAt: referenceDt.minus({ minutes: 1 }).toISO(), + eventId: defaultTendermintEventId, + price: '1', + size: '1', + fee: '1000', + affiliateRevShare: '500', + }), + FillTable.create({ + ...defaultFill, + liquidity: Liquidity.MAKER, + subaccountId: defaultOrder.subaccountId, + createdAt: referenceDt.minus({ minutes: 1 }).toISO(), + eventId: defaultTendermintEventId2, + price: '1', + size: '1', + fee: '1000', + affiliateRevShare: '500', + }), + FillTable.create({ + ...defaultFill, + liquidity: Liquidity.MAKER, // use uneven number of maker/taker + subaccountId: defaultOrder.subaccountId, + createdAt: referenceDt.minus({ minutes: 2 }).toISO(), + eventId: defaultTendermintEventId3, + price: '1', + size: '1', + fee: '1000', + affiliateRevShare: '500', + }), + FillTable.create({ + ...defaultFill, + liquidity: Liquidity.MAKER, + subaccountId: defaultOrder.subaccountId, + createdAt: referenceDt.minus({ minutes: 2 }).toISO(), + eventId: defaultTendermintEventId4, + price: '1', + size: '1', + fee: '1000', + affiliateRevShare: '500', + }), + ]); + + return referenceDt; +} diff --git a/indexer/packages/postgres/__tests__/stores/wallet-table.test.ts b/indexer/packages/postgres/__tests__/stores/wallet-table.test.ts index cf4c28ae85..a6baa18ae5 100644 --- a/indexer/packages/postgres/__tests__/stores/wallet-table.test.ts +++ b/indexer/packages/postgres/__tests__/stores/wallet-table.test.ts @@ -1,4 +1,4 @@ -import { WalletFromDatabase } from '../../src/types'; +import { WalletFromDatabase, PersistentCacheKeys, PersistentCacheFromDatabase } from '../../src/types'; import { clearData, migrate, teardown } from '../../src/helpers/db-helpers'; import { DateTime } from 'luxon'; import { @@ -90,48 +90,68 @@ describe('Wallet store', () => { expect(wallet).toEqual(expect.objectContaining(defaultWallet2)); }); - it('Successfully updates totalVolume for time window multiple times', async () => { - const firstFillTime = await populateWalletSubaccountFill(); - - // Update totalVolume for a time window that covers all fills - await WalletTable.updateTotalVolume( - firstFillTime.minus({ hours: 1 }).toISO(), // need to minus because left bound is exclusive - firstFillTime.plus({ hours: 1 }).toISO(), - ); - let wallet = await WalletTable.findById(defaultWallet.address); - expect(wallet).toEqual(expect.objectContaining({ - ...defaultWallet, - totalVolume: '103', - })); - - // Update totalVolume for a time window that excludes some fills - // For convenience, we will reuse the existing fills data. The total volume calculated in this - // window should be added to the total volume above. - await WalletTable.updateTotalVolume( - firstFillTime.toISO(), // exclusive -> filters out first fill from each subaccount - firstFillTime.plus({ minutes: 2 }).toISO(), - ); - wallet = await WalletTable.findById(defaultWallet.address); - expect(wallet).toEqual(expect.objectContaining({ - ...defaultWallet, - totalVolume: '105', // 103 + 2 - })); - }); - - it('Successfully updates totalVolumeUpdateTime in persistent cache', async () => { - const leftBound = DateTime.utc().minus({ hours: 1 }); - const rightBound = DateTime.utc(); - await WalletTable.updateTotalVolume(leftBound.toISO(), rightBound.toISO()); - - const persistentCache = await PersistentCacheTable.findById('totalVolumeUpdateTime'); - const lastUpdateTime = persistentCache?.value - ? DateTime.fromISO(persistentCache.value) - : undefined; + describe('updateTotalVolume', () => { + it('Successfully updates totalVolume for time window multiple times', async () => { + const firstFillTime: DateTime = await populateWalletSubaccountFill(); + + // Update totalVolume for a time window that covers all fills + await WalletTable.updateTotalVolume( + firstFillTime.minus({ hours: 1 }).toISO(), // need to minus because left bound is exclusive + firstFillTime.plus({ hours: 1 }).toISO(), + ); + let wallet: WalletFromDatabase | undefined = await WalletTable + .findById(defaultWallet.address); + expect(wallet).toEqual(expect.objectContaining({ + ...defaultWallet, + totalVolume: '103', + })); + + // Update totalVolume for a time window that excludes some fills + // For convenience, we will reuse the existing fills data. The total volume calculated in this + // window should be added to the total volume above. + await WalletTable.updateTotalVolume( + firstFillTime.toISO(), // exclusive -> filters out first fill from each subaccount + firstFillTime.plus({ minutes: 2 }).toISO(), + ); + wallet = await WalletTable.findById(defaultWallet.address); + expect(wallet).toEqual(expect.objectContaining({ + ...defaultWallet, + totalVolume: '105', // 103 + 2 + })); + }); - expect(lastUpdateTime).not.toBeUndefined(); - if (lastUpdateTime?.toMillis() !== undefined) { - expect(lastUpdateTime.toMillis()).toEqual(rightBound.toMillis()); - } + it('Successfully upserts persistent cache', async () => { + const referenceDt = DateTime.utc(); + + // Sets initial persistent cache value + let leftBound: DateTime = referenceDt.minus({ hours: 2 }); + let rightBound: DateTime = referenceDt.minus({ hours: 1 }); + + await WalletTable.updateTotalVolume(leftBound.toISO(), rightBound.toISO()); + + let persistentCache: PersistentCacheFromDatabase | undefined = await PersistentCacheTable + .findById(PersistentCacheKeys.TOTAL_VOLUME_UPDATE_TIME); + let lastUpdateTime: string | undefined = persistentCache?.value; + expect(lastUpdateTime).not.toBeUndefined(); + if (lastUpdateTime !== undefined) { + expect(lastUpdateTime).toEqual(rightBound.toISO()); + } + + // Updates persistent cache value + leftBound = referenceDt.minus({ hours: 1 }); + rightBound = referenceDt; + + await WalletTable.updateTotalVolume(leftBound.toISO(), rightBound.toISO()); + + persistentCache = await PersistentCacheTable.findById( + PersistentCacheKeys.TOTAL_VOLUME_UPDATE_TIME, + ); + lastUpdateTime = persistentCache?.value; + expect(lastUpdateTime).not.toBeUndefined(); + if (lastUpdateTime !== undefined) { + expect(lastUpdateTime).toEqual(rightBound.toISO()); + } + }); }); }); diff --git a/indexer/packages/postgres/src/db/migrations/migration_files/20240910101410_change_fills_affiliaterevshare_type.ts b/indexer/packages/postgres/src/db/migrations/migration_files/20240910101410_change_fills_affiliaterevshare_type.ts new file mode 100644 index 0000000000..dd61af5d3b --- /dev/null +++ b/indexer/packages/postgres/src/db/migrations/migration_files/20240910101410_change_fills_affiliaterevshare_type.ts @@ -0,0 +1,16 @@ +import * as Knex from 'knex'; + +// No data has been stored added at time of commit +export async function up(knex: Knex): Promise { + return knex.schema.alterTable('fills', (table) => { + // decimal('columnName') has is 8,2 precision and scale + // decimal('columnName', null) has variable precision and scale + table.decimal('affiliateRevShare', null).notNullable().defaultTo(0).alter(); + }); +} + +export async function down(knex: Knex): Promise { + return knex.schema.alterTable('fills', (table) => { + table.string('affiliateRevShare').notNullable().defaultTo('0').alter(); + }); +} diff --git a/indexer/packages/postgres/src/db/migrations/migration_files/20240919142157_change_affiliate_info_default_value.ts b/indexer/packages/postgres/src/db/migrations/migration_files/20240919142157_change_affiliate_info_default_value.ts new file mode 100644 index 0000000000..9b19769be7 --- /dev/null +++ b/indexer/packages/postgres/src/db/migrations/migration_files/20240919142157_change_affiliate_info_default_value.ts @@ -0,0 +1,17 @@ +import * as Knex from 'knex'; + +export async function up(knex: Knex): Promise { + return knex.schema.alterTable('affiliate_info', (table) => { + table.decimal('affiliateEarnings', null).notNullable().defaultTo(0).alter(); + table.decimal('totalReferredFees', null).notNullable().defaultTo(0).alter(); + table.decimal('referredNetProtocolEarnings', null).notNullable().defaultTo(0).alter(); + }); +} + +export async function down(knex: Knex): Promise { + return knex.schema.alterTable('affiliate_info', (table) => { + table.decimal('affiliateEarnings', null).alter(); + table.decimal('totalReferredFees', null).alter(); + table.decimal('referredNetProtocolEarnings', null).alter(); + }); +} diff --git a/indexer/packages/postgres/src/stores/affiliate-info-table.ts b/indexer/packages/postgres/src/stores/affiliate-info-table.ts index 8a27b8c895..b18568d1eb 100644 --- a/indexer/packages/postgres/src/stores/affiliate-info-table.ts +++ b/indexer/packages/postgres/src/stores/affiliate-info-table.ts @@ -1,6 +1,8 @@ +import Knex from 'knex'; import { QueryBuilder } from 'objection'; import { DEFAULT_POSTGRES_OPTIONS } from '../constants'; +import { knexPrimary } from '../helpers/knex'; import { setupBaseQuery, verifyAllRequiredFields } from '../helpers/stores-helpers'; import Transaction from '../helpers/transaction'; import AffiliateInfoModel from '../models/affiliate-info-model'; @@ -13,6 +15,7 @@ import { AffiliateInfoCreateObject, AffiliateInfoFromDatabase, AffiliateInfoQueryConfig, + Liquidity, } from '../types'; export async function findAll( @@ -84,7 +87,7 @@ export async function upsert( export async function findById( address: string, options: Options = DEFAULT_POSTGRES_OPTIONS, -): Promise { +): Promise { const baseQuery: QueryBuilder = setupBaseQuery( AffiliateInfoModel, options, @@ -94,6 +97,186 @@ export async function findById( .returning('*'); } +/** + * Updates affiliate information in the database based on the provided time window. + * + * This function aggregates affiliate-related metadata and fill statistics + * from various tables. Then it upserts the aggregated data into the `affiliate_info` table. + * + * @async + * @function updateInfo + * @param {string} windowStartTs - The exclusive start timestamp for filtering fills. + * @param {string} windowEndTs - The inclusive end timestamp for filtering fill. + * @param {Options} [options={ txId: undefined }] - Optional transaction ID or additional options. + * @returns {Promise} + */ +export async function updateInfo( + windowStartTs: string, // exclusive + windowEndTs: string, // inclusive + txId: number | undefined = undefined, +) : Promise { + const transaction: Knex.Transaction | undefined = Transaction.get(txId); + + const query = ` +-- Get metadata for all affiliates +-- STEP 1: Aggregate affiliate_referred_users +WITH affiliate_metadata AS ( + SELECT + "affiliateAddress", + COUNT(*) AS "totalReferredUsers", + MIN("referredAtBlock") AS "firstReferralBlockHeight" + FROM + affiliate_referred_users + GROUP BY + "affiliateAddress" +), + +-- Calculate fill related stats for affiliates +-- Step 2a: Inner join affiliate_referred_users with subaccounts to get subaccounts referred by the affiliate +affiliate_referred_subaccounts AS ( + SELECT + affiliate_referred_users."affiliateAddress", + affiliate_referred_users."referredAtBlock", + subaccounts."id" + FROM + affiliate_referred_users + INNER JOIN + subaccounts + ON + affiliate_referred_users."refereeAddress" = subaccounts."address" +), + +-- Step 2b: Filter fills by time window +filtered_fills AS ( + SELECT + fills."subaccountId", + fills."liquidity", + fills."createdAt", + CAST(fills."fee" AS decimal) AS "fee", + fills."affiliateRevShare", + fills."createdAtHeight", + fills."price", + fills."size" + FROM + fills + WHERE + fills."createdAt" > '${windowStartTs}' + AND fills."createdAt" <= '${windowEndTs}' +), + +-- Step 2c: Inner join filtered_fills with affiliate_referred_subaccounts and filter +affiliate_fills AS ( + SELECT + filtered_fills."subaccountId", + filtered_fills."liquidity", + filtered_fills."createdAt", + filtered_fills."fee", + filtered_fills."affiliateRevShare", + filtered_fills."price", + filtered_fills."size", + affiliate_referred_subaccounts."affiliateAddress", + affiliate_referred_subaccounts."referredAtBlock" + FROM + filtered_fills + INNER JOIN + affiliate_referred_subaccounts + ON + filtered_fills."subaccountId" = affiliate_referred_subaccounts."id" + WHERE + filtered_fills."createdAtHeight" >= affiliate_referred_subaccounts."referredAtBlock" +), + +-- Step 2d: Groupby to get affiliate level stats +affiliate_stats AS ( + SELECT + affiliate_fills."affiliateAddress", + SUM(affiliate_fills."fee") AS "totalReferredFees", + SUM(affiliate_fills."affiliateRevShare") AS "affiliateEarnings", + SUM(affiliate_fills."fee") - SUM(affiliate_fills."affiliateRevShare") AS "referredNetProtocolEarnings", + COUNT(CASE WHEN affiliate_fills."liquidity" = '${Liquidity.MAKER}' THEN 1 END) AS "referredMakerTrades", + COUNT(CASE WHEN affiliate_fills."liquidity" = '${Liquidity.TAKER}' THEN 1 END) AS "referredTakerTrades", + SUM(affiliate_fills."price" * affiliate_fills."size") AS "referredTotalVolume" + FROM + affiliate_fills + GROUP BY + affiliate_fills."affiliateAddress" +), + +-- Prepare to update affiliate_info +-- STEP 3a: Left join affiliate_stats onto affiliate_metadata. affiliate_stats only has values for +-- addresses with fills in the time window +affiliate_info_update AS ( + SELECT + affiliate_metadata."affiliateAddress", + affiliate_metadata."totalReferredUsers", + affiliate_metadata."firstReferralBlockHeight", + COALESCE(affiliate_stats."totalReferredFees", 0) AS "totalReferredFees", + COALESCE(affiliate_stats."affiliateEarnings", 0) AS "affiliateEarnings", + COALESCE(affiliate_stats."referredNetProtocolEarnings", 0) AS "referredNetProtocolEarnings", + COALESCE(affiliate_stats."referredMakerTrades", 0) AS "referredMakerTrades", + COALESCE(affiliate_stats."referredTakerTrades", 0) AS "referredTakerTrades", + COALESCE(affiliate_stats."referredTotalVolume", 0) AS "referredTotalVolume" + FROM + affiliate_metadata + LEFT JOIN + affiliate_stats + ON affiliate_metadata."affiliateAddress" = affiliate_stats."affiliateAddress" +) + +-- Step 3b: Update/upsert the affiliate info table with the new stats +INSERT INTO affiliate_info ( + "address", + "totalReferredUsers", + "firstReferralBlockHeight", + "affiliateEarnings", + "referredMakerTrades", + "referredTakerTrades", + "totalReferredFees", + "referredNetProtocolEarnings", + "referredTotalVolume" +) +SELECT + "affiliateAddress", + "totalReferredUsers", + "firstReferralBlockHeight", + "affiliateEarnings", + "referredMakerTrades", + "referredTakerTrades", + "totalReferredFees", + "referredNetProtocolEarnings", + "referredTotalVolume" +FROM + affiliate_info_update +ON CONFLICT ("address") +DO UPDATE SET + "totalReferredUsers" = EXCLUDED."totalReferredUsers", + "firstReferralBlockHeight" = EXCLUDED."firstReferralBlockHeight", + "affiliateEarnings" = affiliate_info."affiliateEarnings" + EXCLUDED."affiliateEarnings", + "referredMakerTrades" = affiliate_info."referredMakerTrades" + EXCLUDED."referredMakerTrades", + "referredTakerTrades" = affiliate_info."referredTakerTrades" + EXCLUDED."referredTakerTrades", + "totalReferredFees" = affiliate_info."totalReferredFees" + EXCLUDED."totalReferredFees", + "referredNetProtocolEarnings" = affiliate_info."referredNetProtocolEarnings" + EXCLUDED."referredNetProtocolEarnings", + "referredTotalVolume" = affiliate_info."referredTotalVolume" + EXCLUDED."referredTotalVolume"; + `; + + return transaction + ? knexPrimary.raw(query).transacting(transaction) + : knexPrimary.raw(query); +} + +/** + * Finds affiliate information from the database with optional address filtering, sorting, + * and offset based pagination. + * + * @async + * @function paginatedFindWithAddressFilter + * @param {string[]} addressFilter - An array of affiliate addresses to filter by. + * @param {number} offset - The offset for pagination. + * @param {number} limit - The maximum number of records to return. + * @param {boolean} sortByAffiliateEarning - Sort the results by affiliate earnings in desc order. + * @param {Options} [options=DEFAULT_POSTGRES_OPTIONS] - Optional config for database interaction. + * @returns {Promise} + */ export async function paginatedFindWithAddressFilter( addressFilter: string[], offset: number, diff --git a/indexer/packages/postgres/src/stores/wallet-table.ts b/indexer/packages/postgres/src/stores/wallet-table.ts index 8bd410ef1f..8b459a2840 100644 --- a/indexer/packages/postgres/src/stores/wallet-table.ts +++ b/indexer/packages/postgres/src/stores/wallet-table.ts @@ -1,7 +1,7 @@ import { PartialModelObject, QueryBuilder } from 'objection'; import { DEFAULT_POSTGRES_OPTIONS } from '../constants'; -import { knexReadReplica } from '../helpers/knex'; +import { knexPrimary } from '../helpers/knex'; import { setupBaseQuery, verifyAllRequiredFields } from '../helpers/stores-helpers'; import Transaction from '../helpers/transaction'; import WalletModel from '../models/wallet-model'; @@ -15,6 +15,7 @@ import { WalletFromDatabase, WalletQueryConfig, WalletUpdateObject, + PersistentCacheKeys, } from '../types'; export async function findAll( @@ -123,7 +124,7 @@ export async function updateTotalVolume( windowEndTs: string, ) : Promise { - await knexReadReplica.getConnection().raw( + await knexPrimary.raw( ` BEGIN; @@ -155,7 +156,7 @@ export async function updateTotalVolume( -- Step 5: Upsert new totalVolumeUpdateTime to persistent_cache table INSERT INTO persistent_cache (key, value) - VALUES ('totalVolumeUpdateTime', '${windowEndTs}') + VALUES ('${PersistentCacheKeys.TOTAL_VOLUME_UPDATE_TIME}', '${windowEndTs}') ON CONFLICT (key) DO UPDATE SET value = EXCLUDED.value; diff --git a/indexer/packages/postgres/src/types/persistent-cache-types.ts b/indexer/packages/postgres/src/types/persistent-cache-types.ts index 6022e6764d..0ae79ae7e1 100644 --- a/indexer/packages/postgres/src/types/persistent-cache-types.ts +++ b/indexer/packages/postgres/src/types/persistent-cache-types.ts @@ -7,3 +7,8 @@ export enum PersistentCacheColumns { key = 'key', value = 'value', } + +export enum PersistentCacheKeys { + TOTAL_VOLUME_UPDATE_TIME = 'totalVolumeUpdateTime', + AFFILIATE_INFO_UPDATE_TIME = 'affiliateInfoUpdateTime', +} diff --git a/indexer/services/roundtable/__tests__/tasks/update-affiliate-info.test.ts b/indexer/services/roundtable/__tests__/tasks/update-affiliate-info.test.ts new file mode 100644 index 0000000000..6f3884ab41 --- /dev/null +++ b/indexer/services/roundtable/__tests__/tasks/update-affiliate-info.test.ts @@ -0,0 +1,277 @@ +import { + dbHelpers, + testConstants, + testMocks, + PersistentCacheTable, + FillTable, + OrderTable, + PersistentCacheKeys, + AffiliateReferredUsersTable, + AffiliateInfoFromDatabase, + AffiliateInfoTable, + Liquidity, + PersistentCacheFromDatabase, + BlockTable, +} from '@dydxprotocol-indexer/postgres'; +import affiliateInfoUpdateTask from '../../src/tasks/update-affiliate-info'; +import { DateTime } from 'luxon'; + +describe('update-affiliate-info', () => { + beforeAll(async () => { + await dbHelpers.migrate(); + await dbHelpers.clearData(); + }); + + beforeEach(async () => { + await testMocks.seedData(); + await OrderTable.create(testConstants.defaultOrder); + }); + + afterAll(async () => { + await dbHelpers.teardown(); + jest.resetAllMocks(); + }); + + afterEach(async () => { + await dbHelpers.clearData(); + }); + + it('Successfully updates affiliate info and persistent cache multiple times', async () => { + // Set persistent cache affiliateInfoUpdateTime to slightly in past so task does not backfill + await PersistentCacheTable.create({ + key: PersistentCacheKeys.AFFILIATE_INFO_UPDATE_TIME, + value: DateTime.utc().toISO(), + }); + + const updatedDt1: DateTime = DateTime.utc(); + await Promise.all([ + // First task run: add referral w/o any fills + // defaultWallet2 will be affiliate and defaultWallet will be referee + AffiliateReferredUsersTable.create({ + affiliateAddress: testConstants.defaultWallet2.address, + refereeAddress: testConstants.defaultWallet.address, + referredAtBlock: '1', + }), + + // Create block to simulate time passing + BlockTable.create({ + blockHeight: '3', + time: updatedDt1.toISO(), + }), + ]); + + // Run task + await affiliateInfoUpdateTask(); + + const updatedInfo1: AffiliateInfoFromDatabase | undefined = await AffiliateInfoTable.findById( + testConstants.defaultWallet2.address, + ); + const expectedAffiliateInfo1: AffiliateInfoFromDatabase = { + address: testConstants.defaultWallet2.address, + affiliateEarnings: '0', + referredMakerTrades: 0, + referredTakerTrades: 0, + totalReferredFees: '0', + totalReferredUsers: 1, + referredNetProtocolEarnings: '0', + firstReferralBlockHeight: '1', + referredTotalVolume: '0', + }; + expect(updatedInfo1).toEqual(expectedAffiliateInfo1); + + // Check that persistent cache updated + const lastUpdateTime1: DateTime | undefined = await getAffiliateInfoUpdateTime(); + if (lastUpdateTime1 !== undefined) { + expect(lastUpdateTime1.toMillis()).toEqual(updatedDt1.toMillis()); + } + + // Second task run: one new fill and one new referral + await Promise.all([ + FillTable.create({ + ...testConstants.defaultFill, + liquidity: Liquidity.TAKER, + createdAt: DateTime.utc().toISO(), + eventId: testConstants.defaultTendermintEventId, + price: '1', + size: '1', + fee: '1000', + affiliateRevShare: '500', + }), + AffiliateReferredUsersTable.create({ + affiliateAddress: testConstants.defaultWallet2.address, + refereeAddress: testConstants.defaultWallet3.address, + referredAtBlock: '2', + }), + ]); + + const updatedDt2: DateTime = DateTime.utc(); + await BlockTable.create({ + blockHeight: '4', + time: updatedDt2.toISO(), + }); + + await affiliateInfoUpdateTask(); + + const updatedInfo2: AffiliateInfoFromDatabase | undefined = await AffiliateInfoTable.findById( + testConstants.defaultWallet2.address, + ); + const expectedAffiliateInfo2: AffiliateInfoFromDatabase = { + address: testConstants.defaultWallet2.address, + affiliateEarnings: '500', + referredMakerTrades: 0, + referredTakerTrades: 1, + totalReferredFees: '1000', + totalReferredUsers: 2, + referredNetProtocolEarnings: '500', + firstReferralBlockHeight: '1', + referredTotalVolume: '1', + }; + expect(updatedInfo2).toEqual(expectedAffiliateInfo2); + const lastUpdateTime2: DateTime | undefined = await getAffiliateInfoUpdateTime(); + if (lastUpdateTime2 !== undefined) { + expect(lastUpdateTime2.toMillis()).toEqual(updatedDt2.toMillis()); + } + }); + + it('Successfully backfills from past date', async () => { + const currentDt: DateTime = DateTime.utc(); + + await Promise.all([ + // Set persistent cache to 3 weeks ago to emulate backfill from 3 weeks. + PersistentCacheTable.create({ + key: PersistentCacheKeys.AFFILIATE_INFO_UPDATE_TIME, + value: currentDt.minus({ weeks: 3 }).toISO(), + }), + // defaultWallet2 will be affiliate and defaultWallet will be referee + AffiliateReferredUsersTable.create({ + affiliateAddress: testConstants.defaultWallet2.address, + refereeAddress: testConstants.defaultWallet.address, + referredAtBlock: '1', + }), + // Fills spannings 2 weeks + FillTable.create({ + ...testConstants.defaultFill, + liquidity: Liquidity.TAKER, + createdAt: currentDt.minus({ weeks: 1 }).toISO(), + eventId: testConstants.defaultTendermintEventId, + price: '1', + size: '1', + fee: '1000', + affiliateRevShare: '500', + }), + FillTable.create({ + ...testConstants.defaultFill, + liquidity: Liquidity.TAKER, + createdAt: currentDt.minus({ weeks: 2 }).toISO(), + eventId: testConstants.defaultTendermintEventId2, + price: '1', + size: '1', + fee: '1000', + affiliateRevShare: '500', + }), + // Create block at current time + BlockTable.create({ + blockHeight: '3', + time: DateTime.utc().toISO(), + }), + ]); + + // Simulate backfill + let backfillTime: DateTime | undefined = await getAffiliateInfoUpdateTime(); + while (backfillTime !== undefined && DateTime.fromISO(backfillTime.toISO()) < currentDt) { + await affiliateInfoUpdateTask(); + backfillTime = await getAffiliateInfoUpdateTime(); + } + + const expectedAffiliateInfo: AffiliateInfoFromDatabase = { + address: testConstants.defaultWallet2.address, + affiliateEarnings: '1000', + referredMakerTrades: 0, + referredTakerTrades: 2, + totalReferredFees: '2000', + totalReferredUsers: 1, + referredNetProtocolEarnings: '1000', + firstReferralBlockHeight: '1', + referredTotalVolume: '2', + }; + const updatedInfo: AffiliateInfoFromDatabase | undefined = await AffiliateInfoTable + .findById(testConstants.defaultWallet2.address); + expect(updatedInfo).toEqual(expectedAffiliateInfo); + }); + + it('Successfully backfills on first run', async () => { + // We will simulate a 1 week backfill from the beginning time of + // `defaultLastUpdateTime`=2024-09-16T00:00:00Z. We do this by leaving persistent cache + // affiliateInfoUpdateTime empty and create fills around `defaultLastUpdateTime`. Then we run + // the backfill 7 times. + expect(await getAffiliateInfoUpdateTime()).toBeUndefined(); + + const referenceDt: DateTime = DateTime.fromISO('2024-09-16T00:00:00Z'); + + // defaultWallet2 will be affiliate and defaultWallet will be referee + await AffiliateReferredUsersTable.create({ + affiliateAddress: testConstants.defaultWallet2.address, + refereeAddress: testConstants.defaultWallet.address, + referredAtBlock: '1', + }); + + await Promise.all([ + // Fills spannings 7 days after referenceDt + FillTable.create({ + ...testConstants.defaultFill, + liquidity: Liquidity.TAKER, + createdAt: referenceDt.plus({ days: 1 }).toISO(), + eventId: testConstants.defaultTendermintEventId, + price: '1', + size: '1', + fee: '1000', + affiliateRevShare: '500', + }), + FillTable.create({ + ...testConstants.defaultFill, + liquidity: Liquidity.TAKER, + createdAt: referenceDt.plus({ days: 7 }).toISO(), + eventId: testConstants.defaultTendermintEventId2, + price: '1', + size: '1', + fee: '1000', + affiliateRevShare: '500', + }), + // Create block in the future relative to referenceDt + BlockTable.create({ + blockHeight: '3', + time: referenceDt.plus({ days: 7 }).toISO(), + }), + ]); + + // Simulate roundtable runs + for (let i = 0; i < 7; i++) { + await affiliateInfoUpdateTask(); + } + + const expectedAffiliateInfo: AffiliateInfoFromDatabase = { + address: testConstants.defaultWallet2.address, + affiliateEarnings: '1000', + referredMakerTrades: 0, + referredTakerTrades: 2, + totalReferredFees: '2000', + totalReferredUsers: 1, + referredNetProtocolEarnings: '1000', + firstReferralBlockHeight: '1', + referredTotalVolume: '2', + }; + const updatedInfo = await AffiliateInfoTable.findById(testConstants.defaultWallet2.address); + expect(updatedInfo).toEqual(expectedAffiliateInfo); + }); +}); + +async function getAffiliateInfoUpdateTime(): Promise { + const persistentCache: PersistentCacheFromDatabase | undefined = await PersistentCacheTable + .findById( + PersistentCacheKeys.AFFILIATE_INFO_UPDATE_TIME, + ); + const lastUpdateTime: DateTime | undefined = persistentCache?.value + ? DateTime.fromISO(persistentCache.value) + : undefined; + return lastUpdateTime; +} diff --git a/indexer/services/roundtable/__tests__/tasks/update-wallet-total-volume.test.ts b/indexer/services/roundtable/__tests__/tasks/update-wallet-total-volume.test.ts index edb61608c0..a0a26fdad9 100644 --- a/indexer/services/roundtable/__tests__/tasks/update-wallet-total-volume.test.ts +++ b/indexer/services/roundtable/__tests__/tasks/update-wallet-total-volume.test.ts @@ -3,10 +3,11 @@ import { testConstants, testMocks, WalletTable, - SubaccountTable, PersistentCacheTable, FillTable, OrderTable, + PersistentCacheKeys, + PersistentCacheFromDatabase, } from '@dydxprotocol-indexer/postgres'; import walletTotalVolumeUpdateTask from '../../src/tasks/update-wallet-total-volume'; import { DateTime } from 'luxon'; @@ -33,22 +34,16 @@ describe('update-wallet-total-volume', () => { }); it('Successfully updates totalVolume multiple times', async () => { - const defaultSubaccountId = await SubaccountTable.findAll( - { subaccountNumber: testConstants.defaultSubaccount.subaccountNumber }, - [], - {}, - ); // Set persistent cache totalVolumeUpdateTime so walletTotalVolumeUpdateTask() does not attempt // to backfill await PersistentCacheTable.create({ - key: 'totalVolumeUpdateTime', + key: PersistentCacheKeys.TOTAL_VOLUME_UPDATE_TIME, value: DateTime.utc().toISO(), }); // First task run: one new fill await FillTable.create({ ...testConstants.defaultFill, - subaccountId: defaultSubaccountId[0].id, createdAt: DateTime.utc().toISO(), eventId: testConstants.defaultTendermintEventId, price: '1', @@ -72,7 +67,6 @@ describe('update-wallet-total-volume', () => { // Third task run: one new fill await FillTable.create({ ...testConstants.defaultFill, - subaccountId: defaultSubaccountId[0].id, createdAt: DateTime.utc().toISO(), eventId: testConstants.defaultTendermintEventId2, price: '1', @@ -90,7 +84,7 @@ describe('update-wallet-total-volume', () => { // Set persistent cache totalVolumeUpdateTime so walletTotalVolumeUpdateTask() does not attempt // to backfill await PersistentCacheTable.create({ - key: 'totalVolumeUpdateTime', + key: PersistentCacheKeys.TOTAL_VOLUME_UPDATE_TIME, value: DateTime.utc().toISO(), }); @@ -110,16 +104,10 @@ describe('update-wallet-total-volume', () => { it('Successfully backfills from past date', async () => { const currentDt: DateTime = DateTime.utc(); - const defaultSubaccountId = await SubaccountTable.findAll( - { subaccountNumber: testConstants.defaultSubaccount.subaccountNumber }, - [], - {}, - ); // Create 3 fills spanning 2 weeks in the past await FillTable.create({ ...testConstants.defaultFill, - subaccountId: defaultSubaccountId[0].id, createdAt: currentDt.toISO(), eventId: testConstants.defaultTendermintEventId, price: '1', @@ -127,7 +115,6 @@ describe('update-wallet-total-volume', () => { }); await FillTable.create({ ...testConstants.defaultFill, - subaccountId: defaultSubaccountId[0].id, createdAt: currentDt.minus({ weeks: 1 }).toISO(), eventId: testConstants.defaultTendermintEventId2, price: '2', @@ -135,7 +122,6 @@ describe('update-wallet-total-volume', () => { }); await FillTable.create({ ...testConstants.defaultFill, - subaccountId: defaultSubaccountId[0].id, createdAt: currentDt.minus({ weeks: 2 }).toISO(), eventId: testConstants.defaultTendermintEventId3, price: '3', @@ -144,7 +130,7 @@ describe('update-wallet-total-volume', () => { // Set persistent cache totalVolumeUpdateTime to 3 weeks ago to emulate backfill from 3 weeks. await PersistentCacheTable.create({ - key: 'totalVolumeUpdateTime', + key: PersistentCacheKeys.TOTAL_VOLUME_UPDATE_TIME, value: currentDt.minus({ weeks: 3 }).toISO(), }); @@ -162,12 +148,6 @@ describe('update-wallet-total-volume', () => { }); it('Successfully backfills on first run', async () => { - const defaultSubaccountId = await SubaccountTable.findAll( - { subaccountNumber: testConstants.defaultSubaccount.subaccountNumber }, - [], - {}, - ); - // Leave persistent cache totalVolumeUpdateTime empty and create fills around // `defaultLastUpdateTime` value to emulate backfilling from very beginning expect(await getTotalVolumeUpdateTime()).toBeUndefined(); @@ -176,7 +156,6 @@ describe('update-wallet-total-volume', () => { await FillTable.create({ ...testConstants.defaultFill, - subaccountId: defaultSubaccountId[0].id, createdAt: referenceDt.plus({ days: 1 }).toISO(), eventId: testConstants.defaultTendermintEventId, price: '1', @@ -184,7 +163,6 @@ describe('update-wallet-total-volume', () => { }); await FillTable.create({ ...testConstants.defaultFill, - subaccountId: defaultSubaccountId[0].id, createdAt: referenceDt.plus({ days: 2 }).toISO(), eventId: testConstants.defaultTendermintEventId2, price: '2', @@ -192,7 +170,6 @@ describe('update-wallet-total-volume', () => { }); await FillTable.create({ ...testConstants.defaultFill, - subaccountId: defaultSubaccountId[0].id, createdAt: referenceDt.plus({ days: 3 }).toISO(), eventId: testConstants.defaultTendermintEventId3, price: '3', @@ -213,9 +190,12 @@ describe('update-wallet-total-volume', () => { }); async function getTotalVolumeUpdateTime(): Promise { - const persistentCache = await PersistentCacheTable.findById('totalVolumeUpdateTime'); - const lastUpdateTime1 = persistentCache?.value + const persistentCache: PersistentCacheFromDatabase | undefined = await PersistentCacheTable + .findById( + PersistentCacheKeys.TOTAL_VOLUME_UPDATE_TIME, + ); + const lastUpdateTime = persistentCache?.value ? DateTime.fromISO(persistentCache.value) : undefined; - return lastUpdateTime1; + return lastUpdateTime; } diff --git a/indexer/services/roundtable/src/config.ts b/indexer/services/roundtable/src/config.ts index 2e712c54ac..a8ec2cb87b 100644 --- a/indexer/services/roundtable/src/config.ts +++ b/indexer/services/roundtable/src/config.ts @@ -58,6 +58,7 @@ export const configSchema = { LOOPS_ENABLED_LEADERBOARD_PNL_MONTHLY: parseBoolean({ default: false }), LOOPS_ENABLED_LEADERBOARD_PNL_YEARLY: parseBoolean({ default: false }), LOOPS_ENABLED_UPDATE_WALLET_TOTAL_VOLUME: parseBoolean({ default: true }), + LOOPS_ENABLED_UPDATE_AFFILIATE_INFO: parseBoolean({ default: true }), LOOPS_ENABLED_DELETE_OLD_FIREBASE_NOTIFICATION_TOKENS: parseBoolean({ default: true }), // Loop Timing @@ -130,6 +131,9 @@ export const configSchema = { LOOPS_INTERVAL_MS_UPDATE_WALLET_TOTAL_VOLUME: parseInteger({ default: THIRTY_SECONDS_IN_MILLISECONDS, }), + LOOPS_INTERVAL_MS_UPDATE_AFFILIATE_INFO: parseInteger({ + default: THIRTY_SECONDS_IN_MILLISECONDS, + }), LOOPS_INTERVAL_MS_DELETE_FIREBASE_NOTIFICATION_TOKENS_MONTHLY: parseInteger({ default: 30 * ONE_DAY_IN_MILLISECONDS, }), diff --git a/indexer/services/roundtable/src/index.ts b/indexer/services/roundtable/src/index.ts index 09b83c5e77..f52903ac19 100644 --- a/indexer/services/roundtable/src/index.ts +++ b/indexer/services/roundtable/src/index.ts @@ -26,6 +26,7 @@ import subaccountUsernameGeneratorTask from './tasks/subaccount-username-generat import takeFastSyncSnapshotTask from './tasks/take-fast-sync-snapshot'; import trackLag from './tasks/track-lag'; import uncrossOrderbookTask from './tasks/uncross-orderbook'; +import updateAffiliateInfoTask from './tasks/update-affiliate-info'; import updateComplianceDataTask from './tasks/update-compliance-data'; import updateResearchEnvironmentTask from './tasks/update-research-environment'; import updateWalletTotalVolumeTask from './tasks/update-wallet-total-volume'; @@ -256,7 +257,13 @@ async function start(): Promise { config.LOOPS_INTERVAL_MS_UPDATE_WALLET_TOTAL_VOLUME, ); } - + if (config.LOOPS_ENABLED_UPDATE_AFFILIATE_INFO) { + startLoop( + updateAffiliateInfoTask, + 'update_affiliate_info', + config.LOOPS_INTERVAL_MS_UPDATE_AFFILIATE_INFO, + ); + } if (config.LOOPS_ENABLED_DELETE_OLD_FIREBASE_NOTIFICATION_TOKENS) { startLoop( deleteOldFirebaseNotificationTokensTask, diff --git a/indexer/services/roundtable/src/tasks/update-affiliate-info.ts b/indexer/services/roundtable/src/tasks/update-affiliate-info.ts new file mode 100644 index 0000000000..58191d5abf --- /dev/null +++ b/indexer/services/roundtable/src/tasks/update-affiliate-info.ts @@ -0,0 +1,64 @@ +import { logger } from '@dydxprotocol-indexer/base'; +import { + PersistentCacheTable, + AffiliateInfoTable, + PersistentCacheKeys, + PersistentCacheFromDatabase, + BlockFromDatabase, + BlockTable, + Transaction, +} from '@dydxprotocol-indexer/postgres'; +import { DateTime } from 'luxon'; + +const defaultLastUpdateTime: string = '2024-09-16T00:00:00Z'; + +/** + * Update the affiliate info for all affiliate addresses. + */ +export default async function runTask(): Promise { + // Wrap getting cache, updating info, and setting cache in one transaction so that persistent + // cache and affilitate info table are in sync. + const txId: number = await Transaction.start(); + try { + const latestBlock: BlockFromDatabase = await BlockTable.getLatest(); + if (latestBlock.time === null) { + throw Error('Failed to get latest block time'); + } + const persistentCacheEntry: PersistentCacheFromDatabase | undefined = await PersistentCacheTable + .findById(PersistentCacheKeys.AFFILIATE_INFO_UPDATE_TIME, { txId }); + if (!persistentCacheEntry) { + logger.info({ + at: 'update-affiliate-info#runTask', + message: `No previous ${PersistentCacheKeys.AFFILIATE_INFO_UPDATE_TIME} found in persistent cache table. Will use default value: ${defaultLastUpdateTime}`, + }); + } + const windowStartTime: DateTime = DateTime.fromISO(persistentCacheEntry + ? persistentCacheEntry.value + : defaultLastUpdateTime); + + let windowEndTime = DateTime.fromISO(latestBlock.time); + // During backfilling, we process one day at a time to reduce roundtable runtime. + if (windowEndTime > windowStartTime.plus({ days: 1 })) { + windowEndTime = windowStartTime.plus({ days: 1 }); + } + + logger.info({ + at: 'update-affiliate-info#runTask', + message: `Updating affiliate info from ${windowStartTime.toISO()} to ${windowEndTime.toISO()}`, + }); + await AffiliateInfoTable.updateInfo(windowStartTime.toISO(), windowEndTime.toISO(), txId); + await PersistentCacheTable.upsert({ + key: PersistentCacheKeys.AFFILIATE_INFO_UPDATE_TIME, + value: windowEndTime.toISO(), + }, { txId }); + + await Transaction.commit(txId); + } catch (error) { + await Transaction.rollback(txId); + logger.error({ + at: 'update-affiliate-info#runTask', + message: 'Error when updating affiliate info in affiliate_info table', + error, + }); + } +} diff --git a/indexer/services/roundtable/src/tasks/update-wallet-total-volume.ts b/indexer/services/roundtable/src/tasks/update-wallet-total-volume.ts index 5229d068dd..0ecc4a8fa5 100644 --- a/indexer/services/roundtable/src/tasks/update-wallet-total-volume.ts +++ b/indexer/services/roundtable/src/tasks/update-wallet-total-volume.ts @@ -1,42 +1,41 @@ import { logger, stats } from '@dydxprotocol-indexer/base'; -import { PersistentCacheTable, WalletTable } from '@dydxprotocol-indexer/postgres'; +import { + PersistentCacheTable, WalletTable, PersistentCacheKeys, PersistentCacheFromDatabase, +} from '@dydxprotocol-indexer/postgres'; import { DateTime } from 'luxon'; import config from '../config'; const defaultLastUpdateTime: string = '2020-01-01T00:00:00Z'; -const persistentCacheKey: string = 'totalVolumeUpdateTime'; /** - * Update the total volume for each address in the wallet table. + * Update the total volume for each addresses in the wallet table who filled recently. */ export default async function runTask(): Promise { try { const start = Date.now(); - const persistentCacheEntry = await PersistentCacheTable.findById(persistentCacheKey); + const persistentCacheEntry: PersistentCacheFromDatabase | undefined = await PersistentCacheTable + .findById(PersistentCacheKeys.TOTAL_VOLUME_UPDATE_TIME); if (!persistentCacheEntry) { logger.info({ - at: 'update-address-total-volume#runTask', - message: `No previous totalVolumeUpdateTime found in persistent cache table. Will use default value: ${defaultLastUpdateTime}`, + at: 'update-wallet-total-volume#runTask', + message: `No previous ${PersistentCacheKeys.TOTAL_VOLUME_UPDATE_TIME} found in persistent cache table. Will use default value: ${defaultLastUpdateTime}`, }); } - const lastUpdateTime = DateTime.fromISO(persistentCacheEntry + const lastUpdateTime: DateTime = DateTime.fromISO(persistentCacheEntry ? persistentCacheEntry.value : defaultLastUpdateTime); - let currentTime = DateTime.utc(); + + let windowEndTime = DateTime.utc(); // During backfilling, we process one day at a time to reduce roundtable runtime. - if (currentTime > lastUpdateTime.plus({ days: 1 })) { - currentTime = lastUpdateTime.plus({ days: 1 }); + if (windowEndTime > lastUpdateTime.plus({ days: 1 })) { + windowEndTime = lastUpdateTime.plus({ days: 1 }); } - await WalletTable.updateTotalVolume(lastUpdateTime.toISO(), currentTime.toISO()); - await PersistentCacheTable.upsert({ - key: persistentCacheKey, - value: currentTime.toISO(), - }); + await WalletTable.updateTotalVolume(lastUpdateTime.toISO(), windowEndTime.toISO()); stats.timing( `${config.SERVICE_NAME}.update_wallet_total_volume_timing`, @@ -44,7 +43,7 @@ export default async function runTask(): Promise { ); } catch (error) { logger.error({ - at: 'update-address-total-volume#runTask', + at: 'update-wallet-total-volume#runTask', message: 'Error when updating totalVolume in wallets table', error, });