From 9b07bcee730be77dead49ca80d495df922d5d267 Mon Sep 17 00:00:00 2001 From: laihui <1353307710@qq.com> Date: Thu, 21 Mar 2024 19:48:05 +0800 Subject: [PATCH] fix timeout backoff can not work --- .../load/routineload/KafkaRoutineLoadJob.java | 2 +- .../doris/load/routineload/KafkaTaskInfo.java | 16 +++++++++++++--- .../load/routineload/RoutineLoadTaskInfo.java | 11 +++++++---- .../routineload/KafkaRoutineLoadJobTest.java | 2 +- .../RoutineLoadTaskSchedulerTest.java | 2 +- .../transaction/GlobalTransactionMgrTest.java | 4 ++-- 6 files changed, 25 insertions(+), 12 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java index 6e6dba068e995a4..bc1f1428a919b5d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java @@ -227,7 +227,7 @@ public void divideRoutineLoadJob(int currentConcurrentTaskNum) throws UserExcept ((KafkaProgress) progress).getOffsetByPartition(kafkaPartition)); } KafkaTaskInfo kafkaTaskInfo = new KafkaTaskInfo(UUID.randomUUID(), id, clusterName, - maxBatchIntervalS * 2 * 1000, taskKafkaProgress, isMultiTable()); + maxBatchIntervalS * 2 * 1000, 0, taskKafkaProgress, isMultiTable()); routineLoadTaskInfoList.add(kafkaTaskInfo); result.add(kafkaTaskInfo); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaTaskInfo.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaTaskInfo.java index d6f0a28705732b6..4b02d15dec505ce 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaTaskInfo.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaTaskInfo.java @@ -47,14 +47,16 @@ public class KafkaTaskInfo extends RoutineLoadTaskInfo { private Map partitionIdToOffset; public KafkaTaskInfo(UUID id, long jobId, String clusterName, - long timeoutMs, Map partitionIdToOffset, boolean isMultiTable) { - super(id, jobId, clusterName, timeoutMs, isMultiTable); + long timeoutMs, int timeoutBackOffCount, + Map partitionIdToOffset, boolean isMultiTable) { + super(id, jobId, timeoutMs, timeoutBackOffCount, isMultiTable); this.partitionIdToOffset = partitionIdToOffset; } public KafkaTaskInfo(KafkaTaskInfo kafkaTaskInfo, Map partitionIdToOffset, boolean isMultiTable) { super(UUID.randomUUID(), kafkaTaskInfo.getJobId(), kafkaTaskInfo.getClusterName(), - kafkaTaskInfo.getTimeoutMs(), kafkaTaskInfo.getBeId(), isMultiTable); + kafkaTaskInfo.getTimeoutMs(), kafkaTaskInfo.getTimeoutBackOffCount(), + kafkaTaskInfo.getBeId(), isMultiTable); this.partitionIdToOffset = partitionIdToOffset; } @@ -129,6 +131,10 @@ private TExecPlanFragmentParams rePlan(RoutineLoadJob routineLoadJob) throws Use TExecPlanFragmentParams tExecPlanFragmentParams = routineLoadJob.plan(loadId, txnId); TPlanFragment tPlanFragment = tExecPlanFragmentParams.getFragment(); tPlanFragment.getOutputSink().getOlapTableSink().setTxnId(txnId); + long timeoutS = this.getTimeoutMs() / 1000; + tPlanFragment.getOutputSink().getOlapTableSink().setLoadChannelTimeoutS(timeoutS); + tExecPlanFragmentParams.getQueryOptions().setQueryTimeout((int) timeoutS); + tExecPlanFragmentParams.getQueryOptions().setExecutionTimeout((int) timeoutS); return tExecPlanFragmentParams; } @@ -138,6 +144,10 @@ private TPipelineFragmentParams rePlanForPipeline(RoutineLoadJob routineLoadJob) TPipelineFragmentParams tExecPlanFragmentParams = routineLoadJob.planForPipeline(loadId, txnId); TPlanFragment tPlanFragment = tExecPlanFragmentParams.getFragment(); tPlanFragment.getOutputSink().getOlapTableSink().setTxnId(txnId); + long timeoutS = this.getTimeoutMs() / 1000; + tPlanFragment.getOutputSink().getOlapTableSink().setLoadChannelTimeoutS(timeoutS); + tExecPlanFragmentParams.getQueryOptions().setQueryTimeout((int) timeoutS); + tExecPlanFragmentParams.getQueryOptions().setExecutionTimeout((int) timeoutS); return tExecPlanFragmentParams; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java index 500f1e27258713f..fb7b02a2f9cf2b1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java @@ -80,18 +80,21 @@ public abstract class RoutineLoadTaskInfo { // so that user or other logic can know the status of the corresponding txn. protected TransactionStatus txnStatus = TransactionStatus.UNKNOWN; - public RoutineLoadTaskInfo(UUID id, long jobId, String clusterName, long timeoutMs, boolean isMultiTable) { + + public RoutineLoadTaskInfo(UUID id, long jobId, String clusterName, long timeoutMs, + int timeoutBackOffCount, boolean isMultiTable) { this.id = id; this.jobId = jobId; this.clusterName = clusterName; this.createTimeMs = System.currentTimeMillis(); this.timeoutMs = timeoutMs; + this.timeoutBackOffCount = timeoutBackOffCount; this.isMultiTable = isMultiTable; } - public RoutineLoadTaskInfo(UUID id, long jobId, String clusterName, long timeoutMs, long previousBeId, - boolean isMultiTable) { - this(id, jobId, clusterName, timeoutMs, isMultiTable); + public RoutineLoadTaskInfo(UUID id, long jobId, String clusterName, long timeoutMs, + int timeoutBackOffCount, long previousBeId, boolean isMultiTable) { + this(id, jobId, timeoutMs, clusterName, timeoutBackOffCount, isMultiTable); this.previousBeId = previousBeId; } diff --git a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/KafkaRoutineLoadJobTest.java b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/KafkaRoutineLoadJobTest.java index 57ded401bd9bfd6..73213fc7ffca8b4 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/KafkaRoutineLoadJobTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/KafkaRoutineLoadJobTest.java @@ -224,7 +224,7 @@ public void testProcessTimeOutTasks(@Injectable GlobalTransactionMgr globalTrans Map partitionIdsToOffset = Maps.newHashMap(); partitionIdsToOffset.put(100, 0L); KafkaTaskInfo kafkaTaskInfo = new KafkaTaskInfo(new UUID(1, 1), 1L, "default_cluster", - maxBatchIntervalS * 2 * 1000, partitionIdsToOffset, false); + maxBatchIntervalS * 2 * 1000, 0, partitionIdsToOffset, false); kafkaTaskInfo.setExecuteStartTimeMs(System.currentTimeMillis() - maxBatchIntervalS * 2 * 1000 - 1); routineLoadTaskInfoList.add(kafkaTaskInfo); diff --git a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadTaskSchedulerTest.java b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadTaskSchedulerTest.java index e0fdd92f73757b0..02db47538fb6997 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadTaskSchedulerTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadTaskSchedulerTest.java @@ -69,7 +69,7 @@ public void testRunOneCycle(@Injectable KafkaRoutineLoadJob kafkaRoutineLoadJob1 Deencapsulation.setField(kafkaProgress, "partitionIdToOffset", partitionIdToOffset); Queue routineLoadTaskInfoQueue = Queues.newLinkedBlockingQueue(); - KafkaTaskInfo routineLoadTaskInfo1 = new KafkaTaskInfo(new UUID(1, 1), 1L, "default_cluster", 20000, + KafkaTaskInfo routineLoadTaskInfo1 = new KafkaTaskInfo(new UUID(1, 1), 1L, "default_cluster", 20000, 0, partitionIdToOffset, false); routineLoadTaskInfoQueue.add(routineLoadTaskInfo1); diff --git a/fe/fe-core/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java b/fe/fe-core/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java index a819c4f030178f1..414f5cf03c4db55 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java @@ -318,7 +318,7 @@ public void testCommitRoutineLoadTransaction(@Injectable TabletCommitInfo tablet List routineLoadTaskInfoList = Deencapsulation.getField(routineLoadJob, "routineLoadTaskInfoList"); Map partitionIdToOffset = Maps.newHashMap(); partitionIdToOffset.put(1, 0L); - KafkaTaskInfo routineLoadTaskInfo = new KafkaTaskInfo(UUID.randomUUID(), 1L, "default_cluster", 20000, + KafkaTaskInfo routineLoadTaskInfo = new KafkaTaskInfo(UUID.randomUUID(), 1L, "default_cluster", 20000, 0, partitionIdToOffset, false); Deencapsulation.setField(routineLoadTaskInfo, "txnId", 1L); routineLoadTaskInfoList.add(routineLoadTaskInfo); @@ -390,7 +390,7 @@ public void testCommitRoutineLoadTransactionWithErrorMax(@Injectable TabletCommi List routineLoadTaskInfoList = Deencapsulation.getField(routineLoadJob, "routineLoadTaskInfoList"); Map partitionIdToOffset = Maps.newHashMap(); partitionIdToOffset.put(1, 0L); - KafkaTaskInfo routineLoadTaskInfo = new KafkaTaskInfo(UUID.randomUUID(), 1L, "defualt_cluster", 20000, + KafkaTaskInfo routineLoadTaskInfo = new KafkaTaskInfo(UUID.randomUUID(), 1L, "defualt_cluster", 20000, 0, partitionIdToOffset, false); Deencapsulation.setField(routineLoadTaskInfo, "txnId", 1L); routineLoadTaskInfoList.add(routineLoadTaskInfo);