Skip to content

Commit

Permalink
[Fix-2108][admin] Fix updating the savepoint policy of the job during…
Browse files Browse the repository at this point in the history
… checkpoint recovery (DataLinkDC#2247)

* [Fix-2108][admin] Fix the issue of updating the savepoint policy of the job during checkpoint recovery

* fix org.apache.commons.lang.SerializationUtils

---------

Co-authored-by: wenmo <[email protected]>
  • Loading branch information
aiwenmo and aiwenmo authored Aug 20, 2023
1 parent f5244d5 commit 327a90b
Showing 1 changed file with 5 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@
import com.dlink.utils.UDFUtils;

import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.SerializationUtils;
import org.apache.commons.lang3.StringUtils;

import java.io.IOException;
Expand Down Expand Up @@ -316,7 +317,6 @@ public JobResult restartTask(Integer id, String savePointPath) {
} else {
task.setSavePointStrategy(SavePointStrategy.CUSTOM.getValue());
task.setSavePointPath(savePointPath);
updateById(task);
}
JobConfig config = buildJobConfig(task);
JobManager jobManager = JobManager.build(config);
Expand Down Expand Up @@ -773,15 +773,16 @@ public Result onLineTask(Integer id) {
@Override
public Result reOnLineTask(Integer id, String savePointPath) {
final Task task = this.getTaskInfoById(id);
final Task taskForConfig = (Task) SerializationUtils.clone(task);
Asserts.checkNull(task, Tips.TASK_NOT_EXIST);
if (Asserts.isNotNull(task.getJobInstanceId()) && task.getJobInstanceId() != 0) {
savepointJobInstance(task.getJobInstanceId(), SavePointType.CANCEL.getValue());
}
if (StringUtils.isNotBlank(savePointPath)) {
task.setSavePointStrategy(SavePointStrategy.CUSTOM.getValue());
task.setSavePointPath(savePointPath);
taskForConfig.setSavePointStrategy(SavePointStrategy.CUSTOM.getValue());
taskForConfig.setSavePointPath(savePointPath);
}
final JobResult jobResult = submitTaskToOnline(task, id);
final JobResult jobResult = submitTaskToOnline(taskForConfig, id);
if (Job.JobStatus.SUCCESS == jobResult.getStatus()) {
task.setStep(JobLifeCycle.ONLINE.getValue());
task.setJobInstanceId(jobResult.getJobInstanceId());
Expand Down

0 comments on commit 327a90b

Please sign in to comment.