Skip to content

Commit

Permalink
[branch-2.0](routine-load) self-adaption backoff timeout (apache#32591)
Browse files Browse the repository at this point in the history
  • Loading branch information
sollhui authored and weixingyu12 committed Mar 22, 2024
1 parent eb74d15 commit 01ff835
Show file tree
Hide file tree
Showing 8 changed files with 72 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@ public enum InternalErrorCode {
TOO_MANY_FAILURE_ROWS_ERR(102),
CREATE_TASKS_ERR(103),
TASKS_ABORT_ERR(104),
CANNOT_RESUME_ERR(105);
CANNOT_RESUME_ERR(105),
TIMEOUT_TOO_MUCH(106);

private long errCode;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ public void divideRoutineLoadJob(int currentConcurrentTaskNum) throws UserExcept
((KafkaProgress) progress).getOffsetByPartition(kafkaPartition));
}
KafkaTaskInfo kafkaTaskInfo = new KafkaTaskInfo(UUID.randomUUID(), id, clusterName,
maxBatchIntervalS * 2 * 1000, taskKafkaProgress, isMultiTable());
maxBatchIntervalS * 2 * 1000, 0, taskKafkaProgress, isMultiTable());
routineLoadTaskInfoList.add(kafkaTaskInfo);
result.add(kafkaTaskInfo);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,14 +47,16 @@ public class KafkaTaskInfo extends RoutineLoadTaskInfo {
private Map<Integer, Long> partitionIdToOffset;

public KafkaTaskInfo(UUID id, long jobId, String clusterName,
long timeoutMs, Map<Integer, Long> partitionIdToOffset, boolean isMultiTable) {
super(id, jobId, clusterName, timeoutMs, isMultiTable);
long timeoutMs, int timeoutBackOffCount,
Map<Integer, Long> partitionIdToOffset, boolean isMultiTable) {
super(id, jobId, clusterName, timeoutMs, timeoutBackOffCount, isMultiTable);
this.partitionIdToOffset = partitionIdToOffset;
}

public KafkaTaskInfo(KafkaTaskInfo kafkaTaskInfo, Map<Integer, Long> partitionIdToOffset, boolean isMultiTable) {
super(UUID.randomUUID(), kafkaTaskInfo.getJobId(), kafkaTaskInfo.getClusterName(),
kafkaTaskInfo.getTimeoutMs(), kafkaTaskInfo.getBeId(), isMultiTable);
kafkaTaskInfo.getTimeoutMs(), kafkaTaskInfo.getTimeoutBackOffCount(),
kafkaTaskInfo.getBeId(), isMultiTable);
this.partitionIdToOffset = partitionIdToOffset;
}

Expand Down Expand Up @@ -129,6 +131,11 @@ private TExecPlanFragmentParams rePlan(RoutineLoadJob routineLoadJob) throws Use
TExecPlanFragmentParams tExecPlanFragmentParams = routineLoadJob.plan(loadId, txnId);
TPlanFragment tPlanFragment = tExecPlanFragmentParams.getFragment();
tPlanFragment.getOutputSink().getOlapTableSink().setTxnId(txnId);
// it need update timeout to make task timeout backoff work
long timeoutS = this.getTimeoutMs() / 1000;
tPlanFragment.getOutputSink().getOlapTableSink().setLoadChannelTimeoutS(timeoutS);
tExecPlanFragmentParams.getQueryOptions().setQueryTimeout((int) timeoutS);
tExecPlanFragmentParams.getQueryOptions().setExecutionTimeout((int) timeoutS);
return tExecPlanFragmentParams;
}

Expand All @@ -138,6 +145,11 @@ private TPipelineFragmentParams rePlanForPipeline(RoutineLoadJob routineLoadJob)
TPipelineFragmentParams tExecPlanFragmentParams = routineLoadJob.planForPipeline(loadId, txnId);
TPlanFragment tPlanFragment = tExecPlanFragmentParams.getFragment();
tPlanFragment.getOutputSink().getOlapTableSink().setTxnId(txnId);
// it need update timeout to make task timeout backoff work
long timeoutS = this.getTimeoutMs() / 1000;
tPlanFragment.getOutputSink().getOlapTableSink().setLoadChannelTimeoutS(timeoutS);
tExecPlanFragmentParams.getQueryOptions().setQueryTimeout((int) timeoutS);
tExecPlanFragmentParams.getQueryOptions().setExecutionTimeout((int) timeoutS);
return tExecPlanFragmentParams;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -709,6 +709,18 @@ public void processTimeoutTasks() {
// and after renew, the previous task is removed from routineLoadTaskInfoList,
// so task can no longer be committed successfully.
// the already committed task will not be handled here.
int timeoutBackOffCount = routineLoadTaskInfo.getTimeoutBackOffCount();
if (timeoutBackOffCount > RoutineLoadTaskInfo.MAX_TIMEOUT_BACK_OFF_COUNT) {
try {
updateState(JobState.PAUSED, new ErrorReason(InternalErrorCode.TIMEOUT_TOO_MUCH,
"task " + routineLoadTaskInfo.getId() + " timeout too much"), false);
} catch (UserException e) {
LOG.warn("update job state to pause failed", e);
}
return;
}
routineLoadTaskInfo.setTimeoutBackOffCount(timeoutBackOffCount + 1);
routineLoadTaskInfo.setTimeoutMs((routineLoadTaskInfo.getTimeoutMs() << 1));
RoutineLoadTaskInfo newTask = unprotectRenewTask(routineLoadTaskInfo);
Env.getCurrentEnv().getRoutineLoadTaskScheduler().addTaskInQueue(newTask);
}
Expand Down Expand Up @@ -1212,6 +1224,7 @@ private void executeTaskOnTxnStatusChanged(RoutineLoadTaskInfo routineLoadTaskIn
} else if (checkCommitInfo(rlTaskTxnCommitAttachment, txnState, txnStatusChangeReason)) {
// step2: update job progress
updateProgress(rlTaskTxnCommitAttachment);
routineLoadTaskInfo.selfAdaptTimeout(rlTaskTxnCommitAttachment);
}

if (rlTaskTxnCommitAttachment != null && !Strings.isNullOrEmpty(rlTaskTxnCommitAttachment.getErrorLogUrl())) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,22 +73,28 @@ public abstract class RoutineLoadTaskInfo {

protected boolean isMultiTable = false;

protected static final int MAX_TIMEOUT_BACK_OFF_COUNT = 3;
protected int timeoutBackOffCount = 0;

// this status will be set when corresponding transaction's status is changed.
// so that user or other logic can know the status of the corresponding txn.
protected TransactionStatus txnStatus = TransactionStatus.UNKNOWN;

public RoutineLoadTaskInfo(UUID id, long jobId, String clusterName, long timeoutMs, boolean isMultiTable) {

public RoutineLoadTaskInfo(UUID id, long jobId, String clusterName, long timeoutMs,
int timeoutBackOffCount, boolean isMultiTable) {
this.id = id;
this.jobId = jobId;
this.clusterName = clusterName;
this.createTimeMs = System.currentTimeMillis();
this.timeoutMs = timeoutMs;
this.timeoutBackOffCount = timeoutBackOffCount;
this.isMultiTable = isMultiTable;
}

public RoutineLoadTaskInfo(UUID id, long jobId, String clusterName, long timeoutMs, long previousBeId,
boolean isMultiTable) {
this(id, jobId, clusterName, timeoutMs, isMultiTable);
public RoutineLoadTaskInfo(UUID id, long jobId, String clusterName, long timeoutMs,
int timeoutBackOffCount, long previousBeId, boolean isMultiTable) {
this(id, jobId, clusterName, timeoutMs, timeoutBackOffCount, isMultiTable);
this.previousBeId = previousBeId;
}

Expand Down Expand Up @@ -136,6 +142,10 @@ public void setLastScheduledTime(long lastScheduledTime) {
this.lastScheduledTime = lastScheduledTime;
}

public void setTimeoutMs(long timeoutMs) {
this.timeoutMs = timeoutMs;
}

public long getTimeoutMs() {
return timeoutMs;
}
Expand All @@ -148,6 +158,14 @@ public TransactionStatus getTxnStatus() {
return txnStatus;
}

public void setTimeoutBackOffCount(int timeoutBackOffCount) {
this.timeoutBackOffCount = timeoutBackOffCount;
}

public int getTimeoutBackOffCount() {
return timeoutBackOffCount;
}

public boolean isTimeout() {
if (txnStatus == TransactionStatus.COMMITTED || txnStatus == TransactionStatus.VISIBLE) {
// the corresponding txn is already finished, this task can not be treated as timeout.
Expand All @@ -162,6 +180,21 @@ public boolean isTimeout() {
return false;
}

public void selfAdaptTimeout(RLTaskTxnCommitAttachment rlTaskTxnCommitAttachment) {
long taskExecutionTime = rlTaskTxnCommitAttachment.getTaskExecutionTimeMs();
long timeoutMs = this.timeoutMs;

while (this.timeoutBackOffCount > 0) {
timeoutMs = timeoutMs >> 1;
if (timeoutMs <= taskExecutionTime) {
this.timeoutMs = timeoutMs << 1;
return;
}
this.timeoutBackOffCount--;
}
this.timeoutMs = timeoutMs;
}

abstract TRoutineLoadTask createRoutineLoadTask() throws UserException;

// begin the txn of this task
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ public void testProcessTimeOutTasks(@Injectable GlobalTransactionMgr globalTrans
Map<Integer, Long> partitionIdsToOffset = Maps.newHashMap();
partitionIdsToOffset.put(100, 0L);
KafkaTaskInfo kafkaTaskInfo = new KafkaTaskInfo(new UUID(1, 1), 1L, "default_cluster",
maxBatchIntervalS * 2 * 1000, partitionIdsToOffset, false);
maxBatchIntervalS * 2 * 1000, 0, partitionIdsToOffset, false);
kafkaTaskInfo.setExecuteStartTimeMs(System.currentTimeMillis() - maxBatchIntervalS * 2 * 1000 - 1);
routineLoadTaskInfoList.add(kafkaTaskInfo);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ public void testRunOneCycle(@Injectable KafkaRoutineLoadJob kafkaRoutineLoadJob1
Deencapsulation.setField(kafkaProgress, "partitionIdToOffset", partitionIdToOffset);

Queue<RoutineLoadTaskInfo> routineLoadTaskInfoQueue = Queues.newLinkedBlockingQueue();
KafkaTaskInfo routineLoadTaskInfo1 = new KafkaTaskInfo(new UUID(1, 1), 1L, "default_cluster", 20000,
KafkaTaskInfo routineLoadTaskInfo1 = new KafkaTaskInfo(new UUID(1, 1), 1L, "default_cluster", 20000, 0,
partitionIdToOffset, false);
routineLoadTaskInfoQueue.add(routineLoadTaskInfo1);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -318,7 +318,7 @@ public void testCommitRoutineLoadTransaction(@Injectable TabletCommitInfo tablet
List<RoutineLoadTaskInfo> routineLoadTaskInfoList = Deencapsulation.getField(routineLoadJob, "routineLoadTaskInfoList");
Map<Integer, Long> partitionIdToOffset = Maps.newHashMap();
partitionIdToOffset.put(1, 0L);
KafkaTaskInfo routineLoadTaskInfo = new KafkaTaskInfo(UUID.randomUUID(), 1L, "default_cluster", 20000,
KafkaTaskInfo routineLoadTaskInfo = new KafkaTaskInfo(UUID.randomUUID(), 1L, "default_cluster", 20000, 0,
partitionIdToOffset, false);
Deencapsulation.setField(routineLoadTaskInfo, "txnId", 1L);
routineLoadTaskInfoList.add(routineLoadTaskInfo);
Expand Down Expand Up @@ -390,7 +390,7 @@ public void testCommitRoutineLoadTransactionWithErrorMax(@Injectable TabletCommi
List<RoutineLoadTaskInfo> routineLoadTaskInfoList = Deencapsulation.getField(routineLoadJob, "routineLoadTaskInfoList");
Map<Integer, Long> partitionIdToOffset = Maps.newHashMap();
partitionIdToOffset.put(1, 0L);
KafkaTaskInfo routineLoadTaskInfo = new KafkaTaskInfo(UUID.randomUUID(), 1L, "defualt_cluster", 20000,
KafkaTaskInfo routineLoadTaskInfo = new KafkaTaskInfo(UUID.randomUUID(), 1L, "defualt_cluster", 20000, 0,
partitionIdToOffset, false);
Deencapsulation.setField(routineLoadTaskInfo, "txnId", 1L);
routineLoadTaskInfoList.add(routineLoadTaskInfo);
Expand Down

0 comments on commit 01ff835

Please sign in to comment.