Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Create consumer using rack id (#2352) #2393

Merged
merged 1 commit into from
Sep 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions indexer/packages/kafka/__tests__/consumer.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,10 @@ import { TO_ENDER_TOPIC } from '../src';
describe.skip('consumer', () => {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Address skipped test suite

The entire test suite is currently skipped due to potential flakiness caused by timeouts. This is a significant issue as it means these tests are not being run, which could lead to undetected bugs.

Consider the following actions:

  1. Investigate the root cause of the flakiness and timeouts.
  2. Refactor the tests to be more resilient to timing issues, possibly by using mock timers or adjusting wait times.
  3. If the flakiness persists, consider splitting the suite into stable and unstable tests, running the stable ones regularly and the unstable ones in a separate, more tolerant CI job.

Please don't leave tests skipped indefinitely, as this defeats the purpose of having tests in the first place.

beforeAll(async () => {
await Promise.all([
consumer.connect(),
consumer!.connect(),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Codebase verification

Potential Risk with Non-Null Assertions on consumer

The consumer object is typed as Consumer | undefined and is being initialized properly. However, using non-null assertions (!) in the test can mask potential issues if consumer is undefined in certain scenarios.

  • Recommendations:
    • Implement explicit null checks before invoking methods on consumer.
    • Ensure the initialization process guarantees consumer is defined before usage.
    • Address the skipped tests to ensure they reliably validate consumer behavior.
🔗 Analysis chain

Verify the nullability of the consumer object

The addition of non-null assertions (!) to consumer.connect() and consumer.subscribe() calls suggests that the consumer object might now be nullable. This change could mask potential issues if consumer is actually null or undefined at runtime.

Please verify the following:

  1. Confirm that consumer is properly initialized before these method calls.
  2. Consider adding a null check before these calls instead of using non-null assertions.
  3. Update the test to reflect any changes in the consumer initialization process.

Run the following script to verify the consumer initialization:

Also applies to: 16-16

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Check consumer initialization in the source file

# Test: Search for consumer initialization
rg --type typescript -A 5 'consumer\s*=|let\s+consumer' src/consumer.ts

# Test: Check if consumer is marked as nullable
rg --type typescript 'consumer\s*:\s*\w+\s*\|?\s*null' src/consumer.ts

Length of output: 216


Script:

#!/bin/bash
# Description: Check consumer initialization in the source file

# Test: Search for consumer initialization
rg -g "*.ts" -A 5 'consumer\s*=|let\s+consumer' src/consumer.ts

# Test: Check if consumer is marked as nullable
rg -g "*.ts" 'consumer\s*:\s*\w+\s*\|?\s*null' src/consumer.ts

Length of output: 242


Script:

#!/bin/bash
# Description: Locate all instances of consumer.ts in the repository

fd 'consumer\.ts$' --type f

Length of output: 68


Script:

#!/bin/bash
# Description: Check consumer initialization in the correct source file

# Test: Search for consumer initialization
rg -g "*.ts" -A 5 'consumer\s*=|let\s+consumer' indexer/packages/kafka/src/consumer.ts

# Test: Check if consumer is marked as nullable
rg -g "*.ts" 'consumer\s*:\s*\w+\s*\|?\s*null' indexer/packages/kafka/src/consumer.ts

Length of output: 668

producer.connect(),
]);
await consumer.subscribe({ topic: TO_ENDER_TOPIC });
await consumer!.subscribe({ topic: TO_ENDER_TOPIC });
await startConsumer();
});

Expand Down
75 changes: 42 additions & 33 deletions indexer/packages/kafka/src/consumer.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import {
getAvailabilityZoneId,
logger,
} from '@dydxprotocol-indexer/base';
import {
Expand All @@ -13,15 +14,10 @@ const groupIdPrefix: string = config.SERVICE_NAME;
const groupIdSuffix: string = config.KAFKA_ENABLE_UNIQUE_CONSUMER_GROUP_IDS ? `_${uuidv4()}` : '';
const groupId: string = `${groupIdPrefix}${groupIdSuffix}`;

export const consumer: Consumer = kafka.consumer({
groupId,
sessionTimeout: config.KAFKA_SESSION_TIMEOUT_MS,
rebalanceTimeout: config.KAFKA_REBALANCE_TIMEOUT_MS,
heartbeatInterval: config.KAFKA_HEARTBEAT_INTERVAL_MS,
maxWaitTimeInMs: config.KAFKA_WAIT_MAX_TIME_MS,
readUncommitted: false,
maxBytes: 4194304, // 4MB
});
// As a hack, we made this mutable since CommonJS doesn't support top level await.
// Top level await would needed to fetch the az id (used as rack id).
// eslint-disable-next-line import/no-mutable-exports
export let consumer: Consumer | undefined;
Comment on lines +17 to +20
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Consider refactoring to avoid mutable exports

Mutating exported variables can lead to unexpected behaviors and is generally discouraged. To maintain code integrity and improve maintainability, consider refactoring the code to encapsulate the consumer within a function or a class rather than exporting a mutable variable.


// List of functions to run per message consumed.
let onMessageFunction: (topic: string, message: KafkaMessage) => Promise<void>;
Expand Down Expand Up @@ -51,38 +47,51 @@ export function updateOnBatchFunction(
// Whether the consumer is stopped.
let stopped: boolean = false;

consumer.on('consumer.disconnect', async () => {
export async function stopConsumer(): Promise<void> {
logger.info({
at: 'consumers#disconnect',
message: 'Kafka consumer disconnected',
at: 'kafka-consumer#stop',
message: 'Stopping kafka consumer',
groupId,
});

if (!stopped) {
await consumer.connect();
logger.info({
at: 'kafka-consumer#disconnect',
message: 'Kafka consumer reconnected',
groupId,
});
} else {
stopped = true;
await consumer!.disconnect();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Handle potential undefined consumer in stopConsumer

If consumer is undefined when stopConsumer() is called, invoking await consumer!.disconnect(); will result in a runtime error. Consider adding a check to ensure consumer is initialized before attempting to disconnect.

Apply this diff to handle the potential undefined consumer:

stopped = true;
- await consumer!.disconnect();
+ if (consumer) {
+   await consumer.disconnect();
+ }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
await consumer!.disconnect();
stopped = true;
if (consumer) {
await consumer.disconnect();
}

}

export async function initConsumer(): Promise<void> {
consumer = kafka.consumer({
groupId,
sessionTimeout: config.KAFKA_SESSION_TIMEOUT_MS,
rebalanceTimeout: config.KAFKA_REBALANCE_TIMEOUT_MS,
heartbeatInterval: config.KAFKA_HEARTBEAT_INTERVAL_MS,
maxWaitTimeInMs: config.KAFKA_WAIT_MAX_TIME_MS,
readUncommitted: false,
maxBytes: 4194304, // 4MB
rackId: await getAvailabilityZoneId(),
});

consumer!.on('consumer.disconnect', async () => {
logger.info({
at: 'kafka-consumer#disconnect',
message: 'Not reconnecting since task is shutting down',
at: 'consumers#disconnect',
message: 'Kafka consumer disconnected',
groupId,
});
}
});

export async function stopConsumer(): Promise<void> {
logger.info({
at: 'kafka-consumer#stop',
message: 'Stopping kafka consumer',
groupId,
if (!stopped) {
await consumer!.connect();
logger.info({
at: 'kafka-consumer#disconnect',
message: 'Kafka consumer reconnected',
groupId,
});
} else {
Comment on lines +80 to +87
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Add error handling during consumer reconnection

When reconnecting the consumer after a disconnect, errors might occur during await consumer!.connect();. To prevent unhandled exceptions, wrap the reconnection logic in a try-catch block and handle any potential errors.

Apply this diff to add error handling:

if (!stopped) {
+   try {
      await consumer!.connect();
      logger.info({
        at: 'kafka-consumer#disconnect',
        message: 'Kafka consumer reconnected',
        groupId,
      });
+   } catch (error) {
+     logger.error({
+       at: 'kafka-consumer#disconnect',
+       message: 'Error reconnecting Kafka consumer',
+       error,
+       groupId,
+     });
+   }
} else {
  logger.info({
    at: 'kafka-consumer#disconnect',
    message: 'Not reconnecting since task is shutting down',
    groupId,
  });
}
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
if (!stopped) {
await consumer!.connect();
logger.info({
at: 'kafka-consumer#disconnect',
message: 'Kafka consumer reconnected',
groupId,
});
} else {
if (!stopped) {
try {
await consumer!.connect();
logger.info({
at: 'kafka-consumer#disconnect',
message: 'Kafka consumer reconnected',
groupId,
});
} catch (error) {
logger.error({
at: 'kafka-consumer#disconnect',
message: 'Error reconnecting Kafka consumer',
error,
groupId,
});
}
} else {
logger.info({
at: 'kafka-consumer#disconnect',
message: 'Not reconnecting since task is shutting down',
groupId,
});
}

logger.info({
at: 'kafka-consumer#disconnect',
message: 'Not reconnecting since task is shutting down',
groupId,
});
}
});

stopped = true;
await consumer.disconnect();
}

export async function startConsumer(batchProcessing: boolean = false): Promise<void> {
Expand All @@ -104,7 +113,7 @@ export async function startConsumer(batchProcessing: boolean = false): Promise<v
};
}

await consumer.run(consumerRunConfig);
await consumer!.run(consumerRunConfig);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Ensure consumer is initialized before running

The consumer variable may be undefined if initConsumer() has not been called before startConsumer(). To prevent runtime errors, consider adding a check to ensure consumer is initialized before calling consumer!.run(...), or refactor the code to guarantee the initialization sequence.

Apply this diff to check for consumer initialization:

+ if (!consumer) {
+   throw new Error('Consumer not initialized. Please call initConsumer() before startConsumer().');
+ }
  await consumer!.run(consumerRunConfig);
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
await consumer!.run(consumerRunConfig);
if (!consumer) {
throw new Error('Consumer not initialized. Please call initConsumer() before startConsumer().');
}
await consumer!.run(consumerRunConfig);


logger.info({
at: 'consumers#connect',
Expand Down
Original file line number Diff line number Diff line change
@@ -1,18 +1,18 @@
import { logger } from '@dydxprotocol-indexer/base';
import {
consumer, producer, TO_ENDER_TOPIC, updateOnMessageFunction,
consumer, initConsumer, producer, TO_ENDER_TOPIC, updateOnMessageFunction,
} from '@dydxprotocol-indexer/kafka';
import { KafkaMessage } from 'kafkajs';

import { onMessage } from '../../lib/on-message';

export async function connect(): Promise<void> {
await Promise.all([
consumer.connect(),
initConsumer(),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Approve initConsumer() usage, but consider error handling for consumer initialization.

  1. The use of initConsumer() is a good change that likely improves code organization and allows for more complex initialization if needed.

  2. However, the non-null assertion (!) on consumer assumes that initConsumer() will always successfully initialize the consumer. This could potentially lead to runtime errors if initialization fails for any reason.

Consider adding error handling to account for potential initialization failures:

await initConsumer();
if (!consumer) {
  throw new Error('Failed to initialize Kafka consumer');
}
await consumer.subscribe({
  // ... existing subscription options
});

This approach ensures that the code fails fast and explicitly if the consumer isn't properly initialized, rather than potentially causing issues later in the execution.

Also applies to: 15-15

producer.connect(),
]);

await consumer.subscribe({
await consumer!.subscribe({
topic: TO_ENDER_TOPIC,
// https://kafka.js.org/docs/consuming#a-name-from-beginning-a-frombeginning
// Need to set fromBeginning to true, so when ender restarts, it will consume all messages
Expand Down
6 changes: 3 additions & 3 deletions indexer/services/scripts/src/print-block.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ export function seek(offset: bigint): void {
offset: offset.toString(),
});

consumer.seek({
consumer!.seek({
topic: TO_ENDER_TOPIC,
partition: 0,
offset: offset.toString(),
Expand All @@ -57,11 +57,11 @@ export function seek(offset: bigint): void {

export async function connect(height: number): Promise<void> {
await Promise.all([
consumer.connect(),
consumer!.connect(),
producer.connect(),
]);

await consumer.subscribe({
await consumer!.subscribe({
topic: TO_ENDER_TOPIC,
fromBeginning: true,
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,19 @@ import { logger } from '@dydxprotocol-indexer/base';
import {
WebsocketTopics,
consumer,
initConsumer,
stopConsumer,
} from '@dydxprotocol-indexer/kafka';

export async function connect(): Promise<void> {
await consumer.connect();
await initConsumer();

logger.info({
at: 'kafka-controller#connect',
message: 'Connected to Kafka',
});

await consumer.subscribe({ topics: Object.values(WebsocketTopics) });
await consumer!.subscribe({ topics: Object.values(WebsocketTopics) });
}

export async function disconnect(): Promise<void> {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { logger } from '@dydxprotocol-indexer/base';
import {
consumer, producer, KafkaTopics, updateOnMessageFunction, updateOnBatchFunction,
consumer, initConsumer, producer, KafkaTopics, updateOnMessageFunction, updateOnBatchFunction,
} from '@dydxprotocol-indexer/kafka';
import { KafkaMessage } from 'kafkajs';

Expand All @@ -10,11 +10,11 @@ import { onMessage } from '../../lib/on-message';

export async function connect(): Promise<void> {
await Promise.all([
consumer.connect(),
initConsumer(),
producer.connect(),
]);

await consumer.subscribe({
await consumer!.subscribe({
topic: KafkaTopics.TO_VULCAN,
// https://kafka.js.org/docs/consuming#a-name-from-beginning-a-frombeginning
// fromBeginning is by default set to false, so vulcan will only consume messages produced
Expand Down
Loading