From 55876042ac2b628373177dae8beec91891a21c27 Mon Sep 17 00:00:00 2001 From: laihui <1353307710@qq.com> Date: Sat, 27 Jan 2024 11:13:54 +0800 Subject: [PATCH] update partition offset cache timely to avoid negative lag --- .../doris/load/routineload/KafkaRoutineLoadJob.java | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java index faad0a0248a10b6..6475c4a3987eeb9 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java @@ -705,6 +705,8 @@ public void replayModifyProperties(AlterRoutineLoadJobOperationLog log) { // check if given partitions has more data to consume. // 'partitionIdToOffset' to the offset to be consumed. public boolean hasMoreDataToConsume(UUID taskId, Map partitionIdToOffset) throws UserException { + boolean needUpdateCache = false; + // all partitions must be checked, because the cached latest offset of some partitions may be out of date for (Map.Entry entry : partitionIdToOffset.entrySet()) { if (cachedPartitionWithLatestOffsets.containsKey(entry.getKey()) && entry.getValue() < cachedPartitionWithLatestOffsets.get(entry.getKey())) { @@ -715,9 +717,13 @@ public boolean hasMoreDataToConsume(UUID taskId, Map partitionIdT // query_watermark_offsets() will return 4.) LOG.debug("has more data to consume. offsets to be consumed: {}, latest offsets: {}, task {}, job {}", partitionIdToOffset, cachedPartitionWithLatestOffsets, taskId, id); - return true; + } else { + needUpdateCache = true; } } + if (needUpdateCache == false) { + return true; + } try { // all offsets to be consumed are newer than offsets in cachedPartitionWithLatestOffsets,