diff --git a/indexer/services/vulcan/src/handlers/order-place-handler.ts b/indexer/services/vulcan/src/handlers/order-place-handler.ts index 1f1b343c39..ead0281445 100644 --- a/indexer/services/vulcan/src/handlers/order-place-handler.ts +++ b/indexer/services/vulcan/src/handlers/order-place-handler.ts @@ -1,4 +1,6 @@ -import { logger, runFuncWithTimingStat, stats } from '@dydxprotocol-indexer/base'; +import { + logger, getInstanceId, runFuncWithTimingStat, stats, +} from '@dydxprotocol-indexer/base'; import { createSubaccountWebsocketMessage, KafkaTopics } from '@dydxprotocol-indexer/kafka'; import { blockHeightRefresher, @@ -88,7 +90,11 @@ export class OrderPlaceHandler extends Handler { }); if (placeOrderResult.replaced) { - stats.increment(`${config.SERVICE_NAME}.place_order_handler.replaced_order`, 1); + stats.increment( + `${config.SERVICE_NAME}.place_order_handler.replaced_order`, + 1, + { instance: getInstanceId() }, + ); } // TODO(CLOB-597): Remove this logic and log erorrs once best-effort-open is not sent for diff --git a/indexer/services/vulcan/src/handlers/order-remove-handler.ts b/indexer/services/vulcan/src/handlers/order-remove-handler.ts index 60d110b806..03d4d182bf 100644 --- a/indexer/services/vulcan/src/handlers/order-remove-handler.ts +++ b/indexer/services/vulcan/src/handlers/order-remove-handler.ts @@ -1,4 +1,6 @@ -import { logger, runFuncWithTimingStat, stats } from '@dydxprotocol-indexer/base'; +import { + logger, getInstanceId, runFuncWithTimingStat, stats, +} from '@dydxprotocol-indexer/base'; import { KafkaTopics, SUBACCOUNTS_WEBSOCKET_MESSAGE_VERSION, getTriggerPrice } from '@dydxprotocol-indexer/kafka'; import { blockHeightRefresher, @@ -89,7 +91,11 @@ export class OrderRemoveHandler extends Handler { reason === OrderRemovalReason.ORDER_REMOVAL_REASON_INDEXER_EXPIRED && !(await this.isOrderExpired(orderRemove)) ) { - stats.increment(`${config.SERVICE_NAME}.order_remove_reason_indexer_temp_expired`, 1); + stats.increment( + `${config.SERVICE_NAME}.order_remove_reason_indexer_temp_expired`, + 1, + { instance: getInstanceId() }, + ); logger.info({ at: 'OrderRemoveHandler#handle', message: 'Order was expired by Indexer but is no longer expired. Ignoring.', @@ -115,7 +121,11 @@ export class OrderRemoveHandler extends Handler { if ( orderRemove.reason === OrderRemovalReason.ORDER_REMOVAL_REASON_INDEXER_EXPIRED ) { - stats.increment(`${config.SERVICE_NAME}.order_remove_reason_indexer_expired`, 1); + stats.increment( + `${config.SERVICE_NAME}.order_remove_reason_indexer_expired`, + 1, + { instance: getInstanceId() }, + ); logger.info({ at: 'OrderRemoveHandler#handle', message: 'Order was expired by Indexer', @@ -439,7 +449,11 @@ export class OrderRemoveHandler extends Handler { this.generateTimingStatsOptions('find_order_for_indexer_expired_expiry_verification'), ); if (redisOrder === null) { - stats.increment(`${config.SERVICE_NAME}.indexer_expired_order_not_found`, 1); + stats.increment( + `${config.SERVICE_NAME}.indexer_expired_order_not_found`, + 1, + { instance: getInstanceId() }, + ); logger.info({ at: 'orderRemoveHandler#isOrderExpired', message: 'Could not find order for Indexer-expired expiry verification', @@ -463,7 +477,11 @@ export class OrderRemoveHandler extends Handler { // We know the order is short-term, so the goodTilBlock must exist. if (order.goodTilBlock! >= +block.blockHeight) { - stats.increment(`${config.SERVICE_NAME}.indexer_expired_order_is_not_expired`, 1); + stats.increment( + `${config.SERVICE_NAME}.indexer_expired_order_is_not_expired`, + 1, + { instance: getInstanceId() }, + ); logger.info({ at: 'orderRemoveHandler#isOrderExpired', message: 'Indexer marked order that is not yet expired as expired', diff --git a/indexer/services/vulcan/src/handlers/order-update-handler.ts b/indexer/services/vulcan/src/handlers/order-update-handler.ts index e105ceea45..bdc9978b64 100644 --- a/indexer/services/vulcan/src/handlers/order-update-handler.ts +++ b/indexer/services/vulcan/src/handlers/order-update-handler.ts @@ -1,5 +1,6 @@ import { logger, + getInstanceId, runFuncWithTimingStat, stats, } from '@dydxprotocol-indexer/base'; @@ -102,6 +103,7 @@ export class OrderUpdateHandler extends Handler { 1, { orderFlags: String(orderFlags), + instance: getInstanceId(), }, ); return; @@ -110,7 +112,11 @@ export class OrderUpdateHandler extends Handler { const sizeDeltaInQuantums: Big = this.getSizeDeltaInQuantums(updateResult, orderUpdate); if (sizeDeltaInQuantums.eq(0)) { - stats.increment(`${config.SERVICE_NAME}.order_update_with_zero_delta.count`, 1); + stats.increment( + `${config.SERVICE_NAME}.order_update_with_zero_delta.count`, + 1, + { instance: getInstanceId() }, + ); return; } @@ -192,7 +198,11 @@ export class OrderUpdateHandler extends Handler { message: 'Old total filled quantums of order exceeds order size in quantums.', updateResult, }); - stats.increment(`${config.SERVICE_NAME}.order_update_old_total_filled_exceeds_size`, 1); + stats.increment( + `${config.SERVICE_NAME}.order_update_old_total_filled_exceeds_size`, + 1, + { instance: getInstanceId() }, + ); return Big(updateResult.order!.order!.quantums.toNumber().toString()); } @@ -219,7 +229,11 @@ export class OrderUpdateHandler extends Handler { orderUpdate, updateResult, }); - stats.increment(`${config.SERVICE_NAME}.order_update_total_filled_exceeds_size`, 1); + stats.increment( + `${config.SERVICE_NAME}.order_update_total_filled_exceeds_size`, + 1, + { instance: getInstanceId() }, + ); return Big(updateResult.order!.order!.quantums.toNumber().toString()); } diff --git a/indexer/services/vulcan/src/index.ts b/indexer/services/vulcan/src/index.ts index f70f8e59ce..08ca315431 100644 --- a/indexer/services/vulcan/src/index.ts +++ b/indexer/services/vulcan/src/index.ts @@ -1,4 +1,6 @@ -import { logger, startBugsnag, wrapBackgroundTask } from '@dydxprotocol-indexer/base'; +import { + logger, getInstanceId, startBugsnag, setInstanceId, wrapBackgroundTask, +} from '@dydxprotocol-indexer/base'; import { stopConsumer, startConsumer } from '@dydxprotocol-indexer/kafka'; import { blockHeightRefresher, perpetualMarketRefresher } from '@dydxprotocol-indexer/postgres'; @@ -18,6 +20,18 @@ async function startService(): Promise { startBugsnag(); + logger.info({ + at: 'index#start', + message: 'Getting instance id...', + }); + + await setInstanceId(); + + logger.info({ + at: 'index#start', + message: `Got instance id ${getInstanceId()}.`, + }); + // Initialize PerpetualMarkets cache await Promise.all([ blockHeightRefresher.updateBlockHeight(), diff --git a/indexer/services/vulcan/src/lib/on-batch.ts b/indexer/services/vulcan/src/lib/on-batch.ts index a2eccef231..352f1c9b98 100644 --- a/indexer/services/vulcan/src/lib/on-batch.ts +++ b/indexer/services/vulcan/src/lib/on-batch.ts @@ -1,4 +1,4 @@ -import { logger, stats } from '@dydxprotocol-indexer/base'; +import { getInstanceId, logger, stats } from '@dydxprotocol-indexer/base'; import { Batch, EachBatchPayload, @@ -13,7 +13,7 @@ export async function onBatch( const batch: Batch = payload.batch; const topic: string = batch.topic; const partition: string = batch.partition.toString(); - const metricTags: Record = { topic, partition }; + const metricTags: Record = { topic, partition, instance: getInstanceId() }; if (batch.isEmpty()) { logger.error({ at: 'on-batch#onBatch', diff --git a/indexer/services/vulcan/src/lib/on-message.ts b/indexer/services/vulcan/src/lib/on-message.ts index cc81a9a326..86766e549f 100644 --- a/indexer/services/vulcan/src/lib/on-message.ts +++ b/indexer/services/vulcan/src/lib/on-message.ts @@ -1,4 +1,5 @@ import { + getInstanceId, logger, stats, ParseMessageError, @@ -42,9 +43,17 @@ function getMessageType(update: OffChainUpdateV1): string { } export async function onMessage(message: KafkaMessage): Promise { - stats.increment(`${config.SERVICE_NAME}.received_kafka_message`, 1); + stats.increment( + `${config.SERVICE_NAME}.received_kafka_message`, + 1, + { instance: getInstanceId() }, + ); if (!message || !message.value || !message.timestamp) { - stats.increment(`${config.SERVICE_NAME}.empty_kafka_message`, 1); + stats.increment( + `${config.SERVICE_NAME}.empty_kafka_message`, + 1, + { instance: getInstanceId() }, + ); logger.error({ at: 'onMessage#onMessage', message: 'Empty message', @@ -59,6 +68,7 @@ export async function onMessage(message: KafkaMessage): Promise { STATS_NO_SAMPLING, { topic: KafkaTopics.TO_VULCAN, + instance: getInstanceId(), }, ); @@ -71,6 +81,7 @@ export async function onMessage(message: KafkaMessage): Promise { { topic: KafkaTopics.TO_VULCAN, event_type: String(message.headers?.event_type), + instance: getInstanceId(), }, ); } @@ -130,6 +141,7 @@ export async function onMessage(message: KafkaMessage): Promise { { topic: KafkaTopics.TO_VULCAN, event_type: String(headers?.event_type), + instance: getInstanceId(), }, ); } @@ -164,6 +176,7 @@ export async function onMessage(message: KafkaMessage): Promise { { success: success.toString(), messageType: getMessageType(update), + instance: getInstanceId(), }, ); } diff --git a/indexer/services/vulcan/src/lib/send-message-helper.ts b/indexer/services/vulcan/src/lib/send-message-helper.ts index cf0d05307c..08f5e1096a 100644 --- a/indexer/services/vulcan/src/lib/send-message-helper.ts +++ b/indexer/services/vulcan/src/lib/send-message-helper.ts @@ -1,5 +1,5 @@ import { - logger, stats, STATS_NO_SAMPLING, wrapBackgroundTask, + getInstanceId, logger, stats, STATS_NO_SAMPLING, wrapBackgroundTask, } from '@dydxprotocol-indexer/base'; import { producer } from '@dydxprotocol-indexer/kafka'; import { Message } from 'kafkajs'; @@ -77,7 +77,7 @@ async function sendMessages(topic: string): Promise { const messages: Message[] = queuedMessages[topic]; if (messages === undefined || messages.length === 0) { - stats.histogram(sizeStat, 0, STATS_NO_SAMPLING, { topic, success: 'true' }); + stats.histogram(sizeStat, 0, STATS_NO_SAMPLING, { topic, success: 'true', instance: getInstanceId() }); return; } queuedMessages[topic] = []; @@ -107,6 +107,7 @@ async function sendMessages(topic: string): Promise { const tags: {[name: string]: string} = { topic, success: success.toString(), + instance: getInstanceId(), }; stats.histogram(sizeStat, messages.length, STATS_NO_SAMPLING, tags); stats.timing(timingStat, Date.now() - start, STATS_NO_SAMPLING, tags);