Skip to content

Commit

Permalink
fix timeout backoff can not work
Browse files Browse the repository at this point in the history
  • Loading branch information
sollhui committed Mar 21, 2024
1 parent c8a598b commit 9b07bce
Show file tree
Hide file tree
Showing 6 changed files with 25 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ public void divideRoutineLoadJob(int currentConcurrentTaskNum) throws UserExcept
((KafkaProgress) progress).getOffsetByPartition(kafkaPartition));
}
KafkaTaskInfo kafkaTaskInfo = new KafkaTaskInfo(UUID.randomUUID(), id, clusterName,
maxBatchIntervalS * 2 * 1000, taskKafkaProgress, isMultiTable());
maxBatchIntervalS * 2 * 1000, 0, taskKafkaProgress, isMultiTable());
routineLoadTaskInfoList.add(kafkaTaskInfo);
result.add(kafkaTaskInfo);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,14 +47,16 @@ public class KafkaTaskInfo extends RoutineLoadTaskInfo {
private Map<Integer, Long> partitionIdToOffset;

public KafkaTaskInfo(UUID id, long jobId, String clusterName,
long timeoutMs, Map<Integer, Long> partitionIdToOffset, boolean isMultiTable) {
super(id, jobId, clusterName, timeoutMs, isMultiTable);
long timeoutMs, int timeoutBackOffCount,
Map<Integer, Long> partitionIdToOffset, boolean isMultiTable) {
super(id, jobId, timeoutMs, timeoutBackOffCount, isMultiTable);
this.partitionIdToOffset = partitionIdToOffset;
}

public KafkaTaskInfo(KafkaTaskInfo kafkaTaskInfo, Map<Integer, Long> partitionIdToOffset, boolean isMultiTable) {
super(UUID.randomUUID(), kafkaTaskInfo.getJobId(), kafkaTaskInfo.getClusterName(),
kafkaTaskInfo.getTimeoutMs(), kafkaTaskInfo.getBeId(), isMultiTable);
kafkaTaskInfo.getTimeoutMs(), kafkaTaskInfo.getTimeoutBackOffCount(),
kafkaTaskInfo.getBeId(), isMultiTable);
this.partitionIdToOffset = partitionIdToOffset;
}

Expand Down Expand Up @@ -129,6 +131,10 @@ private TExecPlanFragmentParams rePlan(RoutineLoadJob routineLoadJob) throws Use
TExecPlanFragmentParams tExecPlanFragmentParams = routineLoadJob.plan(loadId, txnId);
TPlanFragment tPlanFragment = tExecPlanFragmentParams.getFragment();
tPlanFragment.getOutputSink().getOlapTableSink().setTxnId(txnId);
long timeoutS = this.getTimeoutMs() / 1000;
tPlanFragment.getOutputSink().getOlapTableSink().setLoadChannelTimeoutS(timeoutS);
tExecPlanFragmentParams.getQueryOptions().setQueryTimeout((int) timeoutS);
tExecPlanFragmentParams.getQueryOptions().setExecutionTimeout((int) timeoutS);
return tExecPlanFragmentParams;
}

Expand All @@ -138,6 +144,10 @@ private TPipelineFragmentParams rePlanForPipeline(RoutineLoadJob routineLoadJob)
TPipelineFragmentParams tExecPlanFragmentParams = routineLoadJob.planForPipeline(loadId, txnId);
TPlanFragment tPlanFragment = tExecPlanFragmentParams.getFragment();
tPlanFragment.getOutputSink().getOlapTableSink().setTxnId(txnId);
long timeoutS = this.getTimeoutMs() / 1000;
tPlanFragment.getOutputSink().getOlapTableSink().setLoadChannelTimeoutS(timeoutS);
tExecPlanFragmentParams.getQueryOptions().setQueryTimeout((int) timeoutS);
tExecPlanFragmentParams.getQueryOptions().setExecutionTimeout((int) timeoutS);
return tExecPlanFragmentParams;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,18 +80,21 @@ public abstract class RoutineLoadTaskInfo {
// so that user or other logic can know the status of the corresponding txn.
protected TransactionStatus txnStatus = TransactionStatus.UNKNOWN;

public RoutineLoadTaskInfo(UUID id, long jobId, String clusterName, long timeoutMs, boolean isMultiTable) {

public RoutineLoadTaskInfo(UUID id, long jobId, String clusterName, long timeoutMs,
int timeoutBackOffCount, boolean isMultiTable) {
this.id = id;
this.jobId = jobId;
this.clusterName = clusterName;
this.createTimeMs = System.currentTimeMillis();
this.timeoutMs = timeoutMs;
this.timeoutBackOffCount = timeoutBackOffCount;
this.isMultiTable = isMultiTable;
}

public RoutineLoadTaskInfo(UUID id, long jobId, String clusterName, long timeoutMs, long previousBeId,
boolean isMultiTable) {
this(id, jobId, clusterName, timeoutMs, isMultiTable);
public RoutineLoadTaskInfo(UUID id, long jobId, String clusterName, long timeoutMs,
int timeoutBackOffCount, long previousBeId, boolean isMultiTable) {
this(id, jobId, timeoutMs, clusterName, timeoutBackOffCount, isMultiTable);
this.previousBeId = previousBeId;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ public void testProcessTimeOutTasks(@Injectable GlobalTransactionMgr globalTrans
Map<Integer, Long> partitionIdsToOffset = Maps.newHashMap();
partitionIdsToOffset.put(100, 0L);
KafkaTaskInfo kafkaTaskInfo = new KafkaTaskInfo(new UUID(1, 1), 1L, "default_cluster",
maxBatchIntervalS * 2 * 1000, partitionIdsToOffset, false);
maxBatchIntervalS * 2 * 1000, 0, partitionIdsToOffset, false);
kafkaTaskInfo.setExecuteStartTimeMs(System.currentTimeMillis() - maxBatchIntervalS * 2 * 1000 - 1);
routineLoadTaskInfoList.add(kafkaTaskInfo);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ public void testRunOneCycle(@Injectable KafkaRoutineLoadJob kafkaRoutineLoadJob1
Deencapsulation.setField(kafkaProgress, "partitionIdToOffset", partitionIdToOffset);

Queue<RoutineLoadTaskInfo> routineLoadTaskInfoQueue = Queues.newLinkedBlockingQueue();
KafkaTaskInfo routineLoadTaskInfo1 = new KafkaTaskInfo(new UUID(1, 1), 1L, "default_cluster", 20000,
KafkaTaskInfo routineLoadTaskInfo1 = new KafkaTaskInfo(new UUID(1, 1), 1L, "default_cluster", 20000, 0,
partitionIdToOffset, false);
routineLoadTaskInfoQueue.add(routineLoadTaskInfo1);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -318,7 +318,7 @@ public void testCommitRoutineLoadTransaction(@Injectable TabletCommitInfo tablet
List<RoutineLoadTaskInfo> routineLoadTaskInfoList = Deencapsulation.getField(routineLoadJob, "routineLoadTaskInfoList");
Map<Integer, Long> partitionIdToOffset = Maps.newHashMap();
partitionIdToOffset.put(1, 0L);
KafkaTaskInfo routineLoadTaskInfo = new KafkaTaskInfo(UUID.randomUUID(), 1L, "default_cluster", 20000,
KafkaTaskInfo routineLoadTaskInfo = new KafkaTaskInfo(UUID.randomUUID(), 1L, "default_cluster", 20000, 0,
partitionIdToOffset, false);
Deencapsulation.setField(routineLoadTaskInfo, "txnId", 1L);
routineLoadTaskInfoList.add(routineLoadTaskInfo);
Expand Down Expand Up @@ -390,7 +390,7 @@ public void testCommitRoutineLoadTransactionWithErrorMax(@Injectable TabletCommi
List<RoutineLoadTaskInfo> routineLoadTaskInfoList = Deencapsulation.getField(routineLoadJob, "routineLoadTaskInfoList");
Map<Integer, Long> partitionIdToOffset = Maps.newHashMap();
partitionIdToOffset.put(1, 0L);
KafkaTaskInfo routineLoadTaskInfo = new KafkaTaskInfo(UUID.randomUUID(), 1L, "defualt_cluster", 20000,
KafkaTaskInfo routineLoadTaskInfo = new KafkaTaskInfo(UUID.randomUUID(), 1L, "defualt_cluster", 20000, 0,
partitionIdToOffset, false);
Deencapsulation.setField(routineLoadTaskInfo, "txnId", 1L);
routineLoadTaskInfoList.add(routineLoadTaskInfo);
Expand Down

0 comments on commit 9b07bce

Please sign in to comment.