Skip to content

Commit

Permalink
[OTE-755] Wrap wallet total volume roundtable queries in transaction (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
jerryfan01234 authored Sep 20, 2024
1 parent 18f76cc commit 82598aa
Show file tree
Hide file tree
Showing 5 changed files with 204 additions and 194 deletions.
107 changes: 34 additions & 73 deletions indexer/packages/postgres/__tests__/stores/wallet-table.test.ts
Original file line number Diff line number Diff line change
@@ -1,24 +1,22 @@
import { WalletFromDatabase, PersistentCacheKeys, PersistentCacheFromDatabase } from '../../src/types';
import { WalletFromDatabase } from '../../src/types';
import { clearData, migrate, teardown } from '../../src/helpers/db-helpers';
import { DateTime } from 'luxon';
import {
defaultFill,
defaultOrder,
defaultSubaccount,
defaultTendermintEventId,
defaultTendermintEventId2,
defaultTendermintEventId3,
defaultTendermintEventId4,
defaultWallet,
defaultWallet2,
isolatedMarketOrder,
isolatedSubaccount,
defaultSubaccountId,
isolatedSubaccountId,
} from '../helpers/constants';
import * as FillTable from '../../src/stores/fill-table';
import * as OrderTable from '../../src/stores/order-table';
import * as WalletTable from '../../src/stores/wallet-table';
import * as SubaccountTable from '../../src/stores/subaccount-table';
import * as PersistentCacheTable from '../../src/stores/persistent-cache-table';
import { seedData } from '../helpers/mock-generators';

describe('Wallet store', () => {
Expand Down Expand Up @@ -99,9 +97,9 @@ describe('Wallet store', () => {
firstFillTime.minus({ hours: 1 }).toISO(), // need to minus because left bound is exclusive
firstFillTime.plus({ hours: 1 }).toISO(),
);
let wallet: WalletFromDatabase | undefined = await WalletTable
const wallet1: WalletFromDatabase | undefined = await WalletTable
.findById(defaultWallet.address);
expect(wallet).toEqual(expect.objectContaining({
expect(wallet1).toEqual(expect.objectContaining({
...defaultWallet,
totalVolume: '103',
}));
Expand All @@ -113,45 +111,12 @@ describe('Wallet store', () => {
firstFillTime.toISO(), // exclusive -> filters out first fill from each subaccount
firstFillTime.plus({ minutes: 2 }).toISO(),
);
wallet = await WalletTable.findById(defaultWallet.address);
expect(wallet).toEqual(expect.objectContaining({
const wallet2 = await WalletTable.findById(defaultWallet.address);
expect(wallet2).toEqual(expect.objectContaining({
...defaultWallet,
totalVolume: '105', // 103 + 2
}));
});

it('Successfully upserts persistent cache', async () => {
const referenceDt = DateTime.utc();

// Sets initial persistent cache value
let leftBound: DateTime = referenceDt.minus({ hours: 2 });
let rightBound: DateTime = referenceDt.minus({ hours: 1 });

await WalletTable.updateTotalVolume(leftBound.toISO(), rightBound.toISO());

let persistentCache: PersistentCacheFromDatabase | undefined = await PersistentCacheTable
.findById(PersistentCacheKeys.TOTAL_VOLUME_UPDATE_TIME);
let lastUpdateTime: string | undefined = persistentCache?.value;
expect(lastUpdateTime).not.toBeUndefined();
if (lastUpdateTime !== undefined) {
expect(lastUpdateTime).toEqual(rightBound.toISO());
}

// Updates persistent cache value
leftBound = referenceDt.minus({ hours: 1 });
rightBound = referenceDt;

await WalletTable.updateTotalVolume(leftBound.toISO(), rightBound.toISO());

persistentCache = await PersistentCacheTable.findById(
PersistentCacheKeys.TOTAL_VOLUME_UPDATE_TIME,
);
lastUpdateTime = persistentCache?.value;
expect(lastUpdateTime).not.toBeUndefined();
if (lastUpdateTime !== undefined) {
expect(lastUpdateTime).toEqual(rightBound.toISO());
}
});
});
});

Expand All @@ -164,22 +129,12 @@ describe('Wallet store', () => {
*/
async function populateWalletSubaccountFill(): Promise<DateTime> {
await seedData();
await OrderTable.create(defaultOrder);
await OrderTable.create(isolatedMarketOrder);

// seedData() creates defaultWallet with defaultSubaccount and isolatedSubaccount
const defaultSubaccountId = await SubaccountTable.findAll(
{ subaccountNumber: defaultSubaccount.subaccountNumber },
[],
{},
);
const isolatedSubaccountId = await SubaccountTable.findAll(
{ subaccountNumber: isolatedSubaccount.subaccountNumber },
[],
{},
);
await Promise.all([
OrderTable.create(defaultOrder),
OrderTable.create(isolatedMarketOrder),
]);

const referenceDt = DateTime.utc().minus({ hours: 1 });
const referenceDt: DateTime = DateTime.utc().minus({ hours: 1 });
const eventIds = [
defaultTendermintEventId,
defaultTendermintEventId2,
Expand All @@ -188,27 +143,33 @@ async function populateWalletSubaccountFill(): Promise<DateTime> {
];
let eventIdx = 0;

const fillPromises: Promise<any>[] = [];
// Create 3 fills with 1 min increments for defaultSubaccount
for (let i = 0; i < 3; i++) {
await FillTable.create({
...defaultFill,
subaccountId: defaultSubaccountId[0].id,
createdAt: referenceDt.plus({ minutes: i }).toISO(),
eventId: eventIds[eventIdx],
price: '1',
size: '1',
});
fillPromises.push(
FillTable.create({
...defaultFill,
subaccountId: defaultSubaccountId,
createdAt: referenceDt.plus({ minutes: i }).toISO(),
eventId: eventIds[eventIdx],
price: '1',
size: '1',
}),
);
eventIdx += 1;
}
// Create 1 fill at referenceDt for isolatedSubaccount
await FillTable.create({
...defaultFill,
subaccountId: isolatedSubaccountId[0].id,
createdAt: referenceDt.toISO(),
eventId: eventIds[eventIdx],
price: '10',
size: '10',
});
fillPromises.push(
FillTable.create({
...defaultFill,
subaccountId: isolatedSubaccountId,
createdAt: referenceDt.toISO(),
eventId: eventIds[eventIdx],
price: '10',
size: '10',
}),
);
await Promise.all(fillPromises);

return referenceDt;
}
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ export async function findById(
* @function updateInfo
* @param {string} windowStartTs - The exclusive start timestamp for filtering fills.
* @param {string} windowEndTs - The inclusive end timestamp for filtering fill.
* @param {Options} [options={ txId: undefined }] - Optional transaction ID or additional options.
* @param {number} [txId] - Optional transaction ID.
* @returns {Promise<void>}
*/
export async function updateInfo(
Expand Down
30 changes: 14 additions & 16 deletions indexer/packages/postgres/src/stores/wallet-table.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import Knex from 'knex';
import { PartialModelObject, QueryBuilder } from 'objection';

import { DEFAULT_POSTGRES_OPTIONS } from '../constants';
Expand All @@ -15,7 +16,6 @@ import {
WalletFromDatabase,
WalletQueryConfig,
WalletUpdateObject,
PersistentCacheKeys,
} from '../types';

export async function findAll(
Expand Down Expand Up @@ -116,18 +116,21 @@ export async function findById(
* Calculates the total volume in a given time window for each address and adds the values to the
* existing totalVolume values.
*
* @param windowStartTs - The start timestamp of the time window (exclusive).
* @param windowEndTs - The end timestamp of the time window (inclusive).
* @async
* @function updateTotalVolume
* @param {string} windowStartTs - The exclusive start timestamp for filtering fills.
* @param {string} windowEndTs - The inclusive end timestamp for filtering fill.
* @param {number} [txId] - Optional transaction ID.
* @returns {Promise<void>}
*/
export async function updateTotalVolume(
windowStartTs: string,
windowEndTs: string,
txId: number | undefined = undefined,
) : Promise<void> {
const transaction: Knex.Transaction | undefined = Transaction.get(txId);

await knexPrimary.raw(
`
BEGIN;
const query = `
WITH fills_total AS (
-- Step 1: Calculate total volume for each subaccountId
SELECT "subaccountId", SUM("price" * "size") AS "totalVolume"
Expand All @@ -153,14 +156,9 @@ export async function updateTotalVolume(
SET "totalVolume" = COALESCE(wallets."totalVolume", 0) + av."totalVolume"
FROM address_volume av
WHERE wallets."address" = av."address";
`;

-- Step 5: Upsert new totalVolumeUpdateTime to persistent_cache table
INSERT INTO persistent_cache (key, value)
VALUES ('${PersistentCacheKeys.TOTAL_VOLUME_UPDATE_TIME}', '${windowEndTs}')
ON CONFLICT (key)
DO UPDATE SET value = EXCLUDED.value;
COMMIT;
`,
);
return transaction
? knexPrimary.raw(query).transacting(transaction)
: knexPrimary.raw(query);
}
Loading

0 comments on commit 82598aa

Please sign in to comment.