Skip to content

Commit

Permalink
respond to wills comments
Browse files Browse the repository at this point in the history
  • Loading branch information
Christopher-Li committed Jan 5, 2024
1 parent bc247a4 commit 522a238
Show file tree
Hide file tree
Showing 7 changed files with 164 additions and 119 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ export default class TradingRewardAggregationModel extends Model {
address: { type: 'string' },
startedAt: { type: 'string', format: 'date-time' }, // Inclusive
startedAtHeight: { type: 'string', pattern: IntegerPattern }, // Inclusive
endedAt: { type: ['string', 'null'], format: 'date-time' }, // Inclusive
endedAt: { type: ['string', 'null'], format: 'date-time' }, // Exclusive
endedAtHeight: { type: ['string', 'null'], pattern: IntegerPattern }, // Inclusive
period: { type: 'string', enum: [...Object.values(TradingRewardAggregationPeriod)] },
amount: { type: 'string', pattern: NonNegativeNumericPattern },
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import {
} from '../../src/caches/aggregate-trading-rewards-processed-cache';
import { IsoString, TradingRewardAggregationPeriod } from '@dydxprotocol-indexer/postgres';

describe('cancelledOrdersCache', () => {
describe('aggregateTradingRewardsProcessedCache', () => {
beforeEach(async () => {
await deleteAllAsync(client);
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,11 @@ import { RedisClient } from 'redis';

import { getAsync } from '../helpers/redis';

/**
* Cache key for the aggregate trading rewards processed cache. Given a
* TradingRewardAggregationPeriod, this cache stores the timestamp of the
* trading rewards that have been processed up to and excluding that timestamp.
*/
export const AGGREGATE_TRADING_REWARDS_PROCESSED_CACHE_KEY: string = 'v4/aggregate_trading_rewards_processed/';

function getKey(period: TradingRewardAggregationPeriod): string {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import { UTC_OPTIONS } from '../../src/lib/constants';
import { deleteAllAsync } from '@dydxprotocol-indexer/redis/build/src/helpers/redis';
import { redisClient } from '../../src/helpers/redis';
import { AggregateTradingRewardsProcessedCache } from '@dydxprotocol-indexer/redis';
import config from '../../src/config';

describe('aggregate-trading-rewards', () => {
beforeAll(async () => {
Expand Down Expand Up @@ -56,33 +57,76 @@ describe('aggregate-trading-rewards', () => {
amount: '10',
};

describe('maybeDeleteIncompleteAggregatedTradingReward', () => {
it(
'Deletes incomplete aggregations when cache is empty and only one incomplete aggregations exist',
async () => {
await Promise.all([
TradingRewardAggregationTable.create(defaultMonthlyTradingRewardAggregation2),
TradingRewardAggregationTable.create({
...defaultMonthlyTradingRewardAggregation2,
period: TradingRewardAggregationPeriod.WEEKLY,
}),
]);
const aggregateTradingReward: AggregateTradingReward = new AggregateTradingReward(
TradingRewardAggregationPeriod.MONTHLY,
);
await aggregateTradingReward.maybeDeleteIncompleteAggregatedTradingReward();
const aggregations:
TradingRewardAggregationFromDatabase[] = await TradingRewardAggregationTable.findAll(
{},
[],
);
expect(aggregations.length).toEqual(1);
},
);

it(
'Deletes incomplete aggregations when cache is empty and multiple aggregations exist',
async () => {
await Promise.all([
TradingRewardAggregationTable.create(defaultMonthlyTradingRewardAggregation),
TradingRewardAggregationTable.create(defaultMonthlyTradingRewardAggregation2),
TradingRewardAggregationTable.create({
...defaultMonthlyTradingRewardAggregation2,
period: TradingRewardAggregationPeriod.WEEKLY,
}),
createBlockWithTime(startedAt2.plus({ hours: 1 })),
]);
const aggregateTradingReward: AggregateTradingReward = new AggregateTradingReward(
TradingRewardAggregationPeriod.MONTHLY,
);
await aggregateTradingReward.maybeDeleteIncompleteAggregatedTradingReward();

const aggregations:
TradingRewardAggregationFromDatabase[] = await TradingRewardAggregationTable.findAll(
{},
[],
);
expect(aggregations.length).toEqual(2);
},
);
});

describe('getTradingRewardDataToProcessInterval', () => {
it.each([
TradingRewardAggregationPeriod.DAILY,
TradingRewardAggregationPeriod.WEEKLY,
TradingRewardAggregationPeriod.MONTHLY,
])('Successfully returns undefined if there are no blocks in the database', async (
])('Throws error if there are no blocks in the database', async (
period: TradingRewardAggregationPeriod,
) => {
await dbHelpers.clearData();
const aggregateTradingReward: AggregateTradingReward = new AggregateTradingReward(period);
const interval:
Interval | undefined = await aggregateTradingReward.getTradingRewardDataToProcessInterval();

expect(interval).toBeUndefined();
expect(logger.info).toHaveBeenCalledWith(
expect.objectContaining({
message:
'Unable to aggregate trading rewards because there are no blocks in the database.',
}),
);
await expect(aggregateTradingReward.getTradingRewardDataToProcessInterval())
.rejects.toEqual(new Error('Unable to find latest block'));
});

it.each([
TradingRewardAggregationPeriod.DAILY,
TradingRewardAggregationPeriod.WEEKLY,
TradingRewardAggregationPeriod.MONTHLY,
])('Successfully returns first block time if cache is empty and no aggregations', async (
])('Successfully returns interval if cache is empty and no aggregations', async (
period: TradingRewardAggregationPeriod,
) => {
const firstBlockTime: DateTime = DateTime.fromISO(
Expand All @@ -92,7 +136,7 @@ describe('aggregate-trading-rewards', () => {
await createBlockWithTime(firstBlockTime.plus({ hours: 1 }));
const aggregateTradingReward: AggregateTradingReward = new AggregateTradingReward(period);
const interval:
Interval | undefined = await aggregateTradingReward.getTradingRewardDataToProcessInterval();
Interval = await aggregateTradingReward.getTradingRewardDataToProcessInterval();

expect(interval).not.toBeUndefined();
expect(interval).toEqual(Interval.fromDateTimes(
Expand All @@ -102,45 +146,36 @@ describe('aggregate-trading-rewards', () => {
});

it(
'Deletes incomplete aggregations when cache is empty and only one incomplete aggregations exist',
'Successfully returns interval when cache is empty and no',
async () => {
await Promise.all([
TradingRewardAggregationTable.create(defaultMonthlyTradingRewardAggregation2),
TradingRewardAggregationTable.create({
...defaultMonthlyTradingRewardAggregation2,
period: TradingRewardAggregationPeriod.WEEKLY,
}),
]);
await TradingRewardAggregationTable.create({
...defaultMonthlyTradingRewardAggregation,
period: TradingRewardAggregationPeriod.WEEKLY,
});
const aggregateTradingReward: AggregateTradingReward = new AggregateTradingReward(
TradingRewardAggregationPeriod.MONTHLY,
);
const interval:
Interval | undefined = await aggregateTradingReward.getTradingRewardDataToProcessInterval();
Interval = await aggregateTradingReward.getTradingRewardDataToProcessInterval();

const firstBlockTime: DateTime = DateTime.fromISO(
testConstants.defaultBlock.time,
UTC_OPTIONS,
).toUTC();
expect(interval).toEqual(Interval.fromDateTimes(
firstBlockTime,
firstBlockTime.plus({ hours: 1 })),
);

const aggregations:
TradingRewardAggregationFromDatabase[] = await TradingRewardAggregationTable.findAll(
{},
[],
firstBlockTime.plus({
milliseconds: config.AGGREGATE_TRADING_REWARDS_MAX_INTERVAL_SIZE_MS,
})),
);
expect(aggregations.length).toEqual(1);
},
);

it(
'Deletes incomplete aggregations when cache is empty and multiple aggregations exist',
'Successfully returns interval when cache is empty and a complete aggregations exist',
async () => {
await Promise.all([
TradingRewardAggregationTable.create(defaultMonthlyTradingRewardAggregation),
TradingRewardAggregationTable.create(defaultMonthlyTradingRewardAggregation2),
TradingRewardAggregationTable.create({
...defaultMonthlyTradingRewardAggregation2,
period: TradingRewardAggregationPeriod.WEEKLY,
Expand All @@ -151,9 +186,12 @@ describe('aggregate-trading-rewards', () => {
TradingRewardAggregationPeriod.MONTHLY,
);
const interval:
Interval | undefined = await aggregateTradingReward.getTradingRewardDataToProcessInterval();
Interval = await aggregateTradingReward.getTradingRewardDataToProcessInterval();
expect(interval).not.toBeUndefined();
expect(interval).toEqual(Interval.fromDateTimes(startedAt2, startedAt2.plus({ hours: 1 })));
expect(interval).toEqual(Interval.fromDateTimes(
startedAt2,
startedAt2.plus({ milliseconds: config.AGGREGATE_TRADING_REWARDS_MAX_INTERVAL_SIZE_MS }),
));

const aggregations:
TradingRewardAggregationFromDatabase[] = await TradingRewardAggregationTable.findAll(
Expand Down Expand Up @@ -185,7 +223,7 @@ describe('aggregate-trading-rewards', () => {
TradingRewardAggregationPeriod.MONTHLY,
);
const interval:
Interval | undefined = await aggregateTradingReward.getTradingRewardDataToProcessInterval();
Interval = await aggregateTradingReward.getTradingRewardDataToProcessInterval();
expect(interval).toEqual(Interval.fromDateTimes(endedAt2, endedAt2));

const aggregations:
Expand Down Expand Up @@ -218,7 +256,7 @@ describe('aggregate-trading-rewards', () => {
TradingRewardAggregationPeriod.MONTHLY,
);
const interval:
Interval | undefined = await aggregateTradingReward.getTradingRewardDataToProcessInterval();
Interval = await aggregateTradingReward.getTradingRewardDataToProcessInterval();
expect(interval).toEqual(Interval.fromDateTimes(endedAt2, endedAt2.plus({ minutes: 1 })));

const aggregations:
Expand Down Expand Up @@ -251,8 +289,11 @@ describe('aggregate-trading-rewards', () => {
TradingRewardAggregationPeriod.MONTHLY,
);
const interval:
Interval | undefined = await aggregateTradingReward.getTradingRewardDataToProcessInterval();
expect(interval).toEqual(Interval.fromDateTimes(endedAt2, endedAt2.plus({ hour: 1 })));
Interval = await aggregateTradingReward.getTradingRewardDataToProcessInterval();
expect(interval).toEqual(Interval.fromDateTimes(
endedAt2,
endedAt2.plus({ milliseconds: config.AGGREGATE_TRADING_REWARDS_MAX_INTERVAL_SIZE_MS }),
));

const aggregations:
TradingRewardAggregationFromDatabase[] = await TradingRewardAggregationTable.findAll(
Expand Down Expand Up @@ -284,7 +325,7 @@ describe('aggregate-trading-rewards', () => {
TradingRewardAggregationPeriod.MONTHLY,
);
const interval:
Interval | undefined = await aggregateTradingReward.getTradingRewardDataToProcessInterval();
Interval = await aggregateTradingReward.getTradingRewardDataToProcessInterval();
expect(interval).toEqual(Interval.fromDateTimes(
endedAt2.plus({ hours: 23, minutes: 55 }),
endedAt2.plus({ days: 1 })),
Expand All @@ -302,13 +343,8 @@ describe('aggregate-trading-rewards', () => {
describe('runTask', () => {
it('Successfully logs and exits if there are no blocks in the database', async () => {
await dbHelpers.clearData();
await generateTaskFromPeriod(TradingRewardAggregationPeriod.MONTHLY)();

expect(logger.info).toHaveBeenCalledWith(
expect.objectContaining({
message: 'No interval to aggregate trading rewards',
}),
);
await expect(generateTaskFromPeriod(TradingRewardAggregationPeriod.MONTHLY)())
.rejects.toEqual(new Error('Unable to find latest block'));
});
});
});
Expand Down
13 changes: 6 additions & 7 deletions indexer/services/roundtable/src/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -87,13 +87,7 @@ export const configSchema = {
LOOPS_INTERVAL_MS_REMOVE_OLD_ORDER_UPDATES: parseInteger({
default: THIRTY_SECONDS_IN_MILLISECONDS,
}),
LOOPS_INTERVAL_MS_AGGREGATE_TRADING_REWARDS_DAILY: parseInteger({
default: THIRTY_SECONDS_IN_MILLISECONDS,
}),
LOOPS_INTERVAL_MS_AGGREGATE_TRADING_REWARDS_WEEKLY: parseInteger({
default: THIRTY_SECONDS_IN_MILLISECONDS,
}),
LOOPS_INTERVAL_MS_AGGREGATE_TRADING_REWARDS_MONTHLY: parseInteger({
LOOPS_INTERVAL_MS_AGGREGATE_TRADING_REWARDS: parseInteger({
default: THIRTY_SECONDS_IN_MILLISECONDS,
}),

Expand Down Expand Up @@ -149,6 +143,11 @@ export const configSchema = {

// Remove old cached order updates
OLD_CACHED_ORDER_UPDATES_WINDOW_MS: parseInteger({ default: 30 * ONE_SECOND_IN_MILLISECONDS }),

// Aggregate Trading Rewards
AGGREGATE_TRADING_REWARDS_MAX_INTERVAL_SIZE_MS: parseInteger({
default: ONE_HOUR_IN_MILLISECONDS,
}),
};

export default parseSchema(configSchema);
6 changes: 3 additions & 3 deletions indexer/services/roundtable/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -145,23 +145,23 @@ async function start(): Promise<void> {
startLoop(
aggregateTradingRewardsTasks(TradingRewardAggregationPeriod.DAILY),
'aggregate_trading_rewards_daily',
config.LOOPS_INTERVAL_MS_AGGREGATE_TRADING_REWARDS_DAILY,
config.LOOPS_INTERVAL_MS_AGGREGATE_TRADING_REWARDS,
);
}

if (config.LOOPS_ENABLED_AGGREGATE_TRADING_REWARDS_WEEKLY) {
startLoop(
aggregateTradingRewardsTasks(TradingRewardAggregationPeriod.WEEKLY),
'aggregate_trading_rewards_weekly',
config.LOOPS_INTERVAL_MS_AGGREGATE_TRADING_REWARDS_WEEKLY,
config.LOOPS_INTERVAL_MS_AGGREGATE_TRADING_REWARDS,
);
}

if (config.LOOPS_ENABLED_AGGREGATE_TRADING_REWARDS_MONTHLY) {
startLoop(
aggregateTradingRewardsTasks(TradingRewardAggregationPeriod.MONTHLY),
'aggregate_trading_rewards_monthly',
config.LOOPS_INTERVAL_MS_AGGREGATE_TRADING_REWARDS_MONTHLY,
config.LOOPS_INTERVAL_MS_AGGREGATE_TRADING_REWARDS,
);
}

Expand Down
Loading

0 comments on commit 522a238

Please sign in to comment.