Fix Negative acknowledgement handling and other minor issues #3082

Merged: 3 commits merged on Aug 1, 2023
Changes from 1 commit
[File: TopicConfig.java]
@@ -19,10 +19,10 @@
*/
public class TopicConfig {
private static final String AUTO_COMMIT = "false";
- private static final Duration AUTOCOMMIT_INTERVAL = Duration.ofSeconds(5);
- private static final Integer SESSION_TIMEOUT = 45000;
+ private static final Duration DEFAULT_COMMIT_INTERVAL = Duration.ofSeconds(5);
+ private static final Duration DEFAULT_SESSION_TIMEOUT = Duration.ofSeconds(45);
private static final int MAX_RETRY_ATTEMPT = Integer.MAX_VALUE;
- private static final String AUTO_OFFSET_RESET = "earliest";
+ static final String DEFAULT_AUTO_OFFSET_RESET = "latest";
static final Duration THREAD_WAITING_TIME = Duration.ofSeconds(5);
private static final Duration MAX_RECORD_FETCH_TIME = Duration.ofSeconds(4);
private static final Duration BUFFER_DEFAULT_TIMEOUT = Duration.ofSeconds(5);
@@ -33,8 +33,8 @@ public class TopicConfig {
private static final Duration RETRY_BACKOFF = Duration.ofSeconds(100);
private static final Duration MAX_POLL_INTERVAL = Duration.ofSeconds(300000);
private static final Integer CONSUMER_MAX_POLL_RECORDS = 500;
- private static final Integer NUM_OF_WORKERS = 5;
- private static final Duration HEART_BEAT_INTERVAL_DURATION = Duration.ofSeconds(3);
+ static final Integer DEFAULT_NUM_OF_WORKERS = 2;
+ static final Duration HEART_BEAT_INTERVAL_DURATION = Duration.ofSeconds(5);

@JsonProperty("name")
@NotNull
@@ -49,7 +49,7 @@ public class TopicConfig {
@JsonProperty("workers")
@Valid
@Size(min = 1, max = 200, message = "Number of worker threads should lies between 1 and 200")
- private Integer workers = NUM_OF_WORKERS;
+ private Integer workers = DEFAULT_NUM_OF_WORKERS;

@JsonProperty("max_retry_attempts")
@Valid
@@ -67,18 +67,18 @@ public class TopicConfig {
@JsonProperty("auto_commit")
private Boolean autoCommit = false;

@JsonProperty("auto_commit_interval")
@JsonProperty("commit_interval")
@Valid
@Size(min = 1)
- private Duration autoCommitInterval = AUTOCOMMIT_INTERVAL;
+ private Duration commitInterval = DEFAULT_COMMIT_INTERVAL;

@JsonProperty("session_timeout")
@Valid
@Size(min = 1)
- private Integer sessionTimeOut = SESSION_TIMEOUT;
+ private Duration sessionTimeOut = DEFAULT_SESSION_TIMEOUT;

@JsonProperty("auto_offset_reset")
- private String autoOffsetReset = AUTO_OFFSET_RESET;
+ private String autoOffsetReset = DEFAULT_AUTO_OFFSET_RESET;

@JsonProperty("group_name")
@Valid
@@ -148,15 +148,15 @@ public Boolean getAutoCommit() {
return autoCommit;
}

- public Duration getAutoCommitInterval() {
- return autoCommitInterval;
+ public Duration getCommitInterval() {
+ return commitInterval;
}

- public void setAutoCommitInterval(Duration autoCommitInterval) {
- this.autoCommitInterval = autoCommitInterval;
+ public void setCommitInterval(Duration commitInterval) {
+ this.commitInterval = commitInterval;
}

- public Integer getSessionTimeOut() {
+ public Duration getSessionTimeOut() {
return sessionTimeOut;
}

[Next file: Kafka consumer implementation]
@@ -107,7 +107,7 @@ public void updateOffsetsToCommit(final TopicPartition partition, final OffsetAn
if (Objects.isNull(offsetAndMetadata)) {
return;
}
- synchronized (this) {
+ synchronized (offsetsToCommit) {
offsetsToCommit.put(partition, offsetAndMetadata);
}
}
@@ -118,18 +118,34 @@ private AcknowledgementSet createAcknowledgementSet(Map<TopicPartition, Range<Lo
if (result == true) {
positiveAcknowledgementSetCounter.increment();
offsets.forEach((partition, offsetRange) -> {
- int partitionId = partition.partition();
- if (!partitionCommitTrackerMap.containsKey(partitionId)) {
- OffsetAndMetadata committedOffsetAndMetadata = consumer.committed(partition);
- Long committedOffset = Objects.nonNull(committedOffsetAndMetadata) ? committedOffsetAndMetadata.offset() : null;
-
- partitionCommitTrackerMap.put(partitionId, new TopicPartitionCommitTracker(partition, committedOffset));
+ try {
+ int partitionId = partition.partition();
+ if (!partitionCommitTrackerMap.containsKey(partitionId)) {
+ OffsetAndMetadata committedOffsetAndMetadata = null;
+ synchronized(consumer) {
+ committedOffsetAndMetadata = consumer.committed(partition);

[Review thread]
Member: Is the Kafka consumer not thread safe? Does this not just ask the server to commit?

Collaborator (author): The Kafka consumer is not thread-safe. Any use of "consumer" must be protected across different threads; I was getting a concurrent-usage error.

Contributor: See https://www.confluent.io/blog/kafka-consumer-multi-threaded-messaging/. Can we avoid synchronization when committing offsets? I didn't see this blog using synchronization.

Member: That post explicitly states:

"When implementing a multi-threaded consumer architecture, it is important to note that the Kafka consumer is not thread safe. Multi-threaded access must be properly synchronized, which can be tricky. This is why the single-threaded model is commonly used."

Collaborator (author): Yes, we are single-threaded except for the acknowledgements, so I think the chance of lock contention is very low.

+ }
+ Long committedOffset = Objects.nonNull(committedOffsetAndMetadata) ? committedOffsetAndMetadata.offset() : null;
+ partitionCommitTrackerMap.put(partitionId, new TopicPartitionCommitTracker(partition, committedOffset));
+ }
+ OffsetAndMetadata offsetAndMetadata = partitionCommitTrackerMap.get(partitionId).addCompletedOffsets(offsetRange);
+ updateOffsetsToCommit(partition, offsetAndMetadata);
+ } catch (Exception e) {
+ LOG.error("Failed to seek to last committed offset upon positive acknowledgement "+partition, e);

[Review thread]
Member: Avoid string concatenation for log lines. Make this:

LOG.error("Failed to seek to last committed offset upon positive acknowledgement {}", partition, e);

Collaborator (author): Will do this in the next PR.

+ }
- OffsetAndMetadata offsetAndMetadata = partitionCommitTrackerMap.get(partitionId).addCompletedOffsets(offsetRange);
- updateOffsetsToCommit(partition, offsetAndMetadata);
});
} else {
- positiveAcknowledgementSetCounter.increment();
+ negativeAcknowledgementSetCounter.increment();
offsets.forEach((partition, offsetRange) -> {
+ try {
+ synchronized(consumer) {
+ OffsetAndMetadata committedOffsetAndMetadata = consumer.committed(partition);

[Review thread]
Member: Nit: can this be final?

Collaborator (author): Will do this in the next PR.

+ consumer.seek(partition, committedOffsetAndMetadata);
+ }
+ } catch (Exception e) {
+ LOG.error("Failed to seek to last committed offset upon negative acknowledgement "+partition, e);

[Review thread]
Member: Please use Log4j's string interpolation here. See the comment above.

Collaborator (author): Will do this in the next PR.

+ }
});
}
}, acknowledgementsTimeout);
return acknowledgementSet;
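
To make the thread-safety discussion above concrete: the poll loop and the acknowledgement callbacks run on different threads, and KafkaConsumer is not thread-safe, so the change wraps every consumer call in a synchronized block on the consumer object itself. A minimal sketch of that pattern (hypothetical class and method names, not the plugin's actual code):

import java.time.Duration;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class SynchronizedConsumerSketch {
    private final KafkaConsumer<String, String> consumer;

    public SynchronizedConsumerSketch(final KafkaConsumer<String, String> consumer) {
        this.consumer = consumer;
    }

    // Runs on the polling thread: fetches the next batch of records.
    public ConsumerRecords<String, String> pollOnce(final Duration timeout) {
        synchronized (consumer) {
            return consumer.poll(timeout);
        }
    }

    // Runs on the acknowledgement-callback thread: on a negative ack, rewind
    // the partition to its last committed offset so records are re-delivered.
    public void seekToLastCommitted(final TopicPartition partition) {
        synchronized (consumer) {
            final OffsetAndMetadata committed = consumer.committed(partition);
            if (committed != null) {
                consumer.seek(partition, committed);
            }
        }
    }
}

Since the consumer is single-threaded apart from acknowledgements, contention on this monitor should be rare, which matches the author's reasoning above.
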
@@ -141,9 +157,11 @@ private AcknowledgementSet createAcknowledgementSet(Map<TopicPartition, Range<Lo

public <T> void consumeRecords() throws Exception {
try {
- ConsumerRecords<String, T> records =
- consumer.poll(topicConfig.getThreadWaitingTime().toMillis()/2);
- if (!records.isEmpty() && records.count() > 0) {
+ ConsumerRecords<String, T> records = null;
+ synchronized(consumer) {
+ records = consumer.poll(topicConfig.getThreadWaitingTime().toMillis()/2);
+ }
+ if (Objects.nonNull(records) && !records.isEmpty() && records.count() > 0) {
Map<TopicPartition, Range<Long>> offsets = new HashMap<>();
AcknowledgementSet acknowledgementSet = null;
if (acknowledgementsEnabled) {
@@ -168,15 +186,17 @@ private void commitOffsets() {
return;
}
long currentTimeMillis = System.currentTimeMillis();
- if ((currentTimeMillis - lastCommitTime) < COMMIT_OFFSET_INTERVAL_MS) {
+ if ((currentTimeMillis - lastCommitTime) < topicConfig.getCommitInterval().toMillis()) {
return;
}
- synchronized (this) {
+ synchronized (offsetsToCommit) {
if (offsetsToCommit.isEmpty()) {
return;
}
try {
- consumer.commitSync();
+ synchronized(consumer) {
+ consumer.commitSync();
+ }
offsetsToCommit.clear();
lastCommitTime = currentTimeMillis;
} catch (CommitFailedException e) {
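
Note the two distinct monitors in this hunk: offsetsToCommit guards the map that acknowledgement callbacks write into, while the consumer monitor serializes the actual Kafka calls. A condensed sketch of that interplay (hypothetical class name, mirroring the structure of the diff rather than reproducing the plugin's code):

import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class OffsetCommitSketch {
    private final Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = new HashMap<>();
    private final KafkaConsumer<String, String> consumer;

    public OffsetCommitSketch(final KafkaConsumer<String, String> consumer) {
        this.consumer = consumer;
    }

    // Called by acknowledgement callbacks (separate thread).
    public void updateOffsetsToCommit(final TopicPartition partition, final OffsetAndMetadata offset) {
        synchronized (offsetsToCommit) {
            offsetsToCommit.put(partition, offset);
        }
    }

    // Called periodically from the polling thread.
    public void commitOffsets() {
        synchronized (offsetsToCommit) {
            if (offsetsToCommit.isEmpty()) {
                return;
            }
            synchronized (consumer) {
                consumer.commitSync();
            }
            offsetsToCommit.clear();
        }
    }
}
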
@@ -286,8 +306,10 @@ public void shutdownConsumer(){
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
for (TopicPartition topicPartition : partitions) {
- Long committedOffset = consumer.committed(topicPartition).offset();
- consumer.seek(topicPartition, committedOffset);
+ synchronized(consumer) {
+ Long committedOffset = consumer.committed(topicPartition).offset();
+ consumer.seek(topicPartition, committedOffset);
+ }
}
}

[File: KafkaSource.java]
@@ -118,7 +118,7 @@ public KafkaSource(final KafkaSourceConfig sourceConfig,
@Override
public void start(Buffer<Record<Event>> buffer) {
sourceConfig.getTopics().forEach(topic -> {
- consumerGroupID = getGroupId(topic.getName());
+ consumerGroupID = topic.getGroupId();

[Review thread]
hshardeesi (Contributor, Aug 1, 2023): Can we add a test case for this, so that we catch any regressions?

Collaborator (author): Will do this in the next PR.

Properties consumerProperties = getConsumerProperties(topic);
MessageFormat schema = MessageFormat.getByMessageFormatByName(schemaType);
try {
@@ -175,10 +175,6 @@ public void stop() {
LOG.info("Consumer shutdown successfully...");
}

- private String getGroupId(String name) {
- return pipelineName + "::" + name;
- }
-
private long calculateLongestThreadWaitingTime() {
List<TopicConfig> topicsList = sourceConfig.getTopics();
return topicsList.stream().
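
A regression test along the lines hshardeesi requested could look like the sketch below. This is hedged: KafkaSource builds its consumer properties privately, so the accessor used here is a hypothetical test hook, and topic1 and kafkaSource are assumed to come from the test class's setUp():

@Test
void consumerGroupId_isTakenFromTopicConfig() {
    when(topic1.getGroupId()).thenReturn("my-test-group");

    // Hypothetical package-private hook exposing the properties built for a topic.
    final Properties props = kafkaSource.buildConsumerProperties(topic1);

    // Would fail if the group id reverted to the old pipelineName + "::" + topicName derivation.
    assertEquals("my-test-group", props.get(ConsumerConfig.GROUP_ID_CONFIG));
}
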
@@ -368,13 +364,13 @@ private void setConsumerTopicProperties(Properties properties, TopicConfig topic
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
topicConfig.getAutoCommit());
properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,
- topicConfig.getAutoCommitInterval().toSecondsPart());
+ ((Long)topicConfig.getCommitInterval().toMillis()).intValue());
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
topicConfig.getAutoOffsetReset());
properties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,
topicConfig.getConsumerMaxPollRecords());
- properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, topicConfig.getSessionTimeOut());
- properties.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, topicConfig.getHeartBeatInterval().toSecondsPart());
+ properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, ((Long)topicConfig.getSessionTimeOut().toMillis()).intValue());
+ properties.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, ((Long)topicConfig.getHeartBeatInterval().toMillis()).intValue());
properties.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, topicConfig.getFetchMaxBytes().intValue());
properties.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, topicConfig.getFetchMaxWait());
}
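
The unit handling is the subtle fix in this hunk: Duration.toSecondsPart() returns only the seconds-within-the-minute component of a Duration, so the old code handed values like 5 or 3 to Kafka settings that expect milliseconds. A small standalone illustration:

import java.time.Duration;

public class DurationUnitsDemo {
    public static void main(final String[] args) {
        final Duration commitInterval = Duration.ofSeconds(5);
        // Old conversion: the seconds-of-minute component, not a millisecond count.
        System.out.println(commitInterval.toSecondsPart()); // 5 (Kafka would read this as 5 ms)
        // New conversion: total milliseconds, which the *_MS configs expect.
        System.out.println(commitInterval.toMillis());      // 5000

        final Duration sessionTimeout = Duration.ofSeconds(45);
        // The ((Long) ...).intValue() cast in the diff narrows the long to the
        // int that the consumer config accepts.
        System.out.println(((Long) sessionTimeout.toMillis()).intValue()); // 45000
    }
}
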
[Next file: TopicConfig tests]
@@ -68,9 +68,9 @@ void test_topicsConfig_not_null() {
void testConfigValues_default() {
assertEquals("my-topic-2", topicConfig.getName());
assertEquals(false, topicConfig.getAutoCommit());
- assertEquals(Duration.ofSeconds(5), topicConfig.getAutoCommitInterval());
- assertEquals(45000, topicConfig.getSessionTimeOut());
- assertEquals("earliest", topicConfig.getAutoOffsetReset());
+ assertEquals(Duration.ofSeconds(5), topicConfig.getCommitInterval());
+ assertEquals(45000, topicConfig.getSessionTimeOut().toMillis());
+ assertEquals(TopicConfig.DEFAULT_AUTO_OFFSET_RESET, topicConfig.getAutoOffsetReset());
assertEquals(TopicConfig.THREAD_WAITING_TIME, topicConfig.getThreadWaitingTime());
assertEquals(Duration.ofSeconds(4), topicConfig.getMaxRecordFetchTime());
assertEquals(Duration.ofSeconds(5), topicConfig.getBufferDefaultTimeout());
@@ -80,17 +80,17 @@ void testConfigValues_default() {
assertEquals(Duration.ofSeconds(100), topicConfig.getRetryBackoff());
assertEquals(Duration.ofSeconds(300000), topicConfig.getMaxPollInterval());
assertEquals(500L, topicConfig.getConsumerMaxPollRecords().longValue());
- assertEquals(5, topicConfig.getWorkers().intValue());
- assertEquals(Duration.ofSeconds(3), topicConfig.getHeartBeatInterval());
+ assertEquals(TopicConfig.DEFAULT_NUM_OF_WORKERS, topicConfig.getWorkers().intValue());
+ assertEquals(TopicConfig.HEART_BEAT_INTERVAL_DURATION, topicConfig.getHeartBeatInterval());
}

@Test
@Tag(YAML_FILE_WITH_CONSUMER_CONFIG)
void testConfigValues_from_yaml() {
assertEquals("my-topic-1", topicConfig.getName());
assertEquals(false, topicConfig.getAutoCommit());
- assertEquals(Duration.ofSeconds(5), topicConfig.getAutoCommitInterval());
- assertEquals(45000, topicConfig.getSessionTimeOut());
+ assertEquals(Duration.ofSeconds(5), topicConfig.getCommitInterval());
+ assertEquals(45000, topicConfig.getSessionTimeOut().toMillis());
assertEquals("earliest", topicConfig.getAutoOffsetReset());
assertEquals(Duration.ofSeconds(1), topicConfig.getThreadWaitingTime());
assertEquals(Duration.ofSeconds(4), topicConfig.getMaxRecordFetchTime());
@@ -110,7 +110,7 @@ void testConfigValues_from_yaml_not_null() {
void testConfigValues_from_yaml_not_null() {
assertNotNull(topicConfig.getName());
assertNotNull(topicConfig.getAutoCommit());
- assertNotNull(topicConfig.getAutoCommitInterval());
+ assertNotNull(topicConfig.getCommitInterval());
assertNotNull(topicConfig.getSessionTimeOut());
assertNotNull(topicConfig.getAutoOffsetReset());
assertNotNull(topicConfig.getThreadWaitingTime());
[Next file: consumer tests]
@@ -212,6 +212,44 @@ public void testPlainTextConsumeRecordsWithAcknowledgements() throws Interrupted
});
}

+ @Test
+ public void testPlainTextConsumeRecordsWithNegativeAcknowledgements() throws InterruptedException {
+ String topic = topicConfig.getName();
+ consumerRecords = createPlainTextRecords(topic);
+ when(kafkaConsumer.poll(anyLong())).thenReturn(consumerRecords);
+ consumer = createObjectUnderTest("plaintext", true);
+
+ try {
+ consumer.consumeRecords();
+ } catch (Exception e){}
+ final Map.Entry<Collection<Record<Event>>, CheckpointState> bufferRecords = buffer.read(1000);
+ ArrayList<Record<Event>> bufferedRecords = new ArrayList<>(bufferRecords.getKey());
+ Assertions.assertEquals(consumerRecords.count(), bufferedRecords.size());
+ Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = consumer.getOffsetsToCommit();
+ Assertions.assertEquals(offsetsToCommit.size(), 0);
+
+ for (Record<Event> record: bufferedRecords) {
+ Event event = record.getData();
+ String value1 = event.get(testKey1, String.class);
+ String value2 = event.get(testKey2, String.class);
+ assertTrue(value1 != null || value2 != null);
+ if (value1 != null) {
+ Assertions.assertEquals(value1, testValue1);
+ }
+ if (value2 != null) {
+ Assertions.assertEquals(value2, testValue2);
+ }
+ event.getEventHandle().release(false);
+ }
+ // Wait for acknowledgement callback function to run
+ try {
+ Thread.sleep(10000);
+ } catch (Exception e){}
+
+ offsetsToCommit = consumer.getOffsetsToCommit();
+ Assertions.assertEquals(offsetsToCommit.size(), 0);
+ }

@Test
public void testJsonConsumeRecords() throws InterruptedException, Exception {
String topic = topicConfig.getName();
[Next file: KafkaSource tests]
@@ -71,8 +71,8 @@ void setUp() throws Exception {
when(topic2.getName()).thenReturn("topic2");
when(topic1.getWorkers()).thenReturn(2);
when(topic2.getWorkers()).thenReturn(3);
- when(topic1.getAutoCommitInterval()).thenReturn(Duration.ofSeconds(1));
- when(topic2.getAutoCommitInterval()).thenReturn(Duration.ofSeconds(1));
+ when(topic1.getCommitInterval()).thenReturn(Duration.ofSeconds(1));
+ when(topic2.getCommitInterval()).thenReturn(Duration.ofSeconds(1));
when(topic1.getAutoOffsetReset()).thenReturn("earliest");
when(topic2.getAutoOffsetReset()).thenReturn("earliest");
when(topic1.getConsumerMaxPollRecords()).thenReturn(1);
[Next file: test pipeline YAML]
@@ -9,8 +9,8 @@ log-pipeline:
- name: my-topic-1
workers: 5
auto_commit: false
- auto_commit_interval: PT5S
- session_timeout: 45000
+ commit_interval: PT5S
+ session_timeout: PT45S
max_retry_attempts: 1000
auto_offset_reset: earliest
thread_waiting_time: PT1S