
Fix Negative acknowledgement handling and other minor issues #3082

Merged
merged 3 commits on Aug 1, 2023

Conversation

kkondaka
Collaborator

Description

This PR contains fixes for the following:

  1. Handle negative acknowledgements correctly by seeking back so that the next poll() reads the records again.
  2. Rename auto_commit_interval to commit_interval so that it can also be used for manual commits.
  3. Change all timeout/interval config options to Duration.
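A self-contained sketch of the idea behind item 1, using a toy in-memory, single-partition consumer rather than the real Kafka client (`ToyConsumer` and all names here are illustrative, not the PR's actual code): on a negative acknowledgement, the read position is rewound to the last committed offset, so the next poll() returns the unacknowledged records again.

```java
import java.util.ArrayList;
import java.util.List;

// Toy single-partition consumer illustrating seek-on-negative-ack.
class ToyConsumer {
    private final List<String> records;
    private long position = 0;   // next offset that poll() will read
    private long committed = 0;  // last committed offset

    ToyConsumer(final List<String> records) {
        this.records = records;
    }

    List<String> poll(final int max) {
        final List<String> out = new ArrayList<>();
        while (out.size() < max && position < records.size()) {
            out.add(records.get((int) position++));
        }
        return out;
    }

    void commit() {
        committed = position;
    }

    // On a negative acknowledgement, rewind to the committed offset so
    // the next poll() re-reads the unacknowledged records.
    void onNegativeAck() {
        position = committed;
    }
}
```

With records ["a", "b", "c"], polling two records, negatively acknowledging, and polling again yields ["a", "b"] both times; after a commit, the next poll moves on to ["c"].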

Issues Resolved

[List any issues this PR will resolve]

Check List

  • [x] New functionality includes testing.
  • New functionality has been documented.
    • New functionality has javadoc added
  • [x] Commits are signed with a real name per the DCO

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.

Krishna Kondaka added 2 commits July 31, 2023 19:57
Signed-off-by: Krishna Kondaka <[email protected]>
Signed-off-by: Krishna Kondaka <[email protected]>
OffsetAndMetadata offsetAndMetadata = partitionCommitTrackerMap.get(partitionId).addCompletedOffsets(offsetRange);
updateOffsetsToCommit(partition, offsetAndMetadata);
} catch (Exception e) {
LOG.error("Failed to seek to last committed offset upon positive acknowledgement "+partition, e);
Member

Avoid string concatenation for log lines.

Make this:

LOG.error("Failed to seek to last committed offset upon positive acknowledgement {}", partition, e);
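For context on what the {} form does: SLF4J/Log4j substitutes each {} with the corresponding argument, defers building the string until the log level is known to be enabled, and renders a trailing Throwable with no matching placeholder as a stack trace. A tiny stand-in for the placeholder substitution (`PlaceholderDemo` is illustrative; the real logger is not shown here):

```java
// Minimal stand-in for SLF4J-style "{}" substitution. Each "{}" is
// replaced by the next argument; extra arguments beyond the
// placeholders (such as a trailing exception) are not substituted.
class PlaceholderDemo {
    static String format(final String pattern, final Object... args) {
        final StringBuilder sb = new StringBuilder();
        int argIdx = 0;
        int i = 0;
        while (i < pattern.length()) {
            if (i + 1 < pattern.length()
                    && pattern.charAt(i) == '{'
                    && pattern.charAt(i + 1) == '}'
                    && argIdx < args.length) {
                sb.append(args[argIdx++]);
                i += 2;
            } else {
                sb.append(pattern.charAt(i++));
            }
        }
        return sb.toString();
    }
}
```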

Collaborator Author

Will do this in the next PR.

consumer.seek(partition, committedOffsetAndMetadata);
}
} catch (Exception e) {
LOG.error("Failed to seek to last committed offset upon negative acknowledgement "+partition, e);
Member

Please use Log4j's string interpolation here. See comment above.

Collaborator Author

Will do this in the next PR.

if (!partitionCommitTrackerMap.containsKey(partitionId)) {
OffsetAndMetadata committedOffsetAndMetadata = null;
synchronized(consumer) {
committedOffsetAndMetadata = consumer.committed(partition);
Member

Is Kafka consumer not thread safe? Does this not just ask the server to commit?

Collaborator Author

The Kafka consumer is not thread-safe. Any use of "consumer" must be protected across different threads; I was getting a concurrent usage error.
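The KafkaConsumer javadoc does document the client as not thread-safe; concurrent access fails with a ConcurrentModificationException. A self-contained sketch of the guarding pattern described here, with a plain mutable object (`FakeConsumer`, illustrative only) standing in for the real consumer as the shared monitor:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Sketch: every thread that touches the (not thread-safe) consumer
// must hold the consumer's own monitor. A plain counter stands in for
// the Kafka client.
class MonitorGuardDemo {
    static final class FakeConsumer {
        long committedOffset = 0; // not safe for unsynchronized access
    }

    static long run() {
        final FakeConsumer consumer = new FakeConsumer();
        final ExecutorService pool = Executors.newFixedThreadPool(2);
        final Runnable task = () -> {
            for (int i = 0; i < 10_000; i++) {
                synchronized (consumer) { // guard every access
                    consumer.committedOffset++;
                }
            }
        };
        pool.submit(task); // stands in for the poll thread
        pool.submit(task); // stands in for the acknowledgement thread
        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (final InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return consumer.committedOffset; // 20000 with the lock in place
    }
}
```

Without the synchronized block, the two threads would race on the increment and the final count would be nondeterministic.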

Contributor

https://www.confluent.io/blog/kafka-consumer-multi-threaded-messaging/

Can we avoid synchronization when committing offsets? I didn't see this blog using synchronization.

Member

That post explicitly states:

When implementing a multi-threaded consumer architecture, it is important to note that the Kafka consumer is not thread safe. Multi-threaded access must be properly synchronized, which can be tricky. This is why the single-threaded model is commonly used.

Collaborator Author

Yes, we are single-threaded except for the acknowledgements, so I think the chance of lock contention is very low.

offsets.forEach((partition, offsetRange) -> {
try {
synchronized(consumer) {
OffsetAndMetadata committedOffsetAndMetadata = consumer.committed(partition);
Member

Nit: Can this be final?

Collaborator Author

Will do this in the next PR.

@@ -19,13 +19,6 @@ public class PlainTextAuthConfig {
@JsonProperty("password")
private String password;

@JsonProperty("security_protocol")
Member

security_protocol doesn't need to be configurable?

Collaborator Author

No. It was derived from the other config options.

@@ -118,7 +118,7 @@ public KafkaSource(final KafkaSourceConfig sourceConfig,
@Override
public void start(Buffer<Record<Event>> buffer) {
sourceConfig.getTopics().forEach(topic -> {
consumerGroupID = getGroupId(topic.getName());
consumerGroupID = topic.getGroupId();
Contributor
@hshardeesi Aug 1, 2023

Can we add a test case for this, so that we catch any regressions?

Collaborator Author

Will do this in the next PR.

@kkondaka kkondaka merged commit 3ab7831 into opensearch-project:main Aug 1, 2023
23 of 24 checks passed
@kkondaka kkondaka deleted the kafka-nack-fix branch May 13, 2024 05:52