Skip to content

Commit

Permalink
Fix that manual registration of history jobs is deleted (DataLinkDC#2669
Browse files Browse the repository at this point in the history
)
  • Loading branch information
gaoyan1998 authored Dec 17, 2023
1 parent 2dd4143 commit 57bddf5
Show file tree
Hide file tree
Showing 6 changed files with 75 additions and 6 deletions.
13 changes: 11 additions & 2 deletions dinky-admin/src/main/java/org/dinky/job/ClearJobHistoryTask.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import org.dinky.context.SpringContextUtils;
import org.dinky.daemon.task.DaemonTask;
import org.dinky.daemon.task.DaemonTaskConfig;
import org.dinky.data.model.Configuration;
import org.dinky.data.model.SystemConfiguration;
import org.dinky.job.handler.ClearJobHistoryHandler;
import org.dinky.service.ClusterInstanceService;
import org.dinky.service.HistoryService;
Expand All @@ -47,6 +49,9 @@ public class ClearJobHistoryTask implements DaemonTask {
private static final ClearJobHistoryHandler clearJobHistoryHandler;
private static final ClusterInstanceService clusterService;

private static Configuration<Integer> maxRetainDays;
private static Configuration<Integer> maxRetainCount;

static {
jobInstanceService = SpringContextUtils.getBean("jobInstanceServiceImpl", JobInstanceService.class);
jobHistoryService = SpringContextUtils.getBean("jobHistoryServiceImpl", JobHistoryService.class);
Expand All @@ -58,12 +63,16 @@ public class ClearJobHistoryTask implements DaemonTask {
.jobHistoryService(jobHistoryService)
.clusterService(clusterService)
.build();
maxRetainDays = SystemConfiguration.getInstances().getJobMaxRetainDays();
maxRetainCount = SystemConfiguration.getInstances().getJobMaxRetainCount();
}

@Override
public boolean dealTask() {
clearJobHistoryHandler.clearDinkyHistory(30, 20);
clearJobHistoryHandler.clearJobHistory(30, 20);
if (maxRetainCount.getValue() > 0) {
clearJobHistoryHandler.clearDinkyHistory(maxRetainDays.getValue(), maxRetainCount.getValue());
clearJobHistoryHandler.clearJobHistory(maxRetainDays.getValue(), maxRetainCount.getValue());
}
return false;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.dinky.job.handler;

import org.dinky.data.model.ClusterInstance;
import org.dinky.data.model.job.History;
import org.dinky.data.model.job.JobInstance;
import org.dinky.service.ClusterInstanceService;
Expand Down Expand Up @@ -60,19 +61,42 @@ public void clearJobHistory(Integer maxRetainDays, Integer maxRetainCount) {
if (jobInstance.getCount() > maxRetainCount) {
// Create a query wrapper to delete job instances older than the maximum retain days
QueryWrapper<JobInstance> deleteWrapper = new QueryWrapper<>();
// Don't delete the last instance, keep it
List<JobInstance> reservedInstances = jobInstanceService
.lambdaQuery()
.eq(JobInstance::getTaskId, jobInstance.getTaskId())
.orderByDesc(JobInstance::getId)
.last("limit " + maxRetainCount)
.list();
deleteWrapper
.lambda()
.eq(JobInstance::getTaskId, jobInstance.getTaskId())
.lt(JobInstance::getCreateTime, LocalDateTime.now().minusDays(maxRetainDays));
.lt(JobInstance::getCreateTime, LocalDateTime.now().minusDays(maxRetainDays))
.notIn(
true,
JobInstance::getId,
reservedInstances.stream()
.map(JobInstance::getId)
.toArray());

// Retrieve the list of job instances to be deleted
List<JobInstance> deleteList = jobInstanceService.list(deleteWrapper);

List<Integer> historyDeleteIds =
deleteList.stream().map(JobInstance::getHistoryId).collect(Collectors.toList());

// Delete the cluster from the instance to be deleted, but filter the manually registered clusters
QueryWrapper<ClusterInstance> clusterDeleteWrapper = new QueryWrapper<>();
List<Integer> clusterDeleteIds =
deleteList.stream().map(JobInstance::getClusterId).collect(Collectors.toList());
clusterService.removeBatchByIds(clusterDeleteIds);
jobHistoryService.removeBatchByIds(historyDeleteIds);
clusterDeleteWrapper
.lambda()
.in(true, ClusterInstance::getId, clusterDeleteIds)
.eq(ClusterInstance::getAutoRegisters, true);

jobInstanceService.remove(deleteWrapper);
jobHistoryService.removeBatchByIds(historyDeleteIds);
clusterService.remove(clusterDeleteWrapper);
}
}
}
Expand All @@ -94,12 +118,22 @@ public void clearDinkyHistory(Integer maxRetainDays, Integer maxRetainCount) {
for (History history : historyList) {
// Check if the count exceeds the maximum retain count
if (history.getCount() > maxRetainCount) {
List<History> reservedHistorys = historyService
.lambdaQuery()
.eq(History::getTaskId, history.getTaskId())
.orderByDesc(History::getId)
.last("limit " + maxRetainCount)
.list();
// Create a query wrapper to delete history records older than the maximum retain days
QueryWrapper<History> deleteWrapper = new QueryWrapper<>();
deleteWrapper
.lambda()
.eq(History::getTaskId, history.getTaskId())
.lt(History::getStartTime, LocalDateTime.now().minusDays(maxRetainDays));
.lt(History::getStartTime, LocalDateTime.now().minusDays(maxRetainDays))
.notIn(
true,
History::getId,
reservedHistorys.stream().map(History::getId).toArray());
historyService.remove(deleteWrapper);
}
}
Expand Down
6 changes: 6 additions & 0 deletions dinky-common/src/main/java/org/dinky/data/enums/Status.java
Original file line number Diff line number Diff line change
Expand Up @@ -299,10 +299,16 @@ public enum Status {
SYS_MAVEN_SETTINGS_REPOSITORYUSER_NOTE(111, "sys.maven.settings.repositoryUser.note"),
SYS_MAVEN_SETTINGS_REPOSITORYPASSWORD(112, "sys.maven.settings.repositoryPassword"),
SYS_MAVEN_SETTINGS_REPOSITORYPASSWORD_NOTE(113, "sys.maven.settings.repositoryPassword.note"),

SYS_ENV_SETTINGS_PYTHONHOME(114, "sys.env.settings.pythonHome"),
SYS_ENV_SETTINGS_PYTHONHOME_NOTE(115, "sys.env.settings.pythonHome.note"),
SYS_ENV_SETTINGS_DINKYADDR(116, "sys.env.settings.dinkyAddr"),
SYS_ENV_SETTINGS_DINKYADDR_NOTE(117, "sys.env.settings.dinkyAddr.note"),
SYS_ENV_SETTINGS_MAX_RETAIN_DAYS(1171, "sys.env.settings.maxRetainDays"),
SYS_ENV_SETTINGS_MAX_RETAIN_DAYS_NOTE(1172, "sys.env.settings.maxRetainDays.note"),
SYS_ENV_SETTINGS_MAX_RETAIN_COUNT(1173, "sys.env.settings.maxRetainCount"),
SYS_ENV_SETTINGS_MAX_RETAIN_COUNT_NOTE(1174, "sys.env.settings.maxRetainCount.note"),

SYS_DOLPHINSCHEDULER_SETTINGS_ENABLE(118, "sys.dolphinscheduler.settings.enable"),
SYS_DOLPHINSCHEDULER_SETTINGS_ENABLE_NOTE(119, "sys.dolphinscheduler.settings.enable.note"),
SYS_DOLPHINSCHEDULER_SETTINGS_URL(120, "sys.dolphinscheduler.settings.url"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,16 @@ public static Configuration.OptionBuilder key(Status status) {
.defaultValue(System.getProperty("dinkyAddr"))
.note(Status.SYS_ENV_SETTINGS_DINKYADDR_NOTE);

private final Configuration<Integer> jobMaxRetainCount = key(Status.SYS_ENV_SETTINGS_MAX_RETAIN_COUNT)
.intType()
.defaultValue(10)
.note(Status.SYS_ENV_SETTINGS_MAX_RETAIN_COUNT_NOTE);

private final Configuration<Integer> jobMaxRetainDays = key(Status.SYS_ENV_SETTINGS_MAX_RETAIN_DAYS)
.intType()
.defaultValue(30)
.note(Status.SYS_ENV_SETTINGS_MAX_RETAIN_DAYS_NOTE);

private final Configuration<Boolean> dolphinschedulerEnable = key(Status.SYS_DOLPHINSCHEDULER_SETTINGS_ENABLE)
.booleanType()
.defaultValue(false)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,11 @@ sys.env.settings.pythonHome=Python Env variables
sys.env.settings.pythonHome.note=Python environment variables, used to submit Python tasks and build Python Udf
sys.env.settings.dinkyAddr=Dinky Address
sys.env.settings.dinkyAddr.note=The address must be the same as the address configured in the Dinky Application background url
sys.env.settings.maxRetainDays=Job history max retained days
sys.env.settings.maxRetainDays.note=The maximum number of days for the history of submitted jobs and auto-registered cluster records to be retained will be automatically deleted when they expire
sys.env.settings.maxRetainCount=Job history max retained counts
sys.env.settings.maxRetainCount.note=The maximum number of submitted job histories and auto-registered cluster records will not be deleted if they are less than that number, even if the retention days have passed

sys.dolphinscheduler.settings.enable=Whether to enable DolphinScheduler
sys.dolphinscheduler.settings.enable.note=Whether to enable DolphinScheduler. Only after enabling it can you use the related functions of DolphinScheduler. Please fill in the following configuration items first, and then enable this configuration after completion. Also: Please ensure that the related configurations of DolphinScheduler are correct.
sys.dolphinscheduler.settings.url=DolphinScheduler address
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,11 @@ sys.env.settings.pythonHome=Python 环境变量
sys.env.settings.pythonHome.note=Python 环境变量,用于提交 Python 任务以及构建 Python Udf
sys.env.settings.dinkyAddr=Dinky 地址
sys.env.settings.dinkyAddr.note=该地址必须与Dinky Application后台url中配置的地址相同
sys.env.settings.maxRetainDays=作业历史最大保留天数
sys.env.settings.maxRetainDays.note=提交的作业历史与自动注册的集群记录最大保留天数,过期会被自动删除
sys.env.settings.maxRetainCount=作业历史最大保留数量
sys.env.settings.maxRetainCount.note=提交的作业历史与自动注册的集群记录最大保留数量,如果不足该数量,则不会被删除,即使已经过了做大保留天数

sys.dolphinscheduler.settings.enable=是否启用 DolphinScheduler
sys.dolphinscheduler.settings.enable.note=是否启用 DolphinScheduler ,启用后才能使用 DolphinScheduler 的相关功能,请先填写下列配置项,完成后再开启此项配置, 另:请确保 DolphinScheduler 的相关配置正确
sys.dolphinscheduler.settings.url=DolphinScheduler 地址
Expand Down

0 comments on commit 57bddf5

Please sign in to comment.