-
Notifications
You must be signed in to change notification settings - Fork 190
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fix Negative acknowledgement handling and other minor issues #3082
Changes from 1 commit
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -107,7 +107,7 @@ public void updateOffsetsToCommit(final TopicPartition partition, final OffsetAn | |
if (Objects.isNull(offsetAndMetadata)) { | ||
return; | ||
} | ||
synchronized (this) { | ||
synchronized (offsetsToCommit) { | ||
offsetsToCommit.put(partition, offsetAndMetadata); | ||
} | ||
} | ||
|
@@ -118,18 +118,34 @@ private AcknowledgementSet createAcknowledgementSet(Map<TopicPartition, Range<Lo | |
if (result == true) { | ||
positiveAcknowledgementSetCounter.increment(); | ||
offsets.forEach((partition, offsetRange) -> { | ||
int partitionId = partition.partition(); | ||
if (!partitionCommitTrackerMap.containsKey(partitionId)) { | ||
OffsetAndMetadata committedOffsetAndMetadata = consumer.committed(partition); | ||
Long committedOffset = Objects.nonNull(committedOffsetAndMetadata) ? committedOffsetAndMetadata.offset() : null; | ||
|
||
partitionCommitTrackerMap.put(partitionId, new TopicPartitionCommitTracker(partition, committedOffset)); | ||
try { | ||
int partitionId = partition.partition(); | ||
if (!partitionCommitTrackerMap.containsKey(partitionId)) { | ||
OffsetAndMetadata committedOffsetAndMetadata = null; | ||
synchronized(consumer) { | ||
committedOffsetAndMetadata = consumer.committed(partition); | ||
} | ||
Long committedOffset = Objects.nonNull(committedOffsetAndMetadata) ? committedOffsetAndMetadata.offset() : null; | ||
partitionCommitTrackerMap.put(partitionId, new TopicPartitionCommitTracker(partition, committedOffset)); | ||
} | ||
OffsetAndMetadata offsetAndMetadata = partitionCommitTrackerMap.get(partitionId).addCompletedOffsets(offsetRange); | ||
updateOffsetsToCommit(partition, offsetAndMetadata); | ||
} catch (Exception e) { | ||
LOG.error("Failed to seek to last committed offset upon positive acknowledgement "+partition, e); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Avoid string concatenation for log lines. Make this:
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Will do this in the next PR. |
||
} | ||
OffsetAndMetadata offsetAndMetadata = partitionCommitTrackerMap.get(partitionId).addCompletedOffsets(offsetRange); | ||
updateOffsetsToCommit(partition, offsetAndMetadata); | ||
}); | ||
} else { | ||
positiveAcknowledgementSetCounter.increment(); | ||
negativeAcknowledgementSetCounter.increment(); | ||
offsets.forEach((partition, offsetRange) -> { | ||
try { | ||
synchronized(consumer) { | ||
OffsetAndMetadata committedOffsetAndMetadata = consumer.committed(partition); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Nit: Can this be There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Will do this in the next PR. |
||
consumer.seek(partition, committedOffsetAndMetadata); | ||
} | ||
} catch (Exception e) { | ||
LOG.error("Failed to seek to last committed offset upon negative acknowledgement "+partition, e); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Please use Log4j's string interpolation here. See comment above. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Will do this in the next PR. |
||
} | ||
}); | ||
} | ||
}, acknowledgementsTimeout); | ||
return acknowledgementSet; | ||
|
@@ -141,9 +157,11 @@ private AcknowledgementSet createAcknowledgementSet(Map<TopicPartition, Range<Lo | |
|
||
public <T> void consumeRecords() throws Exception { | ||
try { | ||
ConsumerRecords<String, T> records = | ||
consumer.poll(topicConfig.getThreadWaitingTime().toMillis()/2); | ||
if (!records.isEmpty() && records.count() > 0) { | ||
ConsumerRecords<String, T> records = null; | ||
synchronized(consumer) { | ||
records = consumer.poll(topicConfig.getThreadWaitingTime().toMillis()/2); | ||
} | ||
if (Objects.nonNull(records) && !records.isEmpty() && records.count() > 0) { | ||
Map<TopicPartition, Range<Long>> offsets = new HashMap<>(); | ||
AcknowledgementSet acknowledgementSet = null; | ||
if (acknowledgementsEnabled) { | ||
|
@@ -168,15 +186,17 @@ private void commitOffsets() { | |
return; | ||
} | ||
long currentTimeMillis = System.currentTimeMillis(); | ||
if ((currentTimeMillis - lastCommitTime) < COMMIT_OFFSET_INTERVAL_MS) { | ||
if ((currentTimeMillis - lastCommitTime) < topicConfig.getCommitInterval().toMillis()) { | ||
return; | ||
} | ||
synchronized (this) { | ||
synchronized (offsetsToCommit) { | ||
if (offsetsToCommit.isEmpty()) { | ||
return; | ||
} | ||
try { | ||
consumer.commitSync(); | ||
synchronized(consumer) { | ||
consumer.commitSync(); | ||
} | ||
offsetsToCommit.clear(); | ||
lastCommitTime = currentTimeMillis; | ||
} catch (CommitFailedException e) { | ||
|
@@ -286,8 +306,10 @@ public void shutdownConsumer(){ | |
@Override | ||
public void onPartitionsAssigned(Collection<TopicPartition> partitions) { | ||
for (TopicPartition topicPartition : partitions) { | ||
Long committedOffset = consumer.committed(topicPartition).offset(); | ||
consumer.seek(topicPartition, committedOffset); | ||
synchronized(consumer) { | ||
Long committedOffset = consumer.committed(topicPartition).offset(); | ||
consumer.seek(topicPartition, committedOffset); | ||
} | ||
} | ||
} | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -118,7 +118,7 @@ public KafkaSource(final KafkaSourceConfig sourceConfig, | |
@Override | ||
public void start(Buffer<Record<Event>> buffer) { | ||
sourceConfig.getTopics().forEach(topic -> { | ||
consumerGroupID = getGroupId(topic.getName()); | ||
consumerGroupID = topic.getGroupId(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can we add a test case for this, so that we catch any regression? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Will do this in the next PR. |
||
Properties consumerProperties = getConsumerProperties(topic); | ||
MessageFormat schema = MessageFormat.getByMessageFormatByName(schemaType); | ||
try { | ||
|
@@ -175,10 +175,6 @@ public void stop() { | |
LOG.info("Consumer shutdown successfully..."); | ||
} | ||
|
||
private String getGroupId(String name) { | ||
return pipelineName + "::" + name; | ||
} | ||
|
||
private long calculateLongestThreadWaitingTime() { | ||
List<TopicConfig> topicsList = sourceConfig.getTopics(); | ||
return topicsList.stream(). | ||
|
@@ -368,13 +364,13 @@ private void setConsumerTopicProperties(Properties properties, TopicConfig topic | |
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, | ||
topicConfig.getAutoCommit()); | ||
properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, | ||
topicConfig.getAutoCommitInterval().toSecondsPart()); | ||
((Long)topicConfig.getCommitInterval().toMillis()).intValue()); | ||
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, | ||
topicConfig.getAutoOffsetReset()); | ||
properties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, | ||
topicConfig.getConsumerMaxPollRecords()); | ||
properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, topicConfig.getSessionTimeOut()); | ||
properties.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, topicConfig.getHeartBeatInterval().toSecondsPart()); | ||
properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, ((Long)topicConfig.getSessionTimeOut().toMillis()).intValue()); | ||
properties.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, ((Long)topicConfig.getHeartBeatInterval().toMillis()).intValue()); | ||
properties.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, topicConfig.getFetchMaxBytes().intValue()); | ||
properties.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, topicConfig.getFetchMaxWait()); | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is Kafka consumer not thread safe? Does this not just ask the server to commit?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Kafka consumer is not thread-safe. Any use of "consumer" must be protected across different threads. I was getting a concurrent usage error.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
https://www.confluent.io/blog/kafka-consumer-multi-threaded-messaging/
Wondering if we can avoid synchronization when committing offsets — I didn't see this blog using synchronization.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That post explicitly states:
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, we are single threaded except for the acknowledgements. So, I think the chance of lock contention is very low.