From f22b8715af8cca550770b3b3883816d68ce2d770 Mon Sep 17 00:00:00 2001 From: Calvin Kirs Date: Fri, 30 Aug 2024 15:03:24 +0800 Subject: [PATCH] [Feat](Job)After a job is paused, it can be manually triggered to execute. (#39565) ## Proposed changes - After a job is paused, it can be manually triggered to execute. - Update the return fields of the Insert task query to include a new start time field. (cherry picked from commit 55de454a0c9bac16bd01ec7ea2b0a09a0d9b3ec2) --- .../apache/doris/job/base/AbstractJob.java | 26 ++++++++++++++----- .../job/extensions/insert/InsertTask.java | 8 +++++- .../doris/job/scheduler/JobScheduler.java | 5 ++-- 3 files changed, 29 insertions(+), 10 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java b/fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java index 3f595d6daf5362..94a0b0146cd514 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java @@ -212,8 +212,9 @@ public List queryAllTasks() { } public List commonCreateTasks(TaskType taskType, C taskContext) { - if (!getJobStatus().equals(JobStatus.RUNNING)) { - log.warn("job is not running, job id is {}", jobId); + if (!canCreateTask(taskType)) { + log.info("job is not ready for scheduling, job id is {},job status is {}, taskType is {}", jobId, + jobStatus, taskType); return new ArrayList<>(); } if (!isReadyForScheduling(taskContext)) { @@ -235,6 +236,19 @@ public List commonCreateTasks(TaskType taskType, C taskContext) { } } + private boolean canCreateTask(TaskType taskType) { + JobStatus currentJobStatus = getJobStatus(); + + switch (taskType) { + case SCHEDULED: + return currentJobStatus.equals(JobStatus.RUNNING); + case MANUAL: + return currentJobStatus.equals(JobStatus.RUNNING) || currentJobStatus.equals(JobStatus.PAUSED); + default: + throw new IllegalArgumentException("Unsupported TaskType: " + taskType); + } + } + public void initTasks(Collection tasks, TaskType taskType) { tasks.forEach(task -> { task.setTaskType(taskType); @@ -307,7 +321,7 @@ public void logUpdateOperation() { @Override public void onTaskFail(T task) throws JobException { failedTaskCount.incrementAndGet(); - updateJobStatusIfEnd(false); + updateJobStatusIfEnd(false, task.getTaskType()); runningTasks.remove(task); logUpdateOperation(); } @@ -315,16 +329,16 @@ public void onTaskFail(T task) throws JobException { @Override public void onTaskSuccess(T task) throws JobException { succeedTaskCount.incrementAndGet(); - updateJobStatusIfEnd(true); + updateJobStatusIfEnd(true, task.getTaskType()); runningTasks.remove(task); logUpdateOperation(); } - private void updateJobStatusIfEnd(boolean taskSuccess) throws JobException { + private void updateJobStatusIfEnd(boolean taskSuccess, TaskType taskType) throws JobException { JobExecuteType executeType = getJobConfig().getExecuteType(); - if (executeType.equals(JobExecuteType.MANUAL)) { + if (executeType.equals(JobExecuteType.MANUAL) || taskType.equals(TaskType.MANUAL)) { return; } switch (executeType) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertTask.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertTask.java index 0fe2a8364aa6d9..ee5abed8392e5b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertTask.java @@ -58,6 +58,7 @@ public class InsertTask extends AbstractTask { new Column("Status", ScalarType.createStringType()), new Column("ErrorMsg", ScalarType.createStringType()), new Column("CreateTime", ScalarType.createStringType()), + new Column("StartTime", ScalarType.createStringType()), new Column("FinishTime", ScalarType.createStringType()), new Column("TrackingUrl", ScalarType.createStringType()), new Column("LoadStatistic", ScalarType.createStringType()), @@ -247,6 +248,8 @@ public TRow getTvfInfo(String jobName) { trow.addToColumnValue(new TCell().setStringVal(errorMsg)); // create time trow.addToColumnValue(new TCell().setStringVal(TimeUtils.longToTimeString(getCreateTimeMs()))); + trow.addToColumnValue(new TCell().setStringVal(null == getStartTimeMs() ? "" + : TimeUtils.longToTimeString(getStartTimeMs()))); // load end time trow.addToColumnValue(new TCell().setStringVal(TimeUtils.longToTimeString(getFinishTimeMs()))); // tracking url @@ -274,7 +277,10 @@ private TRow getPendingTaskTVFInfo(String jobName) { trow.addToColumnValue(new TCell().setStringVal(getStatus().name())); trow.addToColumnValue(new TCell().setStringVal("")); trow.addToColumnValue(new TCell().setStringVal(TimeUtils.longToTimeString(getCreateTimeMs()))); - trow.addToColumnValue(new TCell().setStringVal("")); + trow.addToColumnValue(new TCell().setStringVal(null == getStartTimeMs() ? "" + : TimeUtils.longToTimeString(getStartTimeMs()))); + trow.addToColumnValue(new TCell().setStringVal(null == getFinishTimeMs() ? "" + : TimeUtils.longToTimeString(getFinishTimeMs()))); trow.addToColumnValue(new TCell().setStringVal("")); trow.addToColumnValue(new TCell().setStringVal("")); trow.addToColumnValue(new TCell().setStringVal(userIdentity.getQualifiedUser())); diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/JobScheduler.java b/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/JobScheduler.java index 2100511d22bd2f..5ba88c6e3ce960 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/JobScheduler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/JobScheduler.java @@ -188,10 +188,9 @@ private void executeTimerJobIdsWithinLastTenMinutesWindow() { clearEndJob(job); continue; } - if (!job.getJobStatus().equals(JobStatus.RUNNING) && !job.getJobConfig().checkIsTimerJob()) { - continue; + if (job.getJobStatus().equals(JobStatus.RUNNING) && job.getJobConfig().checkIsTimerJob()) { + cycleTimerJobScheduler(job, lastTimeWindowMs); } - cycleTimerJobScheduler(job, lastTimeWindowMs); } }