Skip to content

Commit

Permalink
nits
Browse files Browse the repository at this point in the history
  • Loading branch information
Christopher-Li committed Jan 10, 2024
1 parent af9205c commit 3496b34
Showing 1 changed file with 57 additions and 46 deletions.
103 changes: 57 additions & 46 deletions indexer/services/roundtable/src/tasks/aggregate-trading-rewards.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import {
ONE_MINUTE_IN_MILLISECONDS,
floorDate,
logger,
runFuncWithTimingStat,
} from '@dydxprotocol-indexer/base';
import {
BlockFromDatabase,
Expand Down Expand Up @@ -42,9 +43,9 @@ interface AggregationUpdateAndCreateObjects {
}

enum DateTimeUnit {
day = 'day',
week = 'week',
month = 'month',
DAY = 'day',
WEEK = 'week',
MONTH = 'month',
}

export default function generateTaskFromPeriod(
Expand Down Expand Up @@ -74,14 +75,15 @@ export class AggregateTradingReward {
});

const intervalTradingRewardsByAddress:
IntervalTradingRewardsByAddress = await this.getIntervalTradingRewardsByAddress(
interval,
IntervalTradingRewardsByAddress = await runFuncWithTimingStat(
this.getIntervalTradingRewardsByAddress(
interval,
),
this.generateTimingStatsOptions('getIntervalTradingRewardsByAddress'),
);
await this.updateTradingRewardsAggregation(interval, intervalTradingRewardsByAddress);
await AggregateTradingRewardsProcessedCache.setProcessedTime(
this.period,
await this.setProcessedTime(
interval.end.toISO(),
redisClient,
);
}

Expand All @@ -104,7 +106,10 @@ export class AggregateTradingReward {

// endedAt is only set when the entire interval has been processed for an aggregation
if (latestAggregation !== undefined && latestAggregation.endedAt === null) {
await this.deleteIncompleteAggregatedTradingReward(latestAggregation);
await runFuncWithTimingStat(
this.deleteIncompleteAggregatedTradingReward(latestAggregation),
this.generateTimingStatsOptions('deleteIncompleteAggregatedTradingReward'),
);
}
}

Expand Down Expand Up @@ -154,10 +159,8 @@ export class AggregateTradingReward {
message: 'Resetting AggregateTradingRewardsProcessedCache',
});
const nextStartTime: DateTime = await this.getNextIntervalStartWhenCacheEmpty();
await AggregateTradingRewardsProcessedCache.setProcessedTime(
this.period,
await this.setProcessedTime(
nextStartTime.toISO(),
redisClient,
);

return this.generateInterval(nextStartTime, latestBlock);
Expand Down Expand Up @@ -278,7 +281,7 @@ export class AggregateTradingReward {
const txId: number = await Transaction.start();
await Transaction.setIsolationLevel(txId, IsolationLevel.READ_UNCOMMITTED);
try {
await this.setAggregationUpdateAndCreateObjects(aggregationUpdateAndCreateObjects);
await this.createAndUpdateAggregations(aggregationUpdateAndCreateObjects, txId);
await Transaction.commit(txId);
logger.info({
at: 'aggregate-trading-rewards#updateTradingRewardsAggregation',
Expand Down Expand Up @@ -309,11 +312,14 @@ export class AggregateTradingReward {
): Promise<AggregationUpdateAndCreateObjects> {
const tradingRewardAddresses: string[] = Object.keys(intervalTradingRewardsByAddress);

const existingAggregateTradingRewards: TradingRewardAggregationFromDatabase[] = await
TradingRewardAggregationTable.findAll({
addresses: tradingRewardAddresses,
period: this.period,
}, []);
const existingAggregateTradingRewards:
TradingRewardAggregationFromDatabase[] = await runFuncWithTimingStat(
TradingRewardAggregationTable.findAll({
addresses: tradingRewardAddresses,
period: this.period,
}, []),
this.generateTimingStatsOptions('findAllExistingAggregations'),
);
const existingAggregateTradingRewardsMap:
{ [address: string]: TradingRewardAggregationFromDatabase } = _.keyBy(
existingAggregateTradingRewards,
Expand Down Expand Up @@ -359,7 +365,7 @@ export class AggregateTradingReward {
private async getNextBlock(time: IsoString): Promise<string> {
const blocks: BlockFromDatabase[] = await BlockTable.findAll({
createdOnOrAfter: time,
limit: 3,
limit: 1,
}, []);

if (blocks.length === 0) {
Expand All @@ -383,7 +389,7 @@ export class AggregateTradingReward {
): Promise<AggregationUpdateAndCreateObjects> {
const startedAt: string = this.getStartedAt(interval);
const startedAtHeight: string = await this.getNextBlock(startedAt);
const endedAt: IsoString = this.getEndedAt(interval);
const endedAt: IsoString = interval.end.toISO();
// endedAtHeight is the first block created before endedAt
const endedAtHeight: string = Big(await this.getNextBlock(endedAt)).minus(1).toFixed();

Expand Down Expand Up @@ -438,31 +444,22 @@ export class AggregateTradingReward {
};
}

private getEndedAt(interval: Interval): IsoString {
const startedAt: IsoString = this.getStartedAt(interval);

return DateTime
.fromISO(startedAt, UTC_OPTIONS)
.endOf(this.getDateTimeUnit())
.plus({ milliseconds: 1 })
.toISO();
}

private getDateTimeUnit(): DateTimeUnit {
switch (this.period) {
case TradingRewardAggregationPeriod.DAILY:
return DateTimeUnit.day;
return DateTimeUnit.DAY;
case TradingRewardAggregationPeriod.WEEKLY:
return DateTimeUnit.week;
return DateTimeUnit.WEEK;
case TradingRewardAggregationPeriod.MONTHLY:
return DateTimeUnit.week;
return DateTimeUnit.WEEK;
default:
throw new Error(`Invalid period ${this.period}`);
}
}

private async setAggregationUpdateAndCreateObjects(
private async createAndUpdateAggregations(
aggregationUpdateAndCreateObjects: AggregationUpdateAndCreateObjects,
txId: number,
): Promise<void> {
const createObjectsChunks: TradingRewardAggregationCreateObject[][] = _.chunk(
aggregationUpdateAndCreateObjects.createObjects,
Expand All @@ -475,12 +472,15 @@ export class AggregateTradingReward {
count: createObjectsChunk.length,
createObjectsChunk: JSON.stringify(createObjectsChunk),
});
await Promise.all(_.map(
createObjectsChunk,
(createObject: TradingRewardAggregationCreateObject) => {
return TradingRewardAggregationTable.create(createObject);
},
));
await runFuncWithTimingStat(
Promise.all(_.map(
createObjectsChunk,
(createObject: TradingRewardAggregationCreateObject) => {
return TradingRewardAggregationTable.create(createObject, { txId });
},
)),
this.generateTimingStatsOptions('createChunk'),
);
logger.info({
at: 'aggregate-trading-rewards#setAggregationUpdateAndCreateObjects',
message: 'Created trading reward aggregations',
Expand All @@ -499,12 +499,15 @@ export class AggregateTradingReward {
count: updateObjectsChunk.length,
updateObjectsChunk: JSON.stringify(updateObjectsChunk),
});
await Promise.all(_.map(
updateObjectsChunk,
(updateObject: TradingRewardAggregationUpdateObject) => {
return TradingRewardAggregationTable.update(updateObject);
},
));
await runFuncWithTimingStat(
Promise.all(_.map(
updateObjectsChunk,
(updateObject: TradingRewardAggregationUpdateObject) => {
return TradingRewardAggregationTable.update(updateObject, { txId });
},
)),
this.generateTimingStatsOptions('updateChunk'),
);
logger.info({
at: 'aggregate-trading-rewards#setAggregationUpdateAndCreateObjects',
message: 'Updated trading reward aggregations',
Expand All @@ -530,4 +533,12 @@ export class AggregateTradingReward {
processedTime,
});
}

// eslint-disable-next-line @typescript-eslint/no-explicit-any
protected generateTimingStatsOptions(fnName: string): any {
return {
taskName: 'aggregate-trading-rewards',
fnName,
};
}
}

0 comments on commit 3496b34

Please sign in to comment.