Skip to content

Commit

Permalink
update partition offset cache timely to avoid negative lag
Browse files Browse the repository at this point in the history
  • Loading branch information
sollhui committed Jan 27, 2024
1 parent 06cbb96 commit 5587604
Showing 1 changed file with 7 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -705,6 +705,8 @@ public void replayModifyProperties(AlterRoutineLoadJobOperationLog log) {
// check if given partitions has more data to consume.
// 'partitionIdToOffset' to the offset to be consumed.
public boolean hasMoreDataToConsume(UUID taskId, Map<Integer, Long> partitionIdToOffset) throws UserException {
boolean needUpdateCache = false;
// it is need check all partitions, for some partitions offset may be out of time
for (Map.Entry<Integer, Long> entry : partitionIdToOffset.entrySet()) {
if (cachedPartitionWithLatestOffsets.containsKey(entry.getKey())
&& entry.getValue() < cachedPartitionWithLatestOffsets.get(entry.getKey())) {
Expand All @@ -715,9 +717,13 @@ public boolean hasMoreDataToConsume(UUID taskId, Map<Integer, Long> partitionIdT
// query_watermark_offsets() will return 4.)
LOG.debug("has more data to consume. offsets to be consumed: {}, latest offsets: {}, task {}, job {}",
partitionIdToOffset, cachedPartitionWithLatestOffsets, taskId, id);
return true;
} else {
needUpdateCache = true;
}
}
if (needUpdateCache == false) {
return true;
}

try {
// all offsets to be consumed are newer than offsets in cachedPartitionWithLatestOffsets,
Expand Down

0 comments on commit 5587604

Please sign in to comment.