
Fix consumer synchronization. Fix consumer to use user-specified groupId #3100

Merged 3 commits into opensearch-project:main on Aug 2, 2023

Conversation

@kkondaka (Collaborator) commented Aug 1, 2023

Description

This PR contains the following fixes:

  1. Addresses comments from PR "Fix Negative acknowledgement handling and other minor issues" #3082
    -- Fixes a consumer synchronization issue by avoiding synchronization on the consumer object
    -- Fixes LOG messages to use {} placeholders (see the logging sketch after this list)
  2. Handles KafkaException in the MSK getBrokers API exception handling
  3. Fixes consumer to use the user-configured groupId
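
For context on the {} placeholder fix, here is a minimal SLF4J sketch (class, method, and message text are illustrative, not the PR's code). The placeholder form defers string formatting until the message is actually emitted, and a trailing Throwable argument still gets its stack trace logged:

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    class PlaceholderLoggingSketch {
        private static final Logger LOG = LoggerFactory.getLogger(PlaceholderLoggingSketch.class);

        void onError(final String topicName, final Exception e) {
            // Concatenation builds the message string even when WARN is disabled:
            LOG.warn("Error while reading topic " + topicName, e);
            // The {} placeholder defers formatting; the trailing Throwable is
            // still recognized as the exception and its stack trace is logged:
            LOG.warn("Error while reading topic {}", topicName, e);
        }
    }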

Issues Resolved

[List any issues this PR will resolve]

Check List

  • [x] New functionality includes testing.
  • [ ] New functionality has been documented.
    • [ ] New functionality has javadoc added
  • [x] Commits are signed with a real name per the DCO

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.

Krishna Kondaka added 2 commits August 1, 2023 23:57
@dlvenable (Member) left a comment:

This looks good. I do have some comments, but nothing to block it.

@@ -176,12 +170,27 @@ public <T> void consumeRecords() throws Exception {
                 }
             }
         } catch (AuthenticationException e) {
-            LOG.warn("Authentication Error while doing poll(). Will retry after 10 seconds", e);
+            LOG.warn("Access Denied while doing poll(). Will retry after 10 seconds", e);
@dlvenable (Member):

The Apache documentation indicates that this is an authentication error and not necessarily related to access controls.

            consumeRecords();
            commitOffsets();
        } catch (Exception exp) {
            LOG.error("Error while reading the records from the topic...", exp);
@dlvenable (Member):

I think we should avoid ellipses in our logs. It's not a big deal, but an ellipsis seems to indicate either that we trailed off or that we have more to say (beyond the exception).

@kkondaka (Collaborator, Author):

Will remove the ellipsis in the next PR.

@@ -95,6 +99,7 @@ public KafkaSourceCustomConsumer(final KafkaConsumer consumer,
         this.acknowledgementSetManager = acknowledgementSetManager;
         this.pluginMetrics = pluginMetrics;
         this.partitionCommitTrackerMap = new HashMap<>();
+        this.partitionsToReset = new HashSet<>();
@dlvenable (Member):

You could use a concurrent set to avoid synchronization on this object.

    this.partitionsToReset = Collections.synchronizedSet(new HashSet<>());

@kkondaka (Collaborator, Author):

Would it be better performance-wise?

@dlvenable (Member):

I doubt it, but it would ensure that somebody doesn't forget to synchronize calls as the file is maintained.
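
For reference, a minimal sketch of the two thread-safe set options discussed above (class and field names are hypothetical; only java.util and the Kafka client's TopicPartition are assumed):

    import java.util.Collections;
    import java.util.HashSet;
    import java.util.Set;
    import java.util.concurrent.ConcurrentHashMap;

    import org.apache.kafka.common.TopicPartition;

    class PartitionSetSketch {
        // Synchronized wrapper, as suggested: individual calls such as add()
        // are thread-safe, but iteration still needs an external synchronized
        // block on the set.
        private final Set<TopicPartition> viaWrapper =
                Collections.synchronizedSet(new HashSet<>());

        // Truly concurrent alternative: a set view backed by ConcurrentHashMap.
        // add()/remove() and weakly consistent iteration are safe with no
        // external locking at all.
        private final Set<TopicPartition> viaConcurrentMap =
                ConcurrentHashMap.newKeySet();
    }

Either way, the thread-safety guarantee lives in the type itself, which is the maintenance benefit described in the reply above.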

        } catch (Exception e) {
            LOG.error("Failed to seek to last committed offset upon negative acknowledgement " + partition, e);
            synchronized (partitionsToReset) {
                partitionsToReset.add(partition);
@dlvenable (Member):

This seems like a nice change. Did you do any performance testing or find any particular pitfalls with the original approach?

@kkondaka (Collaborator, Author):

I haven't done any performance testing, but taking a lock every time the consumer is accessed is definitely not good.

                Thread.sleep(10000);
            } catch (RecordDeserializationException e) {
                LOG.warn("Serialization error - topic {} partition {} offset {}, seeking past the error record",
@hshardeesi (Contributor) commented Aug 2, 2023:

Serialization -> Deserialization.

Also increment a metric when we get to metrics.

            } catch (RecordDeserializationException e) {
                LOG.warn("Serialization error - topic {} partition {} offset {}, seeking past the error record",
                    e.topicPartition().topic(), e.topicPartition().partition(), e.offset());
                consumer.seek(e.topicPartition(), e.offset() + 1);
Contributor:

Is it really required to explicitly seek past this record? Can we not just log the exception, count a metric, and commit the offset as usual?

@kkondaka (Collaborator, Author):

As per the documentation, we have to seek past the offset to continue reading. And we cannot commit the offset unless we get acknowledgement that previously read records were flushed to the sink, right?
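
The behavior described above can be sketched as follows (class and method names are hypothetical; only the Kafka client API is assumed). poll() keeps throwing RecordDeserializationException for the same offset until the position moves past the bad record, which is why the explicit seek is required:

    import java.time.Duration;

    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.errors.RecordDeserializationException;

    class SeekPastBadRecordSketch {
        // Polls until a batch deserializes cleanly, skipping any record that
        // cannot be deserialized. Committing is left to the normal
        // acknowledgement path, as the reply above explains.
        static <K, V> ConsumerRecords<K, V> pollSkippingBadRecords(final KafkaConsumer<K, V> consumer) {
            while (true) {
                try {
                    return consumer.poll(Duration.ofSeconds(1));
                } catch (RecordDeserializationException e) {
                    // Skip just the poison record; the next poll() resumes
                    // from offset + 1 on that partition.
                    consumer.seek(e.topicPartition(), e.offset() + 1);
                }
            }
        }
    }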

@@ -214,8 +221,8 @@ public void run() {
        try {
            consumer.subscribe(Arrays.asList(topicName));
            while (!shutdownInProgress.get()) {
                resetOrCommitOffsets();
Contributor:

Nit: might be better to have separate functions.

@kkondaka (Collaborator, Author):

I thought about it. Resetting is just three or four lines of code, so it didn't feel worth making it a separate function, especially since it is a no-op most of the time.

@@ -214,17 +212,15 @@ public static String getBootStrapServersForMsk(final AwsIamAuthConfig awsIamAuth
        retryable = false;
        try {
            result = kafkaClient.getBootstrapBrokers(request);
-       } catch (InternalServerErrorException | ConflictException | ForbiddenException | UnauthorizedException | StsException e) {
+       } catch (KafkaException | StsException e) {
            LOG.debug("Failed to get bootstrap server information from MSK. Retrying...", e);
Contributor:

Should this be LOG.info? Also, make it explicit, e.g. "will retry with exponential backoff" or "will retry after N seconds". Do we need to log the entire backtrace? Just e.getMessage() may be enough.

@kkondaka (Collaborator, Author):

In my experience, every time we included just the message, the entire stack trace was later needed. Hopefully, these are not common scenarios.
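
For context, a sketch of the retry shape around getBootstrapBrokers (the class name, retry budget, and fixed 10-second delay are assumptions; the PR's actual retry logic may differ). Catching the service-level KafkaException covers InternalServerError, Conflict, Forbidden, and Unauthorized, which the old code listed individually:

    import software.amazon.awssdk.services.kafka.KafkaClient;
    import software.amazon.awssdk.services.kafka.model.GetBootstrapBrokersRequest;
    import software.amazon.awssdk.services.kafka.model.GetBootstrapBrokersResponse;
    import software.amazon.awssdk.services.kafka.model.KafkaException;
    import software.amazon.awssdk.services.sts.model.StsException;

    class MskBrokerLookupSketch {
        static GetBootstrapBrokersResponse getBrokersWithRetry(
                final KafkaClient kafkaClient,
                final GetBootstrapBrokersRequest request,
                final int maxRetries) throws InterruptedException {
            for (int attempt = 1; ; attempt++) {
                try {
                    return kafkaClient.getBootstrapBrokers(request);
                } catch (KafkaException | StsException e) {
                    // Both are treated as retryable; rethrow with the full
                    // stack trace once the retry budget is spent.
                    if (attempt >= maxRetries) {
                        throw e;
                    }
                    Thread.sleep(10_000L);
                }
            }
        }
    }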

@kkondaka merged commit d3a9099 into opensearch-project:main on Aug 2, 2023. 24 checks passed.
@kkondaka deleted the kafka-fixes branch on May 13, 2024.