From 8a0e698e1cd05f02ed84de2a496c2d3cd125e725 Mon Sep 17 00:00:00 2001 From: Roy Li Date: Wed, 25 Sep 2024 13:14:32 -0400 Subject: [PATCH] Create consumer using rack id --- .../indexer-build-and-push-testnet.yml | 1 + .../packages/kafka/__tests__/consumer.test.ts | 4 +- indexer/packages/kafka/src/consumer.ts | 75 ++++++++++--------- .../src/helpers/kafka/kafka-controller.ts | 4 +- indexer/services/scripts/src/print-block.ts | 6 +- .../src/helpers/kafka/kafka-controller.ts | 4 +- .../src/helpers/kafka/kafka-controller.ts | 4 +- 7 files changed, 53 insertions(+), 45 deletions(-) diff --git a/.github/workflows/indexer-build-and-push-testnet.yml b/.github/workflows/indexer-build-and-push-testnet.yml index 3cee1bf285..a2ebdbff62 100644 --- a/.github/workflows/indexer-build-and-push-testnet.yml +++ b/.github/workflows/indexer-build-and-push-testnet.yml @@ -3,6 +3,7 @@ name: Indexer Build & Push Images to AWS ECR for Testnet Branch on: # yamllint disable-line rule:truthy push: branches: + - roy/* - main - 'release/indexer/v[0-9]+.[0-9]+.x' # e.g. release/indexer/v0.1.x - 'release/indexer/v[0-9]+.x' # e.g. 
release/indexer/v1.x diff --git a/indexer/packages/kafka/__tests__/consumer.test.ts b/indexer/packages/kafka/__tests__/consumer.test.ts index de801b2dfe..e05d67d5e3 100644 --- a/indexer/packages/kafka/__tests__/consumer.test.ts +++ b/indexer/packages/kafka/__tests__/consumer.test.ts @@ -10,10 +10,10 @@ import { TO_ENDER_TOPIC } from '../src'; describe.skip('consumer', () => { beforeAll(async () => { await Promise.all([ - consumer.connect(), + consumer!.connect(), producer.connect(), ]); - await consumer.subscribe({ topic: TO_ENDER_TOPIC }); + await consumer!.subscribe({ topic: TO_ENDER_TOPIC }); await startConsumer(); }); diff --git a/indexer/packages/kafka/src/consumer.ts b/indexer/packages/kafka/src/consumer.ts index 93c41d12c5..f9058c64d4 100644 --- a/indexer/packages/kafka/src/consumer.ts +++ b/indexer/packages/kafka/src/consumer.ts @@ -1,4 +1,5 @@ import { + getAvailabilityZoneId, logger, } from '@dydxprotocol-indexer/base'; import { @@ -13,15 +14,10 @@ const groupIdPrefix: string = config.SERVICE_NAME; const groupIdSuffix: string = config.KAFKA_ENABLE_UNIQUE_CONSUMER_GROUP_IDS ? `_${uuidv4()}` : ''; const groupId: string = `${groupIdPrefix}${groupIdSuffix}`; -export const consumer: Consumer = kafka.consumer({ - groupId, - sessionTimeout: config.KAFKA_SESSION_TIMEOUT_MS, - rebalanceTimeout: config.KAFKA_REBALANCE_TIMEOUT_MS, - heartbeatInterval: config.KAFKA_HEARTBEAT_INTERVAL_MS, - maxWaitTimeInMs: config.KAFKA_WAIT_MAX_TIME_MS, - readUncommitted: false, - maxBytes: 4194304, // 4MB -}); +// As a hack, we made this mutable since CommonJS doesn't support top level await. +// Top level await would be needed to fetch the az id (used as rack id). +// eslint-disable-next-line import/no-mutable-exports +export let consumer: Consumer | undefined; // List of functions to run per message consumed. let onMessageFunction: (topic: string, message: KafkaMessage) => Promise; @@ -51,29 +47,6 @@ export function updateOnBatchFunction( // Whether the consumer is stopped.
let stopped: boolean = false; -consumer.on('consumer.disconnect', async () => { - logger.info({ - at: 'consumers#disconnect', - message: 'Kafka consumer disconnected', - groupId, - }); - - if (!stopped) { - await consumer.connect(); - logger.info({ - at: 'kafka-consumer#disconnect', - message: 'Kafka consumer reconnected', - groupId, - }); - } else { - logger.info({ - at: 'kafka-consumer#disconnect', - message: 'Not reconnecting since task is shutting down', - groupId, - }); - } -}); - export async function stopConsumer(): Promise { logger.info({ at: 'kafka-consumer#stop', @@ -82,10 +55,44 @@ export async function stopConsumer(): Promise { }); stopped = true; - await consumer.disconnect(); + await consumer!.disconnect(); } export async function startConsumer(batchProcessing: boolean = false): Promise { + consumer = kafka.consumer({ + groupId, + sessionTimeout: config.KAFKA_SESSION_TIMEOUT_MS, + rebalanceTimeout: config.KAFKA_REBALANCE_TIMEOUT_MS, + heartbeatInterval: config.KAFKA_HEARTBEAT_INTERVAL_MS, + maxWaitTimeInMs: config.KAFKA_WAIT_MAX_TIME_MS, + readUncommitted: false, + maxBytes: 4194304, // 4MB + rackId: await getAvailabilityZoneId(), + }); + + consumer!.on('consumer.disconnect', async () => { + logger.info({ + at: 'consumers#disconnect', + message: 'Kafka consumer disconnected', + groupId, + }); + + if (!stopped) { + await consumer!.connect(); + logger.info({ + at: 'kafka-consumer#disconnect', + message: 'Kafka consumer reconnected', + groupId, + }); + } else { + logger.info({ + at: 'kafka-consumer#disconnect', + message: 'Not reconnecting since task is shutting down', + groupId, + }); + } + }); + const consumerRunConfig: ConsumerRunConfig = { // The last offset of each batch will be committed if processing does not error. // The commit will still happen if the number of messages in the batch < autoCommitThreshold. 
@@ -104,7 +111,7 @@ export async function startConsumer(batchProcessing: boolean = false): Promise { await Promise.all([ - consumer.connect(), + consumer!.connect(), producer.connect(), ]); - await consumer.subscribe({ + await consumer!.subscribe({ topic: TO_ENDER_TOPIC, // https://kafka.js.org/docs/consuming#a-name-from-beginning-a-frombeginning // Need to set fromBeginning to true, so when ender restarts, it will consume all messages diff --git a/indexer/services/scripts/src/print-block.ts b/indexer/services/scripts/src/print-block.ts index 05855229ca..34d32a7728 100644 --- a/indexer/services/scripts/src/print-block.ts +++ b/indexer/services/scripts/src/print-block.ts @@ -42,7 +42,7 @@ export function seek(offset: bigint): void { offset: offset.toString(), }); - consumer.seek({ + consumer!.seek({ topic: TO_ENDER_TOPIC, partition: 0, offset: offset.toString(), @@ -57,11 +57,11 @@ export function seek(offset: bigint): void { export async function connect(height: number): Promise { await Promise.all([ - consumer.connect(), + consumer!.connect(), producer.connect(), ]); - await consumer.subscribe({ + await consumer!.subscribe({ topic: TO_ENDER_TOPIC, fromBeginning: true, }); diff --git a/indexer/services/socks/src/helpers/kafka/kafka-controller.ts b/indexer/services/socks/src/helpers/kafka/kafka-controller.ts index 03409f2849..e1a1ca5e9a 100644 --- a/indexer/services/socks/src/helpers/kafka/kafka-controller.ts +++ b/indexer/services/socks/src/helpers/kafka/kafka-controller.ts @@ -6,14 +6,14 @@ import { } from '@dydxprotocol-indexer/kafka'; export async function connect(): Promise { - await consumer.connect(); + await consumer!.connect(); logger.info({ at: 'kafka-controller#connect', message: 'Connected to Kafka', }); - await consumer.subscribe({ topics: Object.values(WebsocketTopics) }); + await consumer!.subscribe({ topics: Object.values(WebsocketTopics) }); } export async function disconnect(): Promise { diff --git 
a/indexer/services/vulcan/src/helpers/kafka/kafka-controller.ts b/indexer/services/vulcan/src/helpers/kafka/kafka-controller.ts index ae2038f0f3..58c983d171 100644 --- a/indexer/services/vulcan/src/helpers/kafka/kafka-controller.ts +++ b/indexer/services/vulcan/src/helpers/kafka/kafka-controller.ts @@ -10,11 +10,11 @@ import { onMessage } from '../../lib/on-message'; export async function connect(): Promise { await Promise.all([ - consumer.connect(), + consumer!.connect(), producer.connect(), ]); - await consumer.subscribe({ + await consumer!.subscribe({ topic: KafkaTopics.TO_VULCAN, // https://kafka.js.org/docs/consuming#a-name-from-beginning-a-frombeginning // fromBeginning is by default set to false, so vulcan will only consume messages produced