Skip to content

Commit

Permalink
[Feat](Job)After a job is paused, it can be manually triggered to exe…
Browse files Browse the repository at this point in the history
…cute. (#39565)

## Proposed changes
- After a job is paused, it can be manually triggered to execute.
- Update the return fields of the Insert task query to include a new
start time field.

(cherry picked from commit 55de454)
  • Loading branch information
CalvinKirs committed Aug 30, 2024
1 parent 32a3280 commit f22b871
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -212,8 +212,9 @@ public List<T> queryAllTasks() {
}

public List<T> commonCreateTasks(TaskType taskType, C taskContext) {
if (!getJobStatus().equals(JobStatus.RUNNING)) {
log.warn("job is not running, job id is {}", jobId);
if (!canCreateTask(taskType)) {
log.info("job is not ready for scheduling, job id is {},job status is {}, taskType is {}", jobId,
jobStatus, taskType);
return new ArrayList<>();
}
if (!isReadyForScheduling(taskContext)) {
Expand All @@ -235,6 +236,19 @@ public List<T> commonCreateTasks(TaskType taskType, C taskContext) {
}
}

private boolean canCreateTask(TaskType taskType) {
JobStatus currentJobStatus = getJobStatus();

switch (taskType) {
case SCHEDULED:
return currentJobStatus.equals(JobStatus.RUNNING);
case MANUAL:
return currentJobStatus.equals(JobStatus.RUNNING) || currentJobStatus.equals(JobStatus.PAUSED);
default:
throw new IllegalArgumentException("Unsupported TaskType: " + taskType);
}
}

public void initTasks(Collection<? extends T> tasks, TaskType taskType) {
tasks.forEach(task -> {
task.setTaskType(taskType);
Expand Down Expand Up @@ -307,24 +321,24 @@ public void logUpdateOperation() {
@Override
public void onTaskFail(T task) throws JobException {
failedTaskCount.incrementAndGet();
updateJobStatusIfEnd(false);
updateJobStatusIfEnd(false, task.getTaskType());
runningTasks.remove(task);
logUpdateOperation();
}

@Override
public void onTaskSuccess(T task) throws JobException {
succeedTaskCount.incrementAndGet();
updateJobStatusIfEnd(true);
updateJobStatusIfEnd(true, task.getTaskType());
runningTasks.remove(task);
logUpdateOperation();

}


private void updateJobStatusIfEnd(boolean taskSuccess) throws JobException {
private void updateJobStatusIfEnd(boolean taskSuccess, TaskType taskType) throws JobException {
JobExecuteType executeType = getJobConfig().getExecuteType();
if (executeType.equals(JobExecuteType.MANUAL)) {
if (executeType.equals(JobExecuteType.MANUAL) || taskType.equals(TaskType.MANUAL)) {
return;
}
switch (executeType) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ public class InsertTask extends AbstractTask {
new Column("Status", ScalarType.createStringType()),
new Column("ErrorMsg", ScalarType.createStringType()),
new Column("CreateTime", ScalarType.createStringType()),
new Column("StartTime", ScalarType.createStringType()),
new Column("FinishTime", ScalarType.createStringType()),
new Column("TrackingUrl", ScalarType.createStringType()),
new Column("LoadStatistic", ScalarType.createStringType()),
Expand Down Expand Up @@ -247,6 +248,8 @@ public TRow getTvfInfo(String jobName) {
trow.addToColumnValue(new TCell().setStringVal(errorMsg));
// create time
trow.addToColumnValue(new TCell().setStringVal(TimeUtils.longToTimeString(getCreateTimeMs())));
trow.addToColumnValue(new TCell().setStringVal(null == getStartTimeMs() ? ""
: TimeUtils.longToTimeString(getStartTimeMs())));
// load end time
trow.addToColumnValue(new TCell().setStringVal(TimeUtils.longToTimeString(getFinishTimeMs())));
// tracking url
Expand Down Expand Up @@ -274,7 +277,10 @@ private TRow getPendingTaskTVFInfo(String jobName) {
trow.addToColumnValue(new TCell().setStringVal(getStatus().name()));
trow.addToColumnValue(new TCell().setStringVal(""));
trow.addToColumnValue(new TCell().setStringVal(TimeUtils.longToTimeString(getCreateTimeMs())));
trow.addToColumnValue(new TCell().setStringVal(""));
trow.addToColumnValue(new TCell().setStringVal(null == getStartTimeMs() ? ""
: TimeUtils.longToTimeString(getStartTimeMs())));
trow.addToColumnValue(new TCell().setStringVal(null == getFinishTimeMs() ? ""
: TimeUtils.longToTimeString(getFinishTimeMs())));
trow.addToColumnValue(new TCell().setStringVal(""));
trow.addToColumnValue(new TCell().setStringVal(""));
trow.addToColumnValue(new TCell().setStringVal(userIdentity.getQualifiedUser()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -188,10 +188,9 @@ private void executeTimerJobIdsWithinLastTenMinutesWindow() {
clearEndJob(job);
continue;
}
if (!job.getJobStatus().equals(JobStatus.RUNNING) && !job.getJobConfig().checkIsTimerJob()) {
continue;
if (job.getJobStatus().equals(JobStatus.RUNNING) && job.getJobConfig().checkIsTimerJob()) {
cycleTimerJobScheduler(job, lastTimeWindowMs);
}
cycleTimerJobScheduler(job, lastTimeWindowMs);
}
}

Expand Down

0 comments on commit f22b871

Please sign in to comment.