Commit 8a0e698: Create consumer using rack id

roy-dydx committed Sep 25, 2024 · 1 parent c2a7745

Showing 7 changed files with 53 additions and 45 deletions.
1 change: 1 addition & 0 deletions .github/workflows/indexer-build-and-push-testnet.yml

@@ -3,6 +3,7 @@ name: Indexer Build & Push Images to AWS ECR for Testnet Branch
 on:  # yamllint disable-line rule:truthy
   push:
     branches:
+      - roy/*
       - main
       - 'release/indexer/v[0-9]+.[0-9]+.x'  # e.g. release/indexer/v0.1.x
       - 'release/indexer/v[0-9]+.x'  # e.g. release/indexer/v1.x
4 changes: 2 additions & 2 deletions indexer/packages/kafka/__tests__/consumer.test.ts

@@ -10,10 +10,10 @@ import { TO_ENDER_TOPIC } from '../src';
 describe.skip('consumer', () => {
   beforeAll(async () => {
     await Promise.all([
-      consumer.connect(),
+      consumer!.connect(),
       producer.connect(),
     ]);
-    await consumer.subscribe({ topic: TO_ENDER_TOPIC });
+    await consumer!.subscribe({ topic: TO_ENDER_TOPIC });
     await startConsumer();
   });
75 changes: 41 additions & 34 deletions indexer/packages/kafka/src/consumer.ts

@@ -1,4 +1,5 @@
 import {
+  getAvailabilityZoneId,
   logger,
 } from '@dydxprotocol-indexer/base';
 import {
@@ -13,15 +14,10 @@ const groupIdPrefix: string = config.SERVICE_NAME;
 const groupIdSuffix: string = config.KAFKA_ENABLE_UNIQUE_CONSUMER_GROUP_IDS ? `_${uuidv4()}` : '';
 const groupId: string = `${groupIdPrefix}${groupIdSuffix}`;
 
-export const consumer: Consumer = kafka.consumer({
-  groupId,
-  sessionTimeout: config.KAFKA_SESSION_TIMEOUT_MS,
-  rebalanceTimeout: config.KAFKA_REBALANCE_TIMEOUT_MS,
-  heartbeatInterval: config.KAFKA_HEARTBEAT_INTERVAL_MS,
-  maxWaitTimeInMs: config.KAFKA_WAIT_MAX_TIME_MS,
-  readUncommitted: false,
-  maxBytes: 4194304, // 4MB
-});
+// As a hack, we made this mutable since CommonJS doesn't support top level await.
+// Top level await would be needed to fetch the az id (used as rack id).
+// eslint-disable-next-line import/no-mutable-exports
+export let consumer: Consumer | undefined;
 
 // List of functions to run per message consumed.
 let onMessageFunction: (topic: string, message: KafkaMessage) => Promise<void>;
@@ -51,29 +47,6 @@ export function updateOnBatchFunction(
 // Whether the consumer is stopped.
 let stopped: boolean = false;
 
-consumer.on('consumer.disconnect', async () => {
-  logger.info({
-    at: 'consumers#disconnect',
-    message: 'Kafka consumer disconnected',
-    groupId,
-  });
-
-  if (!stopped) {
-    await consumer.connect();
-    logger.info({
-      at: 'kafka-consumer#disconnect',
-      message: 'Kafka consumer reconnected',
-      groupId,
-    });
-  } else {
-    logger.info({
-      at: 'kafka-consumer#disconnect',
-      message: 'Not reconnecting since task is shutting down',
-      groupId,
-    });
-  }
-});
-
 export async function stopConsumer(): Promise<void> {
   logger.info({
     at: 'kafka-consumer#stop',
@@ -82,10 +55,44 @@ export async function stopConsumer(): Promise<void> {
   });
 
   stopped = true;
-  await consumer.disconnect();
+  await consumer!.disconnect();
 }
 
 export async function startConsumer(batchProcessing: boolean = false): Promise<void> {
+  consumer = kafka.consumer({
+    groupId,
+    sessionTimeout: config.KAFKA_SESSION_TIMEOUT_MS,
+    rebalanceTimeout: config.KAFKA_REBALANCE_TIMEOUT_MS,
+    heartbeatInterval: config.KAFKA_HEARTBEAT_INTERVAL_MS,
+    maxWaitTimeInMs: config.KAFKA_WAIT_MAX_TIME_MS,
+    readUncommitted: false,
+    maxBytes: 4194304, // 4MB
+    rackId: await getAvailabilityZoneId(),
+  });
+
+  consumer!.on('consumer.disconnect', async () => {
+    logger.info({
+      at: 'consumers#disconnect',
+      message: 'Kafka consumer disconnected',
+      groupId,
+    });
+
+    if (!stopped) {
+      await consumer!.connect();
+      logger.info({
+        at: 'kafka-consumer#disconnect',
+        message: 'Kafka consumer reconnected',
+        groupId,
+      });
+    } else {
+      logger.info({
+        at: 'kafka-consumer#disconnect',
+        message: 'Not reconnecting since task is shutting down',
+        groupId,
+      });
+    }
+  });
+
   const consumerRunConfig: ConsumerRunConfig = {
     // The last offset of each batch will be committed if processing does not error.
     // The commit will still happen if the number of messages in the batch < autoCommitThreshold.
@@ -104,7 +111,7 @@ export async function startConsumer(batchProcessing: boolean = false): Promise<void> {
     };
   }
 
-  await consumer.run(consumerRunConfig);
+  await consumer!.run(consumerRunConfig);
 
   logger.info({
     at: 'consumers#connect',
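
The helper `getAvailabilityZoneId` is imported from `@dydxprotocol-indexer/base`, but its implementation is not part of this diff. A minimal sketch of what it plausibly does, assuming the indexer runs on EC2 and reads the zone ID from the instance metadata service (IMDSv2); the IMDS endpoints are standard AWS, everything else here is an assumption:

```ts
import axios from 'axios';

let azId: string | undefined;

// Hypothetical sketch: resolve the AWS availability zone ID (e.g. "use1-az1")
// from IMDSv2 and cache it, since it cannot change while the process runs.
export async function getAvailabilityZoneId(): Promise<string> {
  if (azId !== undefined) {
    return azId;
  }
  // IMDSv2 requires a session token before any metadata can be read.
  const token: string = (await axios.put(
    'http://169.254.169.254/latest/api/token',
    undefined,
    { headers: { 'X-aws-ec2-metadata-token-ttl-seconds': '21600' } },
  )).data;
  const response = await axios.get(
    'http://169.254.169.254/latest/meta-data/placement/availability-zone-id',
    { headers: { 'X-aws-ec2-metadata-token': token } },
  );
  azId = response.data;
  return response.data;
}
```

Fetching this value is async, which is why the commit turns the exported consumer into a mutable `let` created inside `startConsumer()`: the package compiles to CommonJS, so `rackId: await getAvailabilityZoneId()` cannot run at module top level. With the zone ID passed as KafkaJS's `rackId`, a cluster that enables follower fetching (KIP-392: brokers on Kafka 2.4+ with `broker.rack` set and `replica.selector.class=org.apache.kafka.common.replica.RackAwareReplicaSelector`) can serve this consumer from the in-zone replica rather than the leader, avoiding cross-AZ transfer.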
4 changes: 2 additions & 2 deletions indexer/services/ender/src/helpers/kafka/kafka-controller.ts

@@ -8,11 +8,11 @@ import { onMessage } from '../../lib/on-message';
 
 export async function connect(): Promise<void> {
   await Promise.all([
-    consumer.connect(),
+    consumer!.connect(),
     producer.connect(),
   ]);
 
-  await consumer.subscribe({
+  await consumer!.subscribe({
     topic: TO_ENDER_TOPIC,
     // https://kafka.js.org/docs/consuming#a-name-from-beginning-a-frombeginning
     // Need to set fromBeginning to true, so when ender restarts, it will consume all messages
6 changes: 3 additions & 3 deletions indexer/services/scripts/src/print-block.ts

@@ -42,7 +42,7 @@ export function seek(offset: bigint): void {
     offset: offset.toString(),
   });
 
-  consumer.seek({
+  consumer!.seek({
     topic: TO_ENDER_TOPIC,
     partition: 0,
     offset: offset.toString(),
@@ -57,11 +57,11 @@
 
 export async function connect(height: number): Promise<void> {
   await Promise.all([
-    consumer.connect(),
+    consumer!.connect(),
     producer.connect(),
   ]);
 
-  await consumer.subscribe({
+  await consumer!.subscribe({
     topic: TO_ENDER_TOPIC,
     fromBeginning: true,
   });
4 changes: 2 additions & 2 deletions indexer/services/socks/src/helpers/kafka/kafka-controller.ts

@@ -6,14 +6,14 @@ import {
 } from '@dydxprotocol-indexer/kafka';
 
 export async function connect(): Promise<void> {
-  await consumer.connect();
+  await consumer!.connect();
 
   logger.info({
     at: 'kafka-controller#connect',
     message: 'Connected to Kafka',
   });
 
-  await consumer.subscribe({ topics: Object.values(WebsocketTopics) });
+  await consumer!.subscribe({ topics: Object.values(WebsocketTopics) });
 }
 
 export async function disconnect(): Promise<void> {
4 changes: 2 additions & 2 deletions indexer/services/vulcan/src/helpers/kafka/kafka-controller.ts

@@ -10,11 +10,11 @@ import { onMessage } from '../../lib/on-message';
 
 export async function connect(): Promise<void> {
   await Promise.all([
-    consumer.connect(),
+    consumer!.connect(),
     producer.connect(),
   ]);
 
-  await consumer.subscribe({
+  await consumer!.subscribe({
     topic: KafkaTopics.TO_VULCAN,
     // https://kafka.js.org/docs/consuming#a-name-from-beginning-a-frombeginning
     // fromBeginning is by default set to false, so vulcan will only consume messages produced
