Skip to content

Commit

Permalink
[feat](job)Internal job cancellation immediately and the strong assoc…
Browse files Browse the repository at this point in the history
…iation with the STARTS parameter (#36805) (#38110)

…

## Proposed changes

For internal tasks, such as MTMV, the start time may already be set, or
the time may be adjusted immediately.

<!--Describe your changes.-->

(cherry picked from commit 904a6c0)

## Proposed changes

Issue Number: close #36805

<!--Describe your changes.-->
  • Loading branch information
CalvinKirs authored Sep 5, 2024
1 parent 52393f8 commit 961d2c9
Show file tree
Hide file tree
Showing 5 changed files with 12 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ public void analyze(Analyzer analyzer) throws UserException {
if (null != onceJobStartTimestamp) {
if (onceJobStartTimestamp.equalsIgnoreCase(CURRENT_TIMESTAMP_STRING)) {
jobExecutionConfiguration.setImmediate(true);
timerDefinition.setStartTimeMs(System.currentTimeMillis());
} else {
timerDefinition.setStartTimeMs(TimeUtils.timeStringToLong(onceJobStartTimestamp));
}
Expand All @@ -149,6 +150,8 @@ public void analyze(Analyzer analyzer) throws UserException {
if (null != startsTimeStamp) {
if (startsTimeStamp.equalsIgnoreCase(CURRENT_TIMESTAMP_STRING)) {
jobExecutionConfiguration.setImmediate(true);
//To avoid immediate re-scheduling, set the start time of the timer 100ms before the current time.
timerDefinition.setStartTimeMs(System.currentTimeMillis());
} else {
timerDefinition.setStartTimeMs(TimeUtils.timeStringToLong(startsTimeStamp));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,7 @@ public void checkParams() {
if (executeType == JobExecuteType.INSTANT || executeType == JobExecuteType.MANUAL) {
return;
}

checkTimerDefinition(immediate);

checkTimerDefinition();
if (executeType == JobExecuteType.ONE_TIME) {
validateStartTimeMs();
return;
Expand All @@ -80,12 +78,12 @@ public void checkParams() {
}
}

private void checkTimerDefinition(boolean immediate) {
private void checkTimerDefinition() {
if (timerDefinition == null) {
throw new IllegalArgumentException(
"timerDefinition cannot be null when executeType is not instant or manual");
}
timerDefinition.checkParams(immediate);
timerDefinition.checkParams();
}

private void validateStartTimeMs() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,7 @@ public class TimerDefinition {
private Long latestSchedulerTimeMs;


public void checkParams(boolean immediate) {
if (null != startTimeMs && immediate) {
throw new IllegalArgumentException("startTimeMs must be null when immediate is true");
}
if (null == startTimeMs && immediate) {
startTimeMs = System.currentTimeMillis();
}
public void checkParams() {
if (null == startTimeMs) {
startTimeMs = System.currentTimeMillis() + intervalUnit.getIntervalMs(interval);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,10 @@ public void scheduleOneJob(T job) throws JobException {
schedulerInstantJob(job, TaskType.SCHEDULED, null);
}
}
if (job.getJobConfig().isImmediate() && JobExecuteType.ONE_TIME.equals(job.getJobConfig().getExecuteType())) {
schedulerInstantJob(job, TaskType.SCHEDULED, null);
return;
}
//RECURRING job and immediate is true
if (job.getJobConfig().isImmediate()) {
job.getJobConfig().getTimerDefinition().setLatestSchedulerTimeMs(System.currentTimeMillis());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,8 @@ public void testImmediate() {
JobExecutionConfiguration configuration = new JobExecutionConfiguration();
configuration.setExecuteType(JobExecuteType.ONE_TIME);
configuration.setImmediate(true);
configuration.setImmediate(true);
TimerDefinition timerDefinition = new TimerDefinition();
timerDefinition.setStartTimeMs(0L);
configuration.setTimerDefinition(timerDefinition);
configuration.checkParams();
}
Expand Down

0 comments on commit 961d2c9

Please sign in to comment.