Skip to content

Commit

Permalink
[OTE-755] Add update wallet total volume roundtable task (#2222)
Browse files Browse the repository at this point in the history
  • Loading branch information
jerryfan01234 authored Sep 9, 2024
1 parent 4780b4c commit 66f27b0
Show file tree
Hide file tree
Showing 7 changed files with 479 additions and 1 deletion.
123 changes: 122 additions & 1 deletion indexer/packages/postgres/__tests__/stores/wallet-table.test.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,26 @@
import { WalletFromDatabase } from '../../src/types';
import { clearData, migrate, teardown } from '../../src/helpers/db-helpers';
import { defaultWallet2, defaultWallet3 } from '../helpers/constants';
import { DateTime } from 'luxon';
import {
defaultFill,
defaultOrder,
defaultSubaccount,
defaultTendermintEventId,
defaultTendermintEventId2,
defaultTendermintEventId3,
defaultTendermintEventId4,
defaultWallet,
defaultWallet2,
defaultWallet3,
isolatedMarketOrder,
isolatedSubaccount,
} from '../helpers/constants';
import * as FillTable from '../../src/stores/fill-table';
import * as OrderTable from '../../src/stores/order-table';
import * as WalletTable from '../../src/stores/wallet-table';
import * as SubaccountTable from '../../src/stores/subaccount-table';
import * as PersistentCacheTable from '../../src/stores/persistent-cache-table';
import { seedData } from '../helpers/mock-generators';

describe('Wallet store', () => {
beforeAll(async () => {
Expand Down Expand Up @@ -92,4 +111,106 @@ describe('Wallet store', () => {
expect(wallets.length).toEqual(1);
expect(wallets[0]).toEqual(expect.objectContaining(defaultWallet3));
});

it('Successfully updates totalVolume for time window multiple times', async () => {
const firstFillTime = await populateWalletSubaccountFill();

// Update totalVolume for a time window that covers all fills
await WalletTable.updateTotalVolume(
firstFillTime.minus({ hours: 1 }).toISO(), // need to minus because left bound is exclusive
firstFillTime.plus({ hours: 1 }).toISO(),
);
let wallet = await WalletTable.findById(defaultWallet.address);
expect(wallet).toEqual(expect.objectContaining({
...defaultWallet,
totalVolume: '103',
}));

// Update totalVolume for a time window that excludes some fills
// For convenience, we will reuse the existing fills data. The total volume calculated in this
// window should be added to the total volume above.
await WalletTable.updateTotalVolume(
firstFillTime.toISO(), // exclusive -> filters out first fill from each subaccount
firstFillTime.plus({ minutes: 2 }).toISO(),
);
wallet = await WalletTable.findById(defaultWallet.address);
expect(wallet).toEqual(expect.objectContaining({
...defaultWallet,
totalVolume: '105', // 103 + 2
}));
});

it('Successfully updates totalVolumeUpdateTime in persistent cache', async () => {
const leftBound = DateTime.utc().minus({ hours: 1 });
const rightBound = DateTime.utc();
await WalletTable.updateTotalVolume(leftBound.toISO(), rightBound.toISO());

const persistentCache = await PersistentCacheTable.findById('totalVolumeUpdateTime');
const lastUpdateTime = persistentCache?.value
? DateTime.fromISO(persistentCache.value)
: undefined;

expect(lastUpdateTime).not.toBeUndefined();
if (lastUpdateTime?.toMillis() !== undefined) {
expect(lastUpdateTime.toMillis()).toEqual(rightBound.toMillis());
}
});
});

/**
* Helper function to add entries into wallet, subaccount, fill tables.
* Create a wallet with 2 subaccounts; one subaccount has 3 fills and the other has 1 fill.
* The fills are at t=0,1,2 and t=1 for the subaccounts respectively.
* This setup allows us to test that the totalVolume is correctly calculated for a time window.
* @returns first fill time in ISO format
*/
async function populateWalletSubaccountFill(): Promise<DateTime> {
await seedData();
await OrderTable.create(defaultOrder);
await OrderTable.create(isolatedMarketOrder);

// seedData() creates defaultWallet with defaultSubaccount and isolatedSubaccount
const defaultSubaccountId = await SubaccountTable.findAll(
{ subaccountNumber: defaultSubaccount.subaccountNumber },
[],
{},
);
const isolatedSubaccountId = await SubaccountTable.findAll(
{ subaccountNumber: isolatedSubaccount.subaccountNumber },
[],
{},
);

const referenceDt = DateTime.utc().minus({ hours: 1 });
const eventIds = [
defaultTendermintEventId,
defaultTendermintEventId2,
defaultTendermintEventId3,
defaultTendermintEventId4,
];
let eventIdx = 0;

// Create 3 fills with 1 min increments for defaultSubaccount
for (let i = 0; i < 3; i++) {
await FillTable.create({
...defaultFill,
subaccountId: defaultSubaccountId[0].id,
createdAt: referenceDt.plus({ minutes: i }).toISO(),
eventId: eventIds[eventIdx],
price: '1',
size: '1',
});
eventIdx += 1;
}
// Create 1 fill at referenceDt for isolatedSubaccount
await FillTable.create({
...defaultFill,
subaccountId: isolatedSubaccountId[0].id,
createdAt: referenceDt.toISO(),
eventId: eventIds[eventIdx],
price: '10',
size: '10',
});

return referenceDt;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
import * as Knex from 'knex';

export async function up(knex: Knex): Promise<void> {
await knex.raw(`
CREATE INDEX CONCURRENTLY IF NOT EXISTS "fills_createdat_index" ON "fills" ("createdAt");
`);
}

export async function down(knex: Knex): Promise<void> {
await knex.raw(`
DROP INDEX CONCURRENTLY IF EXISTS "fills_createdat_index";
`);
}

export const config = {
transaction: false,
};
55 changes: 55 additions & 0 deletions indexer/packages/postgres/src/stores/wallet-table.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { PartialModelObject, QueryBuilder } from 'objection';

import { DEFAULT_POSTGRES_OPTIONS } from '../constants';
import { knexReadReplica } from '../helpers/knex';
import { setupBaseQuery, verifyAllRequiredFields } from '../helpers/stores-helpers';
import Transaction from '../helpers/transaction';
import WalletModel from '../models/wallet-model';
Expand Down Expand Up @@ -102,6 +103,7 @@ export async function upsert(
// should only ever be one wallet
return wallets[0];
}

export async function findById(
address: string,
options: Options = DEFAULT_POSTGRES_OPTIONS,
Expand All @@ -114,3 +116,56 @@ export async function findById(
.findById(address)
.returning('*');
}

/**
* Calculates the total volume in a given time window for each address and adds the values to the
* existing totalVolume values.
*
* @param windowStartTs - The start timestamp of the time window (exclusive).
* @param windowEndTs - The end timestamp of the time window (inclusive).
*/
export async function updateTotalVolume(
windowStartTs: string,
windowEndTs: string,
) : Promise<void> {

await knexReadReplica.getConnection().raw(
`
BEGIN;
WITH fills_total AS (
-- Step 1: Calculate total volume for each subaccountId
SELECT "subaccountId", SUM("price" * "size") AS "totalVolume"
FROM fills
WHERE "createdAt" > '${windowStartTs}' AND "createdAt" <= '${windowEndTs}'
GROUP BY "subaccountId"
),
subaccount_volume AS (
-- Step 2: Merge with subaccounts table to get the address
SELECT s."address", f."totalVolume"
FROM fills_total f
JOIN subaccounts s
ON f."subaccountId" = s."id"
),
address_volume AS (
-- Step 3: Group by address and sum the totalVolume
SELECT "address", SUM("totalVolume") AS "totalVolume"
FROM subaccount_volume
GROUP BY "address"
)
-- Step 4: Left join the result with the wallets table and update the total volume
UPDATE wallets
SET "totalVolume" = COALESCE(wallets."totalVolume", 0) + av."totalVolume"
FROM address_volume av
WHERE wallets."address" = av."address";
-- Step 5: Upsert new totalVolumeUpdateTime to persistent_cache table
INSERT INTO persistent_cache (key, value)
VALUES ('totalVolumeUpdateTime', '${windowEndTs}')
ON CONFLICT (key)
DO UPDATE SET value = EXCLUDED.value;
COMMIT;
`,
);
}
Loading

0 comments on commit 66f27b0

Please sign in to comment.