Skip to content

Commit

Permalink
reset Kafka progress cache when routine load job topic change
Browse files Browse the repository at this point in the history
  • Loading branch information
sollhui committed Aug 21, 2024
1 parent c71871d commit 0a89565
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -118,15 +118,17 @@ private void getReadableProgress(Map<Integer, String> showPartitionIdToOffset) {
}
}

// modify the partition offset of this progress.
// throw exception is the specified partition does not exist in progress.
public void modifyOffset(List<Pair<Integer, Long>> kafkaPartitionOffsets) throws DdlException {
public void checkPartitions(List<Pair<Integer, Long>> kafkaPartitionOffsets) throws DdlException {
for (Pair<Integer, Long> pair : kafkaPartitionOffsets) {
if (!partitionIdToOffset.containsKey(pair.first)) {
throw new DdlException("The specified partition " + pair.first + " is not in the consumed partitions");
}
}
}

// modify the partition offset of this progress.
// throw exception is the specified partition does not exist in progress.
public void modifyOffset(List<Pair<Integer, Long>> kafkaPartitionOffsets) {
for (Pair<Integer, Long> pair : kafkaPartitionOffsets) {
partitionIdToOffset.put(pair.first, pair.second);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -673,22 +673,32 @@ private void modifyPropertiesInternal(Map<String, String> jobProperties,
customKafkaProperties = dataSourceProperties.getCustomKafkaProperties();
}

// modify partition offset first
if (!kafkaPartitionOffsets.isEmpty()) {
// we can only modify the partition that is being consumed
((KafkaProgress) progress).modifyOffset(kafkaPartitionOffsets);
}

// convertCustomProperties and check partitions before reset progress to make modify operation atomic
if (!customKafkaProperties.isEmpty()) {
this.customProperties.putAll(customKafkaProperties);
convertCustomProperties(true);
}
// modify broker list and topic
if (!Strings.isNullOrEmpty(dataSourceProperties.getBrokerList())) {
this.brokerList = dataSourceProperties.getBrokerList();

if (!kafkaPartitionOffsets.isEmpty()) {
((KafkaProgress) progress).checkPartitions(kafkaPartitionOffsets);
}

// It is necessary to reset the Kafka progress cache if topic change,
// and should reset cache before modifying partition offset.
if (!Strings.isNullOrEmpty(dataSourceProperties.getTopic())) {
this.topic = dataSourceProperties.getTopic();
this.progress = new KafkaProgress();
}

// modify partition offset
if (!kafkaPartitionOffsets.isEmpty()) {
// we can only modify the partition that is being consumed
((KafkaProgress) progress).modifyOffset(kafkaPartitionOffsets);
}

// modify broker list
if (!Strings.isNullOrEmpty(dataSourceProperties.getBrokerList())) {
this.brokerList = dataSourceProperties.getBrokerList();
}
}
if (!jobProperties.isEmpty()) {
Expand Down

0 comments on commit 0a89565

Please sign in to comment.