diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java index 3bf55f3da4044b..b64f40e4e71bc0 100644 --- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java +++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java @@ -1198,6 +1198,12 @@ public class Config extends ConfigBase { @ConfField(mutable = true, masterOnly = true) public static int max_routine_load_task_num_per_be = 1024; + /** + * routine load timeout is equal to maxBatchIntervalS * routine_load_task_timeout_multiplier. + */ + @ConfField(mutable = true, masterOnly = true) + public static int routine_load_task_timeout_multiplier = 10; + /** * the max timeout of get kafka meta. */ diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java index 3bd61463b0f60b..cb475bb1374413 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java @@ -226,7 +226,8 @@ public void divideRoutineLoadJob(int currentConcurrentTaskNum) throws UserExcept ((KafkaProgress) progress).getOffsetByPartition(kafkaPartition)); } KafkaTaskInfo kafkaTaskInfo = new KafkaTaskInfo(UUID.randomUUID(), id, - maxBatchIntervalS * 2 * 1000, 0, taskKafkaProgress, isMultiTable()); + maxBatchIntervalS * Config.routine_load_task_timeout_multiplier * 1000, + taskKafkaProgress, isMultiTable()); routineLoadTaskInfoList.add(kafkaTaskInfo); result.add(kafkaTaskInfo); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaTaskInfo.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaTaskInfo.java index 4659e03d3543ad..32971045e2762e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaTaskInfo.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaTaskInfo.java @@ -50,16 +50,14 @@ public class KafkaTaskInfo extends RoutineLoadTaskInfo { private Map partitionIdToOffset; public KafkaTaskInfo(UUID id, long jobId, - long timeoutMs, int timeoutBackOffCount, - Map partitionIdToOffset, boolean isMultiTable) { - super(id, jobId, timeoutMs, timeoutBackOffCount, isMultiTable); + long timeoutMs, Map partitionIdToOffset, boolean isMultiTable) { + super(id, jobId, timeoutMs, isMultiTable); this.partitionIdToOffset = partitionIdToOffset; } public KafkaTaskInfo(KafkaTaskInfo kafkaTaskInfo, Map partitionIdToOffset, boolean isMultiTable) { super(UUID.randomUUID(), kafkaTaskInfo.getJobId(), - kafkaTaskInfo.getTimeoutMs(), kafkaTaskInfo.getTimeoutBackOffCount(), - kafkaTaskInfo.getBeId(), isMultiTable); + kafkaTaskInfo.getTimeoutMs(), kafkaTaskInfo.getBeId(), isMultiTable); this.partitionIdToOffset = partitionIdToOffset; this.isEof = kafkaTaskInfo.getIsEof(); } @@ -140,11 +138,6 @@ private TExecPlanFragmentParams rePlan(RoutineLoadJob routineLoadJob) throws Use TExecPlanFragmentParams tExecPlanFragmentParams = routineLoadJob.plan(planner, loadId, txnId); TPlanFragment tPlanFragment = tExecPlanFragmentParams.getFragment(); tPlanFragment.getOutputSink().getOlapTableSink().setTxnId(txnId); - // it needs update timeout to make task timeout backoff work - long timeoutS = this.getTimeoutMs() / 1000; - tPlanFragment.getOutputSink().getOlapTableSink().setLoadChannelTimeoutS(timeoutS); - tExecPlanFragmentParams.getQueryOptions().setQueryTimeout((int) timeoutS); - tExecPlanFragmentParams.getQueryOptions().setExecutionTimeout((int) timeoutS); if (Config.enable_workload_group) { long wgId = routineLoadJob.getWorkloadId(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java index 274fb0e3d46636..bb98e04325e65d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java @@ -739,18 +739,6 @@ public void processTimeoutTasks() { // and after renew, the previous task is removed from routineLoadTaskInfoList, // so task can no longer be committed successfully. // the already committed task will not be handled here. - int timeoutBackOffCount = routineLoadTaskInfo.getTimeoutBackOffCount(); - if (timeoutBackOffCount > RoutineLoadTaskInfo.MAX_TIMEOUT_BACK_OFF_COUNT) { - try { - updateState(JobState.PAUSED, new ErrorReason(InternalErrorCode.TIMEOUT_TOO_MUCH, - "task " + routineLoadTaskInfo.getId() + " timeout too much"), false); - } catch (UserException e) { - LOG.warn("update job state to pause failed", e); - } - return; - } - routineLoadTaskInfo.setTimeoutBackOffCount(timeoutBackOffCount + 1); - routineLoadTaskInfo.setTimeoutMs((routineLoadTaskInfo.getTimeoutMs() << 1)); RoutineLoadTaskInfo newTask = unprotectRenewTask(routineLoadTaskInfo); Env.getCurrentEnv().getRoutineLoadTaskScheduler().addTaskInQueue(newTask); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java index 301efe4d9c9604..1ff825d97b9d17 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java @@ -73,28 +73,23 @@ public abstract class RoutineLoadTaskInfo { protected boolean isMultiTable = false; - protected static final int MAX_TIMEOUT_BACK_OFF_COUNT = 3; - protected int timeoutBackOffCount = 0; - protected boolean isEof = false; // this status will be set when corresponding transaction's status is changed. // so that user or other logic can know the status of the corresponding txn. protected TransactionStatus txnStatus = TransactionStatus.UNKNOWN; - public RoutineLoadTaskInfo(UUID id, long jobId, long timeoutMs, - int timeoutBackOffCount, boolean isMultiTable) { + public RoutineLoadTaskInfo(UUID id, long jobId, long timeoutMs, boolean isMultiTable) { this.id = id; this.jobId = jobId; this.createTimeMs = System.currentTimeMillis(); this.timeoutMs = timeoutMs; - this.timeoutBackOffCount = timeoutBackOffCount; this.isMultiTable = isMultiTable; } - public RoutineLoadTaskInfo(UUID id, long jobId, long timeoutMs, int timeoutBackOffCount, - long previousBeId, boolean isMultiTable) { - this(id, jobId, timeoutMs, timeoutBackOffCount, isMultiTable); + public RoutineLoadTaskInfo(UUID id, long jobId, long timeoutMs, long previousBeId, + boolean isMultiTable) { + this(id, jobId, timeoutMs, isMultiTable); this.previousBeId = previousBeId; } @@ -138,10 +133,6 @@ public void setLastScheduledTime(long lastScheduledTime) { this.lastScheduledTime = lastScheduledTime; } - public void setTimeoutMs(long timeoutMs) { - this.timeoutMs = timeoutMs; - } - public long getTimeoutMs() { return timeoutMs; } @@ -154,14 +145,6 @@ public TransactionStatus getTxnStatus() { return txnStatus; } - public void setTimeoutBackOffCount(int timeoutBackOffCount) { - this.timeoutBackOffCount = timeoutBackOffCount; - } - - public int getTimeoutBackOffCount() { - return timeoutBackOffCount; - } - public boolean getIsEof() { return isEof; } @@ -173,33 +156,17 @@ public boolean isTimeout() { } if (isRunning() && System.currentTimeMillis() - executeStartTimeMs > timeoutMs) { - LOG.info("task {} is timeout. start: {}, timeout: {}, timeoutBackOffCount: {}", DebugUtil.printId(id), - executeStartTimeMs, timeoutMs, timeoutBackOffCount); + LOG.info("task {} is timeout. start: {}, timeout: {}", DebugUtil.printId(id), + executeStartTimeMs, timeoutMs); return true; } return false; } public void handleTaskByTxnCommitAttachment(RLTaskTxnCommitAttachment rlTaskTxnCommitAttachment) { - selfAdaptTimeout(rlTaskTxnCommitAttachment); judgeEof(rlTaskTxnCommitAttachment); } - private void selfAdaptTimeout(RLTaskTxnCommitAttachment rlTaskTxnCommitAttachment) { - long taskExecutionTime = rlTaskTxnCommitAttachment.getTaskExecutionTimeMs(); - long timeoutMs = this.timeoutMs; - - while (this.timeoutBackOffCount > 0) { - timeoutMs = timeoutMs >> 1; - if (timeoutMs <= taskExecutionTime) { - this.timeoutMs = timeoutMs << 1; - return; - } - this.timeoutBackOffCount--; - } - this.timeoutMs = timeoutMs; - } - private void judgeEof(RLTaskTxnCommitAttachment rlTaskTxnCommitAttachment) { RoutineLoadJob routineLoadJob = routineLoadManager.getJob(jobId); if (rlTaskTxnCommitAttachment.getTotalRows() < routineLoadJob.getMaxBatchRows() diff --git a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/KafkaRoutineLoadJobTest.java b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/KafkaRoutineLoadJobTest.java index c18682bdc3c60c..20cb626ff37055 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/KafkaRoutineLoadJobTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/KafkaRoutineLoadJobTest.java @@ -225,7 +225,7 @@ public void testProcessTimeOutTasks(@Injectable GlobalTransactionMgr globalTrans Map partitionIdsToOffset = Maps.newHashMap(); partitionIdsToOffset.put(100, 0L); KafkaTaskInfo kafkaTaskInfo = new KafkaTaskInfo(new UUID(1, 1), 1L, - maxBatchIntervalS * 2 * 1000, 0, partitionIdsToOffset, false); + maxBatchIntervalS * 2 * 1000, partitionIdsToOffset, false); kafkaTaskInfo.setExecuteStartTimeMs(System.currentTimeMillis() - maxBatchIntervalS * 2 * 1000 - 1); routineLoadTaskInfoList.add(kafkaTaskInfo); diff --git a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadTaskSchedulerTest.java b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadTaskSchedulerTest.java index c5bf509464e112..c1f5731329fbad 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadTaskSchedulerTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadTaskSchedulerTest.java @@ -68,7 +68,7 @@ public void testRunOneCycle(@Injectable KafkaRoutineLoadJob kafkaRoutineLoadJob1 Deencapsulation.setField(kafkaProgress, "partitionIdToOffset", partitionIdToOffset); LinkedBlockingDeque routineLoadTaskInfoQueue = new LinkedBlockingDeque<>(); - KafkaTaskInfo routineLoadTaskInfo1 = new KafkaTaskInfo(new UUID(1, 1), 1L, 20000, 0, + KafkaTaskInfo routineLoadTaskInfo1 = new KafkaTaskInfo(new UUID(1, 1), 1L, 20000, partitionIdToOffset, false); routineLoadTaskInfoQueue.addFirst(routineLoadTaskInfo1); diff --git a/fe/fe-core/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java b/fe/fe-core/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java index 5c8f72723adacd..8c62a18045d64e 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java @@ -322,7 +322,7 @@ public void testCommitRoutineLoadTransaction(@Injectable TabletCommitInfo tablet List routineLoadTaskInfoList = Deencapsulation.getField(routineLoadJob, "routineLoadTaskInfoList"); Map partitionIdToOffset = Maps.newHashMap(); partitionIdToOffset.put(1, 0L); - KafkaTaskInfo routineLoadTaskInfo = new KafkaTaskInfo(UUID.randomUUID(), 1L, 20000, 0, + KafkaTaskInfo routineLoadTaskInfo = new KafkaTaskInfo(UUID.randomUUID(), 1L, 20000, partitionIdToOffset, false); Deencapsulation.setField(routineLoadTaskInfo, "txnId", 1L); routineLoadTaskInfoList.add(routineLoadTaskInfo); @@ -396,7 +396,7 @@ public void testCommitRoutineLoadTransactionWithErrorMax(@Injectable TabletCommi List routineLoadTaskInfoList = Deencapsulation.getField(routineLoadJob, "routineLoadTaskInfoList"); Map partitionIdToOffset = Maps.newHashMap(); partitionIdToOffset.put(1, 0L); - KafkaTaskInfo routineLoadTaskInfo = new KafkaTaskInfo(UUID.randomUUID(), 1L, 20000, 0, + KafkaTaskInfo routineLoadTaskInfo = new KafkaTaskInfo(UUID.randomUUID(), 1L, 20000, partitionIdToOffset, false); Deencapsulation.setField(routineLoadTaskInfo, "txnId", 1L); routineLoadTaskInfoList.add(routineLoadTaskInfo);