Fix Negative acknowledgement handling and other minor issues #3082
Conversation
Signed-off-by: Krishna Kondaka <[email protected]>
            OffsetAndMetadata offsetAndMetadata = partitionCommitTrackerMap.get(partitionId).addCompletedOffsets(offsetRange);
            updateOffsetsToCommit(partition, offsetAndMetadata);
        } catch (Exception e) {
            LOG.error("Failed to seek to last committed offset upon positive acknowledgement "+partition, e);
Avoid string concatenation for log lines.
Make this:
LOG.error("Failed to seek to last committed offset upon positive acknowledgement {}", partition, e);
Will do this in the next PR.
                consumer.seek(partition, committedOffsetAndMetadata);
            }
        } catch (Exception e) {
            LOG.error("Failed to seek to last committed offset upon negative acknowledgement "+partition, e);
Please use Log4j's string interpolation here. See comment above.
Will do this in the next PR.
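The negative-acknowledgement path above rewinds a partition to its last committed offset so un-acknowledged records are polled again. A minimal self-contained sketch of that rewind logic (`RewindConsumer` and every name in it are hypothetical stand-ins for the Kafka client API, not this project's actual classes):

```java
import java.util.HashMap;
import java.util.Map;

// Stand-in for KafkaConsumer: tracks committed offsets and the read position.
class RewindConsumer {
    private final Map<String, Long> committed = new HashMap<>();
    private final Map<String, Long> position = new HashMap<>();

    void commit(String partition, long offset) { committed.put(partition, offset); }
    Long committed(String partition) { return committed.get(partition); }
    void seek(String partition, long offset) { position.put(partition, offset); }
    long position(String partition) { return position.getOrDefault(partition, 0L); }
}

public class NegativeAckSketch {
    public static void main(String[] args) {
        RewindConsumer consumer = new RewindConsumer();
        consumer.commit("topic-0", 100);   // last successfully committed offset
        consumer.seek("topic-0", 250);     // consumer has read ahead, uncommitted

        // Negative acknowledgement: rewind to the committed offset, if any,
        // so the records from offset 100 onward are re-consumed.
        Long committedOffset = consumer.committed("topic-0");
        if (committedOffset != null) {
            consumer.seek("topic-0", committedOffset);
        }
        System.out.println("position=" + consumer.position("topic-0"));
    }
}
```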
        if (!partitionCommitTrackerMap.containsKey(partitionId)) {
            OffsetAndMetadata committedOffsetAndMetadata = null;
            synchronized(consumer) {
                committedOffsetAndMetadata = consumer.committed(partition);
Is Kafka consumer not thread safe? Does this not just ask the server to commit?
The Kafka consumer is not thread-safe. Any use of consumer must be protected across different threads. I was getting a concurrent usage error.
https://www.confluent.io/blog/kafka-consumer-multi-threaded-messaging/
Can we avoid synchronization when committing offsets? I didn't see this blog using synchronization.
That post explicitly states:
When implementing a multi-threaded consumer architecture, it is important to note that the Kafka consumer is not thread safe. Multi-threaded access must be properly synchronized, which can be tricky. This is why the single-threaded model is commonly used.
Yes, we are single-threaded except for the acknowledgements, so I think the chance of lock contention is very low.
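To illustrate the locking pattern under discussion: the poll loop and the acknowledgement callbacks run on different threads, and every touch of the non-thread-safe consumer is wrapped in synchronized(consumer). This is a self-contained sketch under that assumption; SharedConsumer is a hypothetical stand-in, not the real KafkaConsumer:

```java
import java.util.concurrent.CountDownLatch;

// Stand-in for a non-thread-safe consumer: callers must hold its monitor.
class SharedConsumer {
    private long committedOffset = 0;
    void commitSync(long offset) { committedOffset = offset; }
    long committed() { return committedOffset; }
}

public class SynchronizedAccessSketch {
    public static void main(String[] args) throws InterruptedException {
        final SharedConsumer consumer = new SharedConsumer();
        final CountDownLatch acked = new CountDownLatch(1);

        // Acknowledgement callback thread: commits under the consumer's lock.
        Thread ackThread = new Thread(() -> {
            synchronized (consumer) {
                consumer.commitSync(42);
            }
            acked.countDown();
        });
        ackThread.start();
        acked.await();

        // Poll-loop thread: reads under the same lock, so accesses never overlap.
        long offset;
        synchronized (consumer) {
            offset = consumer.committed();
        }
        System.out.println("committed=" + offset);
    }
}
```

Since only the acknowledgement path contends with the single poll thread, the lock is almost always uncontended, which matches the "chance of lock contention is very low" argument above.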
        offsets.forEach((partition, offsetRange) -> {
            try {
                synchronized(consumer) {
                    OffsetAndMetadata committedOffsetAndMetadata = consumer.committed(partition);
Nit: Can this be final?
Will do this in the next PR.
@@ -19,13 +19,6 @@ public class PlainTextAuthConfig {
     @JsonProperty("password")
     private String password;

-    @JsonProperty("security_protocol")
security_protocol doesn't need to be configurable?
No. It was derived from the other config options.
@@ -118,7 +118,7 @@ public KafkaSource(final KafkaSourceConfig sourceConfig,
     @Override
     public void start(Buffer<Record<Event>> buffer) {
         sourceConfig.getTopics().forEach(topic -> {
-            consumerGroupID = getGroupId(topic.getName());
+            consumerGroupID = topic.getGroupId();
Can we add a test case for this, so that we catch any regressions?
Will do this in the next PR.
Description
This PR contains fixes for the following:
- Renamed auto_commit_interval to commit_interval so that it can be used in manual commits too.

Issues Resolved
[List any issues this PR will resolve]
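As a hedged illustration of the renamed option: a topic-level commit_interval that applies whether offsets are committed automatically or manually might look like the sketch below. The surrounding pipeline structure, topic name, group id, and interval value are all assumptions for illustration, not taken from this PR:

```yaml
kafka-pipeline:
  source:
    kafka:
      topics:
        - name: my-topic           # hypothetical topic name
          group_id: my-group       # hypothetical consumer group
          commit_interval: PT5S    # renamed from auto_commit_interval
```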
Check List
By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.