diff --git a/indexer/services/ender/__tests__/handlers/funding-handler.test.ts b/indexer/services/ender/__tests__/handlers/funding-handler.test.ts index 669810deb9..afe0df78fa 100644 --- a/indexer/services/ender/__tests__/handlers/funding-handler.test.ts +++ b/indexer/services/ender/__tests__/handlers/funding-handler.test.ts @@ -233,6 +233,14 @@ describe('fundingHandler', () => { })); expect(stats.gauge).toHaveBeenCalledWith('ender.funding_index_update_event', 0.1, { ticker: 'BTC-USD' }); expect(stats.gauge).toHaveBeenCalledWith('ender.funding_index_update', 0.1, { ticker: 'BTC-USD' }); + expect(stats.timing).toHaveBeenCalledWith( + 'ender.handle_funding_event.sql_latency', + expect.any(Number), + { + className: 'FundingHandler', + eventType: 'FundingEvent', + }, + ); }); it('successfully processes and clears cache for a new funding rate', async () => { diff --git a/indexer/services/ender/src/handlers/asset-handler.ts b/indexer/services/ender/src/handlers/asset-handler.ts index b505f6af5c..34cd516050 100644 --- a/indexer/services/ender/src/handlers/asset-handler.ts +++ b/indexer/services/ender/src/handlers/asset-handler.ts @@ -1,3 +1,4 @@ +import { stats } from '@dydxprotocol-indexer/base'; import { AssetFromDatabase, AssetModel, @@ -6,6 +7,7 @@ import { import { AssetCreateEventV1 } from '@dydxprotocol-indexer/v4-protos'; import * as pg from 'pg'; +import config from '../config'; import { ConsolidatedKafkaEvent } from '../lib/types'; import { Handler } from './handler'; @@ -18,6 +20,12 @@ export class AssetCreationHandler extends Handler { // eslint-disable-next-line @typescript-eslint/require-await public async internalHandle(resultRow: pg.QueryResultRow): Promise { + // Handle latency from resultRow + stats.timing( + `${config.SERVICE_NAME}.handle_asset_event.sql_latency`, + Number(resultRow.latency), + this.generateTimingStatsOptions(), + ); const asset: AssetFromDatabase = AssetModel.fromJson( resultRow.asset) as AssetFromDatabase; assetRefresher.addAsset(asset); diff --git a/indexer/services/ender/src/handlers/funding-handler.ts b/indexer/services/ender/src/handlers/funding-handler.ts index b05af85806..5485dfe84c 100644 --- a/indexer/services/ender/src/handlers/funding-handler.ts +++ b/indexer/services/ender/src/handlers/funding-handler.ts @@ -141,6 +141,13 @@ export class FundingHandler extends Handler { }); stats.increment(`${config.SERVICE_NAME}.handle_funding_event.failure`, 1); } + + // Handle latency from resultRow + stats.timing( + `${config.SERVICE_NAME}.handle_funding_event.sql_latency`, + Number(resultRow.latency), + this.generateTimingStatsOptions(), + ); } await Promise.all(promises); diff --git a/indexer/services/ender/src/handlers/liquidity-tier-handler.ts b/indexer/services/ender/src/handlers/liquidity-tier-handler.ts index 92e1f7a901..b36df1812a 100644 --- a/indexer/services/ender/src/handlers/liquidity-tier-handler.ts +++ b/indexer/services/ender/src/handlers/liquidity-tier-handler.ts @@ -1,3 +1,4 @@ +import { stats } from '@dydxprotocol-indexer/base'; import { LiquidityTiersFromDatabase, LiquidityTiersModel, @@ -9,6 +10,7 @@ import { LiquidityTierUpsertEventV1, LiquidityTierUpsertEventV2 } from '@dydxpro import _ from 'lodash'; import * as pg from 'pg'; +import config from '../config'; import { generatePerpetualMarketMessage } from '../helpers/kafka-helper'; import { ConsolidatedKafkaEvent } from '../lib/types'; import { Handler } from './handler'; @@ -21,6 +23,12 @@ export class LiquidityTierHandlerBase extends Handler { // eslint-disable-next-line @typescript-eslint/require-await public async internalHandle(resultRow: pg.QueryResultRow): Promise { + // Handle latency from resultRow + stats.timing( + `${config.SERVICE_NAME}.handle_liquidity_tier_event.sql_latency`, + Number(resultRow.latency), + this.generateTimingStatsOptions(), + ); const liquidityTier: LiquidityTiersFromDatabase = LiquidityTiersModel.fromJson( resultRow.liquidity_tier, ) as LiquidityTiersFromDatabase; diff --git a/indexer/services/ender/src/handlers/markets/market-create-handler.ts b/indexer/services/ender/src/handlers/markets/market-create-handler.ts index 6f539e3665..aa8bc571b3 100644 --- a/indexer/services/ender/src/handlers/markets/market-create-handler.ts +++ b/indexer/services/ender/src/handlers/markets/market-create-handler.ts @@ -1,7 +1,8 @@ -import { logger } from '@dydxprotocol-indexer/base'; +import { logger, stats } from '@dydxprotocol-indexer/base'; import { MarketEventV1 } from '@dydxprotocol-indexer/v4-protos'; import * as pg from 'pg'; +import config from '../../config'; import { ConsolidatedKafkaEvent } from '../../lib/types'; import { Handler } from '../handler'; @@ -14,12 +15,18 @@ export class MarketCreateHandler extends Handler { } // eslint-disable-next-line @typescript-eslint/require-await - public async internalHandle(_: pg.QueryResultRow): Promise { + public async internalHandle(resultRow: pg.QueryResultRow): Promise { logger.info({ at: 'MarketCreateHandler#handle', message: 'Received MarketEvent with MarketCreate.', event: this.event, }); + // Handle latency from resultRow + stats.timing( + `${config.SERVICE_NAME}.handle_market_create_event.sql_latency`, + Number(resultRow.latency), + this.generateTimingStatsOptions(), + ); return []; } diff --git a/indexer/services/ender/src/handlers/markets/market-modify-handler.ts b/indexer/services/ender/src/handlers/markets/market-modify-handler.ts index eeee9188e3..88f416ef17 100644 --- a/indexer/services/ender/src/handlers/markets/market-modify-handler.ts +++ b/indexer/services/ender/src/handlers/markets/market-modify-handler.ts @@ -1,7 +1,8 @@ -import { logger } from '@dydxprotocol-indexer/base'; +import { logger, stats } from '@dydxprotocol-indexer/base'; import { MarketEventV1 } from '@dydxprotocol-indexer/v4-protos'; import * as pg from 'pg'; +import config from '../../config'; import { ConsolidatedKafkaEvent } from '../../lib/types'; import { Handler } from '../handler'; @@ -14,13 +15,20 @@ export class MarketModifyHandler extends Handler { } // eslint-disable-next-line @typescript-eslint/require-await - public async internalHandle(_: pg.QueryResultRow): Promise { + public async internalHandle(resultRow: pg.QueryResultRow): Promise { logger.info({ at: 'MarketModifyHandler#handle', message: 'Received MarketEvent with MarketModify.', event: this.event, }); + // Handle latency from resultRow + stats.timing( + `${config.SERVICE_NAME}.handle_market_modify_event.sql_latency`, + Number(resultRow.latency), + this.generateTimingStatsOptions(), + ); + return []; } } diff --git a/indexer/services/ender/src/handlers/markets/market-price-update-handler.ts b/indexer/services/ender/src/handlers/markets/market-price-update-handler.ts index 56c9978fd9..d11dd89ade 100644 --- a/indexer/services/ender/src/handlers/markets/market-price-update-handler.ts +++ b/indexer/services/ender/src/handlers/markets/market-price-update-handler.ts @@ -1,4 +1,4 @@ -import { logger } from '@dydxprotocol-indexer/base'; +import { logger, stats } from '@dydxprotocol-indexer/base'; import { MarketFromDatabase, OraclePriceFromDatabase, @@ -8,6 +8,7 @@ import { import { MarketEventV1 } from '@dydxprotocol-indexer/v4-protos'; import * as pg from 'pg'; +import config from '../../config'; import { generateOraclePriceContents } from '../../helpers/kafka-helper'; import { ConsolidatedKafkaEvent, @@ -35,6 +36,13 @@ export class MarketPriceUpdateHandler extends Handler { const oraclePrice: OraclePriceFromDatabase = OraclePriceModel.fromJson( resultRow.oracle_price) as OraclePriceFromDatabase; + // Handle latency from resultRow + stats.timing( + `${config.SERVICE_NAME}.handle_market_price_update_event.sql_latency`, + Number(resultRow.latency), + this.generateTimingStatsOptions(), + ); + return [ this.generateKafkaEvent( oraclePrice, market.pair, diff --git a/indexer/services/ender/src/handlers/order-fills/deleveraging-handler.ts b/indexer/services/ender/src/handlers/order-fills/deleveraging-handler.ts index ea8687f5d0..ce352487db 100644 --- a/indexer/services/ender/src/handlers/order-fills/deleveraging-handler.ts +++ b/indexer/services/ender/src/handlers/order-fills/deleveraging-handler.ts @@ -1,4 +1,4 @@ -import { logger } from '@dydxprotocol-indexer/base'; +import { logger, stats } from '@dydxprotocol-indexer/base'; import { FillFromDatabase, FillModel, @@ -15,6 +15,7 @@ import { import { DeleveragingEventV1 } from '@dydxprotocol-indexer/v4-protos'; import * as pg from 'pg'; +import config from '../../config'; import { SUBACCOUNT_ORDER_FILL_EVENT_TYPE } from '../../constants'; import { annotateWithPnl, convertPerpetualPosition } from '../../helpers/kafka-helper'; import { ConsolidatedKafkaEvent } from '../../lib/types'; @@ -95,6 +96,12 @@ export class DeleveragingHandler extends AbstractOrderFillHandler { + public async internalHandle(resultRow: pg.QueryResultRow): Promise { const orderIdProto: IndexerOrderId = this.event.orderRemoval!.removedOrderId!; + // Handle latency from resultRow + stats.timing( + `${config.SERVICE_NAME}.handle_stateful_order_removal_event.sql_latency`, + Number(resultRow.latency), + this.generateTimingStatsOptions(), + ); return this.createKafkaEvents(orderIdProto); } diff --git a/indexer/services/ender/src/handlers/subaccount-update-handler.ts b/indexer/services/ender/src/handlers/subaccount-update-handler.ts index 157cbcdfed..af3f60b9eb 100644 --- a/indexer/services/ender/src/handlers/subaccount-update-handler.ts +++ b/indexer/services/ender/src/handlers/subaccount-update-handler.ts @@ -1,3 +1,4 @@ +import { stats } from '@dydxprotocol-indexer/base'; import { AssetPositionFromDatabase, AssetPositionModel, @@ -16,6 +17,7 @@ import { import _ from 'lodash'; import * as pg from 'pg'; +import config from '../config'; import { SUBACCOUNT_ORDER_FILL_EVENT_TYPE } from '../constants'; import { addPositionsToContents, annotateWithPnl } from '../helpers/kafka-helper'; import { SubaccountUpdate } from '../lib/translated-types'; @@ -61,6 +63,12 @@ export class SubaccountUpdateHandler extends Handler { marketIdToMarket[marketId], ); } + // Handle latency from resultRow + stats.timing( + `${config.SERVICE_NAME}.handle_subaccount_update_event.sql_latency`, + Number(resultRow.latency), + this.generateTimingStatsOptions(), + ); return [ this.generateConsolidatedKafkaEvent( diff --git a/indexer/services/ender/src/handlers/trading-rewards-handler.ts b/indexer/services/ender/src/handlers/trading-rewards-handler.ts index 40cb63b474..85951a349e 100644 --- a/indexer/services/ender/src/handlers/trading-rewards-handler.ts +++ b/indexer/services/ender/src/handlers/trading-rewards-handler.ts @@ -1,3 +1,4 @@ +import { stats } from '@dydxprotocol-indexer/base'; import { SubaccountMessageContents, TradingRewardFromDatabase, @@ -8,6 +9,7 @@ import { TradingRewardsEventV1 } from '@dydxprotocol-indexer/v4-protos'; import _ from 'lodash'; import * as pg from 'pg'; +import config from '../config'; import { ConsolidatedKafkaEvent } from '../lib/types'; import { Handler } from './handler'; @@ -21,6 +23,12 @@ export class TradingRewardsHandler extends Handler { // eslint-disable-next-line @typescript-eslint/require-await public async internalHandle(resultRow: pg.QueryResultRow): Promise { + // Handle latency from resultRow + stats.timing( + `${config.SERVICE_NAME}.handle_trading_rewards_event.sql_latency`, + Number(resultRow.latency), + this.generateTimingStatsOptions(), + ); const tradingRewards: TradingRewardFromDatabase[] = _.map( resultRow.trading_rewards, (tradingReward: object) => { diff --git a/indexer/services/ender/src/handlers/transfer-handler.ts b/indexer/services/ender/src/handlers/transfer-handler.ts index ff95eff157..ae15612534 100644 --- a/indexer/services/ender/src/handlers/transfer-handler.ts +++ b/indexer/services/ender/src/handlers/transfer-handler.ts @@ -1,3 +1,4 @@ +import { stats } from '@dydxprotocol-indexer/base'; import { AssetFromDatabase, AssetModel, @@ -8,6 +9,7 @@ import { import { TransferEventV1 } from '@dydxprotocol-indexer/v4-protos'; import * as pg from 'pg'; +import config from '../config'; import { generateTransferContents } from '../helpers/kafka-helper'; import { ConsolidatedKafkaEvent } from '../lib/types'; import { Handler } from './handler'; @@ -22,6 +24,12 @@ export class TransferHandler extends Handler { // eslint-disable-next-line @typescript-eslint/require-await public async internalHandle(resultRow: pg.QueryResultRow): Promise { + // Handle latency from resultRow + stats.timing( + `${config.SERVICE_NAME}.handle_transfer_event.sql_latency`, + Number(resultRow.latency), + this.generateTimingStatsOptions(), + ); const asset: AssetFromDatabase = AssetModel.fromJson( resultRow.asset) as AssetFromDatabase; const transfer: TransferFromDatabase = TransferModel.fromJson( diff --git a/indexer/services/ender/src/handlers/update-clob-pair-handler.ts b/indexer/services/ender/src/handlers/update-clob-pair-handler.ts index 0840835227..9875e2d2f2 100644 --- a/indexer/services/ender/src/handlers/update-clob-pair-handler.ts +++ b/indexer/services/ender/src/handlers/update-clob-pair-handler.ts @@ -1,3 +1,4 @@ +import { stats } from '@dydxprotocol-indexer/base'; import { PerpetualMarketFromDatabase, PerpetualMarketModel, @@ -6,6 +7,7 @@ import { import { UpdateClobPairEventV1 } from '@dydxprotocol-indexer/v4-protos'; import * as pg from 'pg'; +import config from '../config'; import { generatePerpetualMarketMessage } from '../helpers/kafka-helper'; import { ConsolidatedKafkaEvent } from '../lib/types'; import { Handler } from './handler'; @@ -19,6 +21,12 @@ export class UpdateClobPairHandler extends Handler { // eslint-disable-next-line @typescript-eslint/require-await public async internalHandle(resultRow: pg.QueryResultRow): Promise { + // Handle latency from resultRow + stats.timing( + `${config.SERVICE_NAME}.handle_clob_pair_update_event.sql_latency`, + Number(resultRow.latency), + this.generateTimingStatsOptions(), + ); const perpetualMarket: PerpetualMarketFromDatabase = PerpetualMarketModel.fromJson( resultRow.perpetual_market) as PerpetualMarketFromDatabase; diff --git a/indexer/services/ender/src/handlers/update-perpetual-handler.ts b/indexer/services/ender/src/handlers/update-perpetual-handler.ts index c3a9175b21..d17d48e130 100644 --- a/indexer/services/ender/src/handlers/update-perpetual-handler.ts +++ b/indexer/services/ender/src/handlers/update-perpetual-handler.ts @@ -1,3 +1,4 @@ +import { stats } from '@dydxprotocol-indexer/base'; import { PerpetualMarketFromDatabase, perpetualMarketRefresher, @@ -6,6 +7,7 @@ import { import { UpdatePerpetualEventV1 } from '@dydxprotocol-indexer/v4-protos'; import * as pg from 'pg'; +import config from '../config'; import { generatePerpetualMarketMessage } from '../helpers/kafka-helper'; import { ConsolidatedKafkaEvent } from '../lib/types'; import { Handler } from './handler'; @@ -23,6 +25,12 @@ export class UpdatePerpetualHandler extends Handler { resultRow.perpetual_market) as PerpetualMarketFromDatabase; await perpetualMarketRefresher.upsertPerpetualMarket(perpetualMarket); + // Handle latency from resultRow + stats.timing( + `${config.SERVICE_NAME}.handle_update_perpetual_event.sql_latency`, + Number(resultRow.latency), + this.generateTimingStatsOptions(), + ); return [ this.generateConsolidatedMarketKafkaEvent( diff --git a/indexer/services/ender/src/scripts/handlers/dydx_block_processor_ordered_handlers.sql b/indexer/services/ender/src/scripts/handlers/dydx_block_processor_ordered_handlers.sql index 31da72d0ba..4e2deaa086 100644 --- a/indexer/services/ender/src/scripts/handlers/dydx_block_processor_ordered_handlers.sql +++ b/indexer/services/ender/src/scripts/handlers/dydx_block_processor_ordered_handlers.sql @@ -23,11 +23,16 @@ DECLARE event_index int; transaction_index int; event_data jsonb; + -- Latency tracking variables + event_start_time timestamp; + event_end_time timestamp; + event_latency interval; BEGIN rval = array_fill(NULL::jsonb, ARRAY[coalesce(jsonb_array_length(block->'events'), 0)]::integer[]); /** Note that arrays are 1-indexed in PostgreSQL and empty arrays return NULL for array_length. */ FOR i in 1..coalesce(array_length(rval, 1), 0) LOOP + event_start_time := clock_timestamp(); event_ = jsonb_array_element(block->'events', i-1); transaction_index = dydx_tendermint_event_to_transaction_index(event_); event_index = (event_->'eventIndex')::int; @@ -65,6 +70,16 @@ BEGIN ELSE NULL; END CASE; + + event_end_time := clock_timestamp(); + event_latency := event_end_time - event_start_time; + + -- Add the event latency in ms to the rval output for this event + rval[i] := jsonb_set( + rval[i], + '{latency}', + to_jsonb(EXTRACT(EPOCH FROM event_latency) * 1000) + ); END LOOP; RETURN rval; diff --git a/indexer/services/ender/src/scripts/handlers/dydx_block_processor_unordered_handlers.sql b/indexer/services/ender/src/scripts/handlers/dydx_block_processor_unordered_handlers.sql index 72153a8af6..7bcce414b2 100644 --- a/indexer/services/ender/src/scripts/handlers/dydx_block_processor_unordered_handlers.sql +++ b/indexer/services/ender/src/scripts/handlers/dydx_block_processor_unordered_handlers.sql @@ -25,11 +25,16 @@ DECLARE event_index int; transaction_index int; event_data jsonb; + -- Latency tracking variables + event_start_time timestamp; + event_end_time timestamp; + event_latency interval; BEGIN rval = array_fill(NULL::jsonb, ARRAY[coalesce(jsonb_array_length(block->'events'), 0)]::integer[]); /** Note that arrays are 1-indexed in PostgreSQL and empty arrays return NULL for array_length. */ FOR i in 1..coalesce(array_length(rval, 1), 0) LOOP + event_start_time := clock_timestamp(); event_ = jsonb_array_element(block->'events', i-1); transaction_index = dydx_tendermint_event_to_transaction_index(event_); event_index = (event_->'eventIndex')::int; @@ -63,6 +68,16 @@ BEGIN ELSE NULL; END CASE; + + event_end_time := clock_timestamp(); + event_latency := event_end_time - event_start_time; + + -- Add the event latency in ms to the rval output for this event + rval[i] := jsonb_set( + rval[i], + '{latency}', + to_jsonb(EXTRACT(EPOCH FROM event_latency) * 1000) + ); END LOOP; RETURN rval;