Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Create consumer using rack id (#2352) #2393

Merged
merged 1 commit into from
Sep 27, 2024
Merged

Create consumer using rack id (#2352) #2393

merged 1 commit into from
Sep 27, 2024

Conversation

roy-dydx
Copy link
Contributor

@roy-dydx roy-dydx commented Sep 27, 2024

Changelist

[Describe or list the changes made in this PR]

Test Plan

[Describe how this PR was tested (if applicable)]

Author/Reviewer Checklist

  • If this PR has changes that result in a different app state given the same prior state and transaction list, manually add the state-breaking label.
  • If the PR has breaking postgres changes to the indexer add the indexer-postgres-breaking label.
  • If this PR isn't state-breaking but has changes that modify behavior in PrepareProposal or ProcessProposal, manually add the label proposal-breaking.
  • If this PR is one of many that implement a specific feature, manually label them all feature:[feature-name].
  • If you wish to for mergify-bot to automatically create a PR to backport your change to a release branch, manually add the label backport/[branch-name].
  • Manually add any of the following labels: refactor, chore, bug.

Summary by CodeRabbit

  • New Features
    • Introduced a new initialization method for the Kafka consumer, enhancing its setup process.
  • Bug Fixes
    • Improved handling of the consumer's connection state to prevent potential null reference errors.
  • Refactor
    • Reorganized the Kafka connection logic for better clarity and efficiency across various components.

@roy-dydx roy-dydx requested a review from a team as a code owner September 27, 2024 18:45
Copy link
Contributor

coderabbitai bot commented Sep 27, 2024

Walkthrough

The changes involve significant updates to the Kafka consumer's initialization and connection management across multiple files. The consumer variable is now mutable, allowing for later initialization. The initConsumer function has been introduced to streamline the consumer setup, replacing direct calls to consumer.connect(). Additionally, non-null assertions have been added to ensure that the consumer is expected to be initialized before usage, enhancing the handling of its state throughout the application.

Changes

Files Change Summary
indexer/packages/kafka/__tests__/consumer.test.ts Added non-null assertion operator to consumer in test setup.
indexer/packages/kafka/src/consumer.ts Changed consumer from a constant to a mutable export; added initConsumer method; modified stopConsumer and startConsumer functions; updated event listener for consumer.disconnect.
indexer/services/ender/src/helpers/kafka/kafka-controller.ts Replaced direct consumer.connect() call with initConsumer(), updated subscription to use non-null assertion on consumer.
indexer/services/socks/src/helpers/kafka/kafka-controller.ts Replaced consumer.connect() with initConsumer(); updated subscription to use non-null assertion on consumer.
indexer/services/vulcan/src/helpers/kafka/kafka-controller.ts Replaced consumer.connect() with initConsumer(); updated subscription to use non-null assertion on consumer.
indexer/services/scripts/src/print-block.ts Added non-null assertion operator to consumer in seek and connect functions.

Sequence Diagram(s)

sequenceDiagram
    participant App
    participant KafkaController
    participant Consumer

    App->>KafkaController: call connect()
    KafkaController->>Consumer: call initConsumer()
    Consumer-->>KafkaController: consumer initialized
    KafkaController->>Consumer: call subscribe()
    Consumer-->>KafkaController: subscription successful
Loading

🐰 In the meadow where bunnies hop,
A Kafka change makes our hearts stop!
With consumers now set to connect,
We dance in joy, what a perfect effect!
Non-null we shout, with glee we cheer,
For smooth connections, let’s all draw near!
🌼✨


Thank you for using CodeRabbit. We offer it for free to the OSS community and would appreciate your support in helping us grow. If you find it useful, would you consider giving us a shout-out on your favorite social media?

❤️ Share
🪧 Tips

Chat

There are 3 ways to chat with CodeRabbit:

  • Review comments: Directly reply to a review comment made by CodeRabbit. Example:
    • I pushed a fix in commit <commit_id>, please review it.
    • Generate unit testing code for this file.
    • Open a follow-up GitHub issue for this discussion.
  • Files and specific lines of code (under the "Files changed" tab): Tag @coderabbitai in a new review comment at the desired location with your query. Examples:
    • @coderabbitai generate unit testing code for this file.
    • @coderabbitai modularize this function.
  • PR comments: Tag @coderabbitai in a new PR comment to ask questions about the PR branch. For the best results, please provide a very specific query, as very limited context is provided in this mode. Examples:
    • @coderabbitai gather interesting stats about this repository and render them as a table. Additionally, render a pie chart showing the language distribution in the codebase.
    • @coderabbitai read src/utils.ts and generate unit testing code.
    • @coderabbitai read the files in the src/scheduler package and generate a class diagram using mermaid and a README in the markdown format.
    • @coderabbitai help me debug CodeRabbit configuration file.

Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments.

CodeRabbit Commands (Invoked using PR comments)

  • @coderabbitai pause to pause the reviews on a PR.
  • @coderabbitai resume to resume the paused reviews.
  • @coderabbitai review to trigger an incremental review. This is useful when automatic reviews are disabled for the repository.
  • @coderabbitai full review to do a full review from scratch and review all the files again.
  • @coderabbitai summary to regenerate the summary of the PR.
  • @coderabbitai resolve resolve all the CodeRabbit review comments.
  • @coderabbitai configuration to show the current CodeRabbit configuration for the repository.
  • @coderabbitai help to get help.

Other keywords and placeholders

  • Add @coderabbitai ignore anywhere in the PR description to prevent this PR from being reviewed.
  • Add @coderabbitai summary to generate the high-level summary at a specific location in the PR description.
  • Add @coderabbitai anywhere in the PR title to generate the title automatically.

CodeRabbit Configuration File (.coderabbit.yaml)

  • You can programmatically configure CodeRabbit by adding a .coderabbit.yaml file to the root of your repository.
  • Please see the configuration documentation for more information.
  • If your editor has YAML language server enabled, you can add the path at the top of this file to enable auto-completion and validation: # yaml-language-server: $schema=https://coderabbit.ai/integrations/schema.v2.json

Documentation and Community

  • Visit our Documentation for detailed information on how to use CodeRabbit.
  • Join our Discord Community to get help, request features, and share feedback.
  • Follow us on X/Twitter for updates and announcements.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 7

🧹 Outside diff range and nitpick comments (5)
indexer/services/socks/src/helpers/kafka/kafka-controller.ts (1)

Line range hint 1-28: Overall: Good refactoring, but consider addressing non-null assertion.

The changes in this file are part of a larger refactoring effort to improve Kafka consumer management. The introduction of initConsumer and the changes to the connect function improve modularity and separation of concerns.

However, the use of the non-null assertion in the consumer.subscribe call introduces a potential risk of runtime errors. Consider addressing this as suggested in the previous comment.

As a follow-up action, it would be beneficial to review the initConsumer implementation and the overall consumer lifecycle management to ensure that the consumer is always properly initialized before use.

indexer/services/vulcan/src/helpers/kafka/kafka-controller.ts (3)

13-14: LGTM: Consumer initialization refactored.

The replacement of consumer.connect() with initConsumer() is a good refactoring choice. It provides a more modular approach to initializing the Kafka consumer, which can improve code organization and allow for more complex initialization logic if needed in the future.

Consider adding a comment explaining the purpose of initConsumer() for better code documentation. For example:

// Initialize the Kafka consumer with default configuration
await initConsumer(),

Line range hint 17-25: Consider a safer approach to handle potentially undefined consumer.

The addition of the non-null assertion operator (!) to consumer.subscribe() suggests that consumer might be undefined at this point. While this allows the code to compile, it doesn't provide runtime safety.

Consider implementing a safer approach to handle the potentially undefined consumer. Here are two suggestions:

  1. Add a runtime check:
if (!consumer) {
  throw new Error('Consumer is not initialized');
}
await consumer.subscribe({
  // ... existing options
});
  1. Use optional chaining with a fallback:
await consumer?.subscribe({
  // ... existing options
}) ?? Promise.reject(new Error('Consumer is not initialized'));

Both approaches provide better runtime safety and clearer error messages if the consumer is not properly initialized.


Line range hint 1-50: Summary of changes and recommendations

The changes in this file improve the overall structure of the Kafka consumer initialization and connection management. The introduction of initConsumer() provides a more modular approach to setting up the consumer. However, there are a few points to consider:

  1. The non-null assertion on consumer.subscribe() (line 17) introduces a potential runtime issue. Consider implementing a safer approach to handle the case where the consumer might be undefined, as suggested in the previous comment.

  2. Adding a brief comment explaining the purpose of initConsumer() would improve code documentation.

  3. The rest of the file remains unchanged and appears to be functioning as expected.

Overall, these changes are a step in the right direction for better code organization, but addressing the potential runtime issue with the consumer subscription is crucial for improving the robustness of the code.

indexer/services/scripts/src/print-block.ts (1)

Consider a safer approach to handle potential null consumer

The use of the non-null assertion operator (!) on consumer assumes that it will always be initialized when seek is called. This could lead to runtime errors if consumer is actually null or undefined.

Consider using a null check or optional chaining for safer operation:

if (consumer) {
  consumer.seek({
    topic: TO_ENDER_TOPIC,
    partition: 0,
    offset: offset.toString(),
  });
} else {
  logger.error({
    at: 'consumer#seek',
    message: 'Cannot seek: consumer is not initialized',
  });
}

This approach prevents potential runtime errors and provides better error handling.

🔗 Analysis chain

Line range hint 45-49: Consider a safer approach to handle potential null consumer

The use of the non-null assertion operator (!) on consumer assumes that it will always be initialized when seek is called. This could lead to runtime errors if consumer is actually null or undefined.

Consider using a null check or optional chaining for safer operation:

if (consumer) {
  consumer.seek({
    topic: TO_ENDER_TOPIC,
    partition: 0,
    offset: offset.toString(),
  });
} else {
  logger.error({
    at: 'consumer#seek',
    message: 'Cannot seek: consumer is not initialized',
  });
}

This approach prevents potential runtime errors and provides better error handling.

To ensure consumer is always initialized before seek is called, we can check its usage:

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Search for direct calls to the seek function
rg --type typescript "seek\(" -C 5

Length of output: 155


Script:

#!/bin/bash
# Search for direct calls to the seek function in TypeScript files
rg "seek\(" -g "*.ts" -C 5

Length of output: 2177

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL

📥 Commits

Files that changed from the base of the PR and between 5533a8f and 5c03c55.

📒 Files selected for processing (6)
  • indexer/packages/kafka/tests/consumer.test.ts (1 hunks)
  • indexer/packages/kafka/src/consumer.ts (4 hunks)
  • indexer/services/ender/src/helpers/kafka/kafka-controller.ts (1 hunks)
  • indexer/services/scripts/src/print-block.ts (2 hunks)
  • indexer/services/socks/src/helpers/kafka/kafka-controller.ts (1 hunks)
  • indexer/services/vulcan/src/helpers/kafka/kafka-controller.ts (2 hunks)
🔇 Additional comments (8)
indexer/services/socks/src/helpers/kafka/kafka-controller.ts (3)

5-5: LGTM: Import changes align with new consumer initialization approach.

The addition of initConsumer import is consistent with the changes in the connect function. This refactoring likely improves the separation of concerns in the Kafka consumer initialization process.


10-10: LGTM: Consumer initialization refactored.

The replacement of consumer.connect() with initConsumer() is a good refactoring step. It likely encapsulates the connection logic, potentially making it easier to manage and test.


17-17: ⚠️ Potential issue

Consider safer alternatives to non-null assertion.

The addition of the non-null assertion (!) to consumer suggests that it might be nullable. While this allows the code to compile, it could lead to runtime errors if consumer is actually null when this line is executed.

Consider the following alternatives:

  1. Use optional chaining: await consumer?.subscribe(...). This would safely handle the case where consumer is null or undefined.

  2. Add a null check before the subscribe call:

    if (consumer) {
      await consumer.subscribe({ topics: Object.values(WebsocketTopics) });
    } else {
      throw new Error('Consumer not initialized');
    }
  3. If consumer is guaranteed to be initialized by this point, consider updating its type to be non-nullable, removing the need for the assertion.

To better understand the consumer initialization, let's check its usage:

✅ Verification successful

Non-null assertion is safe, but consider alternative approaches for better safety.

The initConsumer function properly initializes consumer, ensuring it is not null when subscribe is called. However, for enhanced safety and maintainability, you might consider the following alternatives:

  1. Optional Chaining:

    await consumer?.subscribe({ topics: Object.values(WebsocketTopics) });

    This safely handles cases where consumer might be null or undefined.

  2. Explicit Null Check:

    if (consumer) {
      await consumer.subscribe({ topics: Object.values(WebsocketTopics) });
    } else {
      throw new Error('Consumer not initialized');
    }
  3. Update Type to Non-Nullable:
    If consumer is guaranteed to be initialized before use, update its type to be non-nullable, eliminating the need for the assertion.

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Search for consumer initialization and usage
rg --type typescript 'consumer\s*=' -A 5
rg --type typescript 'initConsumer' -A 5

Length of output: 153


Script:

#!/bin/bash
# Search for consumer initialization and usage in TypeScript files
rg 'consumer\s*=' --glob '*.ts' --glob '*.tsx' -A 5
rg 'initConsumer' --glob '*.ts' --glob '*.tsx' -A 5

Length of output: 4190


Script:

#!/bin/bash
# Extract the implementation of initConsumer function in TypeScript files
find . -type f \( -name "*.ts" -o -name "*.tsx" \) -exec grep -A 10 'function initConsumer' {} \; 

Length of output: 529

indexer/services/ender/src/helpers/kafka/kafka-controller.ts (1)

3-3: LGTM: Import statement updated correctly.

The import statement has been appropriately updated to include initConsumer, which is now used in the connect function.

indexer/services/vulcan/src/helpers/kafka/kafka-controller.ts (1)

3-4: LGTM: Import statement updated correctly.

The import statement has been properly updated to include initConsumer, which is consistent with the changes in the connect function. This change reflects the refactoring of the Kafka consumer initialization into a separate function.

indexer/services/scripts/src/print-block.ts (2)

60-66: ⚠️ Potential issue

Implement safer null checks in asynchronous connect function

The use of non-null assertion operators (!) on consumer in the connect function assumes that it will always be initialized. This could lead to runtime errors if consumer is null or undefined.

Consider implementing null checks and proper error handling:

export async function connect(height: number): Promise<void> {
  if (!consumer) {
    throw new Error('Consumer is not initialized');
  }

  try {
    await Promise.all([
      consumer.connect(),
      producer.connect(),
    ]);

    await consumer.subscribe({
      topic: TO_ENDER_TOPIC,
      fromBeginning: true,
    });

    updateOnMessageFunction((_topic: string, message: KafkaMessage): Promise<void> => {
      return printMessageAtHeight(message, height);
    });

    logger.info({
      at: 'consumers#connect',
      message: 'Connected to Kafka',
    });
  } catch (error) {
    logger.error({
      at: 'consumers#connect',
      message: 'Failed to connect to Kafka',
      error,
    });
    throw error;
  }
}

This approach provides better error handling and prevents potential runtime errors.

To ensure consumer is always initialized before connect is called, we can check its initialization and usage:

#!/bin/bash
# Search for consumer initialization
rg --type typescript "consumer\s*=\s*" -C 5

# Search for calls to the connect function
rg --type typescript "connect\(" -C 5

Line range hint 1-180: Request for context: Changes to consumer initialization and management

The introduction of non-null assertions for the consumer variable in the seek and connect functions suggests that there might have been changes to how consumer is initialized or managed elsewhere in the codebase.

To better understand the context of these changes and ensure they're part of a coherent strategy, please provide more information about:

  1. Changes to consumer initialization
  2. Any modifications to its lifecycle management
  3. The rationale behind making it nullable or lazily initialized (if applicable)

Additionally, I recommend a comprehensive review of consumer usage across the codebase to ensure consistency and prevent potential runtime errors. Here's a script to help with this review:

This will help identify any inconsistencies or potential issues in how consumer is used throughout the project.

indexer/packages/kafka/src/consumer.ts (1)

2-2: Verify the availability zone ID retrieval

Ensure that getAvailabilityZoneId() returns the correct availability zone ID and handles any potential errors during the asynchronous operation. Consider adding error handling or validation if necessary.

Run the following script to verify the getAvailabilityZoneId function implementation and usage:

} from '@dydxprotocol-indexer/kafka';
import { KafkaMessage } from 'kafkajs';

import { onMessage } from '../../lib/on-message';

export async function connect(): Promise<void> {
await Promise.all([
consumer.connect(),
initConsumer(),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Approve initConsumer() usage, but consider error handling for consumer initialization.

  1. The use of initConsumer() is a good change that likely improves code organization and allows for more complex initialization if needed.

  2. However, the non-null assertion (!) on consumer assumes that initConsumer() will always successfully initialize the consumer. This could potentially lead to runtime errors if initialization fails for any reason.

Consider adding error handling to account for potential initialization failures:

await initConsumer();
if (!consumer) {
  throw new Error('Failed to initialize Kafka consumer');
}
await consumer.subscribe({
  // ... existing subscription options
});

This approach ensures that the code fails fast and explicitly if the consumer isn't properly initialized, rather than potentially causing issues later in the execution.

Also applies to: 15-15

@@ -10,10 +10,10 @@ import { TO_ENDER_TOPIC } from '../src';
describe.skip('consumer', () => {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Address skipped test suite

The entire test suite is currently skipped due to potential flakiness caused by timeouts. This is a significant issue as it means these tests are not being run, which could lead to undetected bugs.

Consider the following actions:

  1. Investigate the root cause of the flakiness and timeouts.
  2. Refactor the tests to be more resilient to timing issues, possibly by using mock timers or adjusting wait times.
  3. If the flakiness persists, consider splitting the suite into stable and unstable tests, running the stable ones regularly and the unstable ones in a separate, more tolerant CI job.

Please don't leave tests skipped indefinitely, as this defeats the purpose of having tests in the first place.

@@ -10,10 +10,10 @@ import { TO_ENDER_TOPIC } from '../src';
describe.skip('consumer', () => {
beforeAll(async () => {
await Promise.all([
consumer.connect(),
consumer!.connect(),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Codebase verification

Potential Risk with Non-Null Assertions on consumer

The consumer object is typed as Consumer | undefined and is being initialized properly. However, using non-null assertions (!) in the test can mask potential issues if consumer is undefined in certain scenarios.

  • Recommendations:
    • Implement explicit null checks before invoking methods on consumer.
    • Ensure the initialization process guarantees consumer is defined before usage.
    • Address the skipped tests to ensure they reliably validate consumer behavior.
🔗 Analysis chain

Verify the nullability of the consumer object

The addition of non-null assertions (!) to consumer.connect() and consumer.subscribe() calls suggests that the consumer object might now be nullable. This change could mask potential issues if consumer is actually null or undefined at runtime.

Please verify the following:

  1. Confirm that consumer is properly initialized before these method calls.
  2. Consider adding a null check before these calls instead of using non-null assertions.
  3. Update the test to reflect any changes in the consumer initialization process.

Run the following script to verify the consumer initialization:

Also applies to: 16-16

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Check consumer initialization in the source file

# Test: Search for consumer initialization
rg --type typescript -A 5 'consumer\s*=|let\s+consumer' src/consumer.ts

# Test: Check if consumer is marked as nullable
rg --type typescript 'consumer\s*:\s*\w+\s*\|?\s*null' src/consumer.ts

Length of output: 216


Script:

#!/bin/bash
# Description: Check consumer initialization in the source file

# Test: Search for consumer initialization
rg -g "*.ts" -A 5 'consumer\s*=|let\s+consumer' src/consumer.ts

# Test: Check if consumer is marked as nullable
rg -g "*.ts" 'consumer\s*:\s*\w+\s*\|?\s*null' src/consumer.ts

Length of output: 242


Script:

#!/bin/bash
# Description: Locate all instances of consumer.ts in the repository

fd 'consumer\.ts$' --type f

Length of output: 68


Script:

#!/bin/bash
# Description: Check consumer initialization in the correct source file

# Test: Search for consumer initialization
rg -g "*.ts" -A 5 'consumer\s*=|let\s+consumer' indexer/packages/kafka/src/consumer.ts

# Test: Check if consumer is marked as nullable
rg -g "*.ts" 'consumer\s*:\s*\w+\s*\|?\s*null' indexer/packages/kafka/src/consumer.ts

Length of output: 668

Comment on lines +17 to +20
// As a hack, we made this mutable since CommonJS doesn't support top level await.
// Top level await would needed to fetch the az id (used as rack id).
// eslint-disable-next-line import/no-mutable-exports
export let consumer: Consumer | undefined;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Consider refactoring to avoid mutable exports

Mutating exported variables can lead to unexpected behaviors and is generally discouraged. To maintain code integrity and improve maintainability, consider refactoring the code to encapsulate the consumer within a function or a class rather than exporting a mutable variable.

});
} else {
stopped = true;
await consumer!.disconnect();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Handle potential undefined consumer in stopConsumer

If consumer is undefined when stopConsumer() is called, invoking await consumer!.disconnect(); will result in a runtime error. Consider adding a check to ensure consumer is initialized before attempting to disconnect.

Apply this diff to handle the potential undefined consumer:

stopped = true;
- await consumer!.disconnect();
+ if (consumer) {
+   await consumer.disconnect();
+ }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
await consumer!.disconnect();
stopped = true;
if (consumer) {
await consumer.disconnect();
}

@@ -104,7 +113,7 @@ export async function startConsumer(batchProcessing: boolean = false): Promise<v
};
}

await consumer.run(consumerRunConfig);
await consumer!.run(consumerRunConfig);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Ensure consumer is initialized before running

The consumer variable may be undefined if initConsumer() has not been called before startConsumer(). To prevent runtime errors, consider adding a check to ensure consumer is initialized before calling consumer!.run(...), or refactor the code to guarantee the initialization sequence.

Apply this diff to check for consumer initialization:

+ if (!consumer) {
+   throw new Error('Consumer not initialized. Please call initConsumer() before startConsumer().');
+ }
  await consumer!.run(consumerRunConfig);
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
await consumer!.run(consumerRunConfig);
if (!consumer) {
throw new Error('Consumer not initialized. Please call initConsumer() before startConsumer().');
}
await consumer!.run(consumerRunConfig);

Comment on lines +80 to +87
if (!stopped) {
await consumer!.connect();
logger.info({
at: 'kafka-consumer#disconnect',
message: 'Kafka consumer reconnected',
groupId,
});
} else {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Add error handling during consumer reconnection

When reconnecting the consumer after a disconnect, errors might occur during await consumer!.connect();. To prevent unhandled exceptions, wrap the reconnection logic in a try-catch block and handle any potential errors.

Apply this diff to add error handling:

if (!stopped) {
+   try {
      await consumer!.connect();
      logger.info({
        at: 'kafka-consumer#disconnect',
        message: 'Kafka consumer reconnected',
        groupId,
      });
+   } catch (error) {
+     logger.error({
+       at: 'kafka-consumer#disconnect',
+       message: 'Error reconnecting Kafka consumer',
+       error,
+       groupId,
+     });
+   }
} else {
  logger.info({
    at: 'kafka-consumer#disconnect',
    message: 'Not reconnecting since task is shutting down',
    groupId,
  });
}
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
if (!stopped) {
await consumer!.connect();
logger.info({
at: 'kafka-consumer#disconnect',
message: 'Kafka consumer reconnected',
groupId,
});
} else {
if (!stopped) {
try {
await consumer!.connect();
logger.info({
at: 'kafka-consumer#disconnect',
message: 'Kafka consumer reconnected',
groupId,
});
} catch (error) {
logger.error({
at: 'kafka-consumer#disconnect',
message: 'Error reconnecting Kafka consumer',
error,
groupId,
});
}
} else {
logger.info({
at: 'kafka-consumer#disconnect',
message: 'Not reconnecting since task is shutting down',
groupId,
});
}

@roy-dydx roy-dydx merged commit 0a3b3b8 into main Sep 27, 2024
16 checks passed
@roy-dydx roy-dydx deleted the roy/r2 branch September 27, 2024 19:18
@roy-dydx
Copy link
Contributor Author

@Mergifyio release/indexer/v5.x

@roy-dydx
Copy link
Contributor Author

https://github.com/Mergifyio release/indexer/v7.x

Copy link
Contributor

mergify bot commented Sep 30, 2024

release /indexer/v5.x

❌ Sorry but I didn't understand the command. Please consult the commands documentation 📚.

Copy link
Contributor

mergify bot commented Sep 30, 2024

release /indexer/v7.x

❌ Sorry but I didn't understand the command. Please consult the commands documentation 📚.

@roy-dydx
Copy link
Contributor Author

https://github.com/Mergifyio backport release/indexer/v7.x

Copy link
Contributor

mergify bot commented Sep 30, 2024

backport release/indexer/v7.x

✅ Backports have been created

mergify bot pushed a commit that referenced this pull request Sep 30, 2024
roy-dydx added a commit that referenced this pull request Sep 30, 2024
roy-dydx added a commit that referenced this pull request Sep 30, 2024
@roy-dydx
Copy link
Contributor Author

https://github.com/Mergifyio backport release/indexer/v5.x

Copy link
Contributor

mergify bot commented Sep 30, 2024

backport release/indexer/v5.x

✅ Backports have been created

mergify bot pushed a commit that referenced this pull request Sep 30, 2024
roy-dydx added a commit that referenced this pull request Oct 1, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Development

Successfully merging this pull request may close these issues.

2 participants