[fix](routine load) reset Kafka progress cache when routine load job topic change (#38474) (#39528)

pick (#38474)

When the topic of a routine load job is changed from test_topic_before to
test_topic_after with
```
ALTER ROUTINE LOAD FOR test_topic_change FROM KAFKA("kafka_topic" = "test_topic_after");
```
(test_topic_before has 5 rows and test_topic_after has 1 row)

the job can no longer consume any data and repeatedly logs the following warning:
```
2024-07-29 15:57:28,122 WARN (Routine load task scheduler|55) [KafkaRoutineLoadJob.hasMoreDataToConsume():792] Kafka offset fallback. partition: 0, cache offset: 5 get latest offset: 1, task 16656914-ba0a-465d-8e79-8252b423b0fc, job 16615
2024-07-29 15:57:28,123 WARN (Routine load task scheduler|55) [KafkaRoutineLoadJob.hasMoreDataToConsume():792] Kafka offset fallback. partition: 0, cache offset: 5 get latest offset: 1, task 16656914-ba0a-465d-8e79-8252b423b0fc, job 16615
2024-07-29 15:57:28,125 WARN (Routine load task scheduler|55) [KafkaRoutineLoadJob.hasMoreDataToConsume():792] Kafka offset fallback. partition: 0, cache offset: 5 get latest offset: 1, task 16656914-ba0a-465d-8e79-8252b423b0fc, job 16615
2024-07-29 15:57:28,126 WARN (Routine load task scheduler|55) [KafkaRoutineLoadJob.hasMoreDataToConsume():792] Kafka offset fallback. partition: 0, cache offset: 5 get latest offset: 1, task 16656914-ba0a-465d-8e79-8252b423b0fc, job 16615
2024-07-29 15:57:28,128 WARN (Routine load task scheduler|55) [KafkaRoutineLoadJob.hasMoreDataToConsume():792] Kafka offset fallback. partition: 0, cache offset: 5 get latest offset: 1, task 16656914-ba0a-465d-8e79-8252b423b0fc, job 16615
2024-07-29 15:57:28,129 WARN (Routine load task scheduler|55) [KafkaRoutineLoadJob.hasMoreDataToConsume():792] Kafka offset fallback. partition: 0, cache offset: 5 get latest offset: 1, task 16656914-ba0a-465d-8e79-8252b423b0fc, job 16615
2024-07-29 15:57:28,131 WARN (Routine load task scheduler|55) [KafkaRoutineLoadJob.hasMoreDataToConsume():792] Kafka offset fallback. partition: 0, cache offset: 5 get latest offset: 1, task 16656914-ba0a-465d-8e79-8252b423b0fc, job 16615
2024-07-29 15:57:28,133 WARN (Routine load task scheduler|55) [KafkaRoutineLoadJob.hasMoreDataToConsume():792] Kafka offset fallback. partition: 0, cache offset: 5 get latest offset: 1, task 16656914-ba0a-465d-8e79-8252b423b0fc, job 16615
2024-07-29 15:57:28,134 WARN (Routine load task scheduler|55) [KafkaRoutineLoadJob.hasMoreDataToConsume():792] Kafka offset fallback. partition: 0, cache offset: 5 get latest offset: 1, task 16656914-ba0a-465d-8e79-8252b423b0fc, job 16615
2024-07-29 15:57:28,136 WARN (Routine load task scheduler|55) [KafkaRoutineLoadJob.hasMoreDataToConsume():792] Kafka offset fallback. partition: 0, cache offset: 5 get latest offset: 1, task 16656914-ba0a-465d-8e79-8252b423b0fc, job 16615
2024-07-29 15:57:28,137 WARN (Routine load task scheduler|55) [KafkaRoutineLoadJob.hasMoreDataToConsume():792] Kafka offset fallback. partition: 0, cache offset: 5 get latest offset: 1, task 16656914-ba0a-465d-8e79-8252b423b0fc, job 16615
```
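
The warning reports a cached offset (5) that is ahead of the latest offset of the new topic (1). The sketch below reproduces that situation in isolation. It is not the Doris implementation: the class `OffsetFallbackSketch` and all of its methods are hypothetical, and it assumes that a partition whose stored offset is not behind the topic's latest offset is treated as having nothing to consume.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration only; not the Doris KafkaRoutineLoadJob code.
final class OffsetFallbackSketch {
    // partition id -> next offset the job believes it should consume
    private final Map<Integer, Long> cachedOffsets = new HashMap<>();

    void recordProgress(int partition, long offset) {
        cachedOffsets.put(partition, offset);
    }

    // Forget everything remembered about the previous topic.
    void reset() {
        cachedOffsets.clear();
    }

    // Assumption: data is available only while the cached offset is
    // strictly behind the latest offset reported for the partition.
    boolean hasMoreDataToConsume(int partition, long latestOffset) {
        long cached = cachedOffsets.getOrDefault(partition, 0L);
        return cached < latestOffset;
    }

    public static void main(String[] args) {
        OffsetFallbackSketch job = new OffsetFallbackSketch();
        job.recordProgress(0, 5L); // 5 rows consumed from test_topic_before

        // test_topic_after holds 1 row, so its latest offset is 1.
        System.out.println(job.hasMoreDataToConsume(0, 1L)); // prints false

        job.reset();
        System.out.println(job.hasMoreDataToConsume(0, 1L)); // prints true
    }
}
```

With the stale offset in place the check can never succeed for the new topic, which matches the repeated warnings in the log.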

It is necessary to reset the Kafka progress cache when the topic of a
routine load job changes.
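
The ordering this fix relies on (validate the requested partitions, reset the progress if the topic changes, then apply the new offsets) can be sketched as follows. This is a simplified stand-in, not the Doris code: `TopicChangeSketch`, `alter`, and the use of `IllegalArgumentException` are assumptions made for illustration.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration only; names do not come from the Doris code base.
final class TopicChangeSketch {
    private String topic;
    private Map<Integer, Long> progress = new HashMap<>();

    TopicChangeSketch(String topic, Map<Integer, Long> initialProgress) {
        this.topic = topic;
        this.progress.putAll(initialProgress);
    }

    // Validate first so that a rejected request leaves the job untouched.
    void alter(String newTopic, Map<Integer, Long> newOffsets) {
        for (Integer partition : newOffsets.keySet()) {
            if (!progress.containsKey(partition)) {
                throw new IllegalArgumentException(
                        "partition " + partition + " is not being consumed");
            }
        }
        // Reset before applying offsets, otherwise the reset would discard them.
        if (newTopic != null && !newTopic.isEmpty()) {
            topic = newTopic;
            progress = new HashMap<>();
        }
        progress.putAll(newOffsets);
    }

    String topic() {
        return topic;
    }

    Map<Integer, Long> progress() {
        return progress;
    }

    public static void main(String[] args) {
        TopicChangeSketch job =
                new TopicChangeSketch("test_topic_before", Map.of(0, 5L));
        job.alter("test_topic_after", Map.of());
        System.out.println(job.topic() + " " + job.progress());
    }
}
```

Running the validation before any field is mutated is what keeps the modification all-or-nothing in this sketch.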
sollhui authored Aug 26, 2024
1 parent e7435bd commit a6f4710
Showing 2 changed files with 24 additions and 12 deletions.
@@ -118,15 +118,17 @@ private void getReadableProgress(Map<Integer, String> showPartitionIdToOffset) {
         }
     }

-    // modify the partition offset of this progress.
-    // throw exception is the specified partition does not exist in progress.
-    public void modifyOffset(List<Pair<Integer, Long>> kafkaPartitionOffsets) throws DdlException {
+    public void checkPartitions(List<Pair<Integer, Long>> kafkaPartitionOffsets) throws DdlException {
         for (Pair<Integer, Long> pair : kafkaPartitionOffsets) {
             if (!partitionIdToOffset.containsKey(pair.first)) {
                 throw new DdlException("The specified partition " + pair.first + " is not in the consumed partitions");
             }
         }
+    }

+    // modify the partition offset of this progress.
+    // throw exception is the specified partition does not exist in progress.
+    public void modifyOffset(List<Pair<Integer, Long>> kafkaPartitionOffsets) {
         for (Pair<Integer, Long> pair : kafkaPartitionOffsets) {
             partitionIdToOffset.put(pair.first, pair.second);
         }
@@ -673,22 +673,32 @@ private void modifyPropertiesInternal(Map<String, String> jobProperties,
                 customKafkaProperties = dataSourceProperties.getCustomKafkaProperties();
             }

-            // modify partition offset first
-            if (!kafkaPartitionOffsets.isEmpty()) {
-                // we can only modify the partition that is being consumed
-                ((KafkaProgress) progress).modifyOffset(kafkaPartitionOffsets);
-            }
-
+            // convertCustomProperties and check partitions before reset progress to make modify operation atomic
             if (!customKafkaProperties.isEmpty()) {
                 this.customProperties.putAll(customKafkaProperties);
                 convertCustomProperties(true);
             }
-            // modify broker list and topic
-            if (!Strings.isNullOrEmpty(dataSourceProperties.getBrokerList())) {
-                this.brokerList = dataSourceProperties.getBrokerList();
+
+            if (!kafkaPartitionOffsets.isEmpty()) {
+                ((KafkaProgress) progress).checkPartitions(kafkaPartitionOffsets);
             }
+
+            // It is necessary to reset the Kafka progress cache if topic change,
+            // and should reset cache before modifying partition offset.
             if (!Strings.isNullOrEmpty(dataSourceProperties.getTopic())) {
                 this.topic = dataSourceProperties.getTopic();
+                this.progress = new KafkaProgress();
             }
+
+            // modify partition offset
+            if (!kafkaPartitionOffsets.isEmpty()) {
+                // we can only modify the partition that is being consumed
+                ((KafkaProgress) progress).modifyOffset(kafkaPartitionOffsets);
+            }
+
+            // modify broker list
+            if (!Strings.isNullOrEmpty(dataSourceProperties.getBrokerList())) {
+                this.brokerList = dataSourceProperties.getBrokerList();
+            }
         }
         if (!jobProperties.isEmpty()) {