From 70d3dd829de8f1dd22e0c9d303118049eb49419d Mon Sep 17 00:00:00 2001 From: wenmo <32723967+wenmo@users.noreply.github.com> Date: Sun, 17 Dec 2023 22:11:47 +0800 Subject: [PATCH] [Fix-2672] [core] Fix the issue of cluster configuration deploy session cluster --- .../job/handler/ClearJobHistoryHandler.java | 28 +++++++++++-------- .../impl/ClusterInstanceServiceImpl.java | 1 + .../main/java/org/dinky/job/JobManager.java | 1 + .../org/dinky/gateway/enums/GatewayType.java | 9 +++++- dinky-web/src/locales/en-US/pages.ts | 1 + dinky-web/src/locales/zh-CN/pages.ts | 1 + 6 files changed, 28 insertions(+), 13 deletions(-) diff --git a/dinky-admin/src/main/java/org/dinky/job/handler/ClearJobHistoryHandler.java b/dinky-admin/src/main/java/org/dinky/job/handler/ClearJobHistoryHandler.java index 0003bdd5fe..d11eaad213 100644 --- a/dinky-admin/src/main/java/org/dinky/job/handler/ClearJobHistoryHandler.java +++ b/dinky-admin/src/main/java/org/dinky/job/handler/ClearJobHistoryHandler.java @@ -19,6 +19,7 @@ package org.dinky.job.handler; +import org.dinky.assertion.Asserts; import org.dinky.data.model.ClusterInstance; import org.dinky.data.model.job.History; import org.dinky.data.model.job.JobInstance; @@ -81,22 +82,25 @@ public void clearJobHistory(Integer maxRetainDays, Integer maxRetainCount) { // Retrieve the list of job instances to be deleted List deleteList = jobInstanceService.list(deleteWrapper); - List historyDeleteIds = deleteList.stream().map(JobInstance::getHistoryId).collect(Collectors.toList()); - - // Delete the cluster from the instance to be deleted, but filter the manually registered clusters - QueryWrapper clusterDeleteWrapper = new QueryWrapper<>(); List clusterDeleteIds = deleteList.stream().map(JobInstance::getClusterId).collect(Collectors.toList()); - clusterDeleteWrapper - .lambda() - .in(true, ClusterInstance::getId, clusterDeleteIds) - .eq(ClusterInstance::getAutoRegisters, true); - - jobInstanceService.remove(deleteWrapper); - jobHistoryService.removeBatchByIds(historyDeleteIds); - clusterService.remove(clusterDeleteWrapper); + if (Asserts.isNotNullCollection(deleteList)) { + jobInstanceService.remove(deleteWrapper); + } + if (Asserts.isNotNullCollection(historyDeleteIds)) { + jobHistoryService.removeBatchByIds(historyDeleteIds); + } + if (Asserts.isNotNullCollection(clusterDeleteIds)) { + // Delete the cluster from the instance to be deleted, but filter the manually registered clusters + QueryWrapper clusterDeleteWrapper = new QueryWrapper<>(); + clusterDeleteWrapper + .lambda() + .in(true, ClusterInstance::getId, clusterDeleteIds) + .eq(ClusterInstance::getAutoRegisters, true); + clusterService.remove(clusterDeleteWrapper); + } } } } diff --git a/dinky-admin/src/main/java/org/dinky/service/impl/ClusterInstanceServiceImpl.java b/dinky-admin/src/main/java/org/dinky/service/impl/ClusterInstanceServiceImpl.java index 4b1e0cd624..ecd788764e 100644 --- a/dinky-admin/src/main/java/org/dinky/service/impl/ClusterInstanceServiceImpl.java +++ b/dinky-admin/src/main/java/org/dinky/service/impl/ClusterInstanceServiceImpl.java @@ -201,6 +201,7 @@ public ClusterInstance deploySessionCluster(Integer id) { } GatewayConfig gatewayConfig = GatewayConfig.build(FlinkClusterConfig.create(clusterCfg.getType(), clusterCfg.getConfigJson())); + gatewayConfig.setType(gatewayConfig.getType().getSessionType()); GatewayResult gatewayResult = JobManager.deploySessionCluster(gatewayConfig); return registersCluster(ClusterInstanceDTO.autoRegistersClusterDTO( gatewayResult.getWebURL().replace("http://", ""), diff --git a/dinky-core/src/main/java/org/dinky/job/JobManager.java b/dinky-core/src/main/java/org/dinky/job/JobManager.java index c5825dd0d9..8163dc552a 100644 --- a/dinky-core/src/main/java/org/dinky/job/JobManager.java +++ b/dinky-core/src/main/java/org/dinky/job/JobManager.java @@ -240,6 +240,7 @@ public ObjectNode getJarStreamGraphJson(String statement) { @ProcessStep(type = ProcessStepType.SUBMIT_EXECUTE) public JobResult executeJarSql(String statement) throws Exception { job = Job.build(runMode, config, executorConfig, executor, statement, useGateway); + ready(); StreamGraph streamGraph = JobJarStreamGraphBuilder.build(this).getJarStreamGraph(statement, getDinkyClassLoader()); try { diff --git a/dinky-gateway/src/main/java/org/dinky/gateway/enums/GatewayType.java b/dinky-gateway/src/main/java/org/dinky/gateway/enums/GatewayType.java index 7789eaf01c..e207d94af7 100644 --- a/dinky-gateway/src/main/java/org/dinky/gateway/enums/GatewayType.java +++ b/dinky-gateway/src/main/java/org/dinky/gateway/enums/GatewayType.java @@ -62,7 +62,14 @@ public static GatewayType get(String value) { } public static GatewayType getSessionType(String value) { - if (value.equals("Kubernetes")) { + if (value.contains("kubernetes")) { + return GatewayType.KUBERNETES_SESSION; + } + return GatewayType.YARN_SESSION; + } + + public GatewayType getSessionType() { + if (longValue.contains("kubernetes")) { return GatewayType.KUBERNETES_SESSION; } return GatewayType.YARN_SESSION; diff --git a/dinky-web/src/locales/en-US/pages.ts b/dinky-web/src/locales/en-US/pages.ts index 90c39948db..9769feecf8 100644 --- a/dinky-web/src/locales/en-US/pages.ts +++ b/dinky-web/src/locales/en-US/pages.ts @@ -351,6 +351,7 @@ export default { 'pages.datastudio.editor.stop.job': 'Stop job', 'pages.datastudio.editor.stop.jobConfirm': 'Are you sure to stop the job [{jobName}]? ', 'pages.datastudio.editor.submitting': 'The new task [{jobName}] is executing', + 'pages.datastudio.editor.checking': 'The task [{jobName}] is checking', 'pages.datastudio.editor.debugging': 'The new task [{jobName}] is debugging', 'pages.datastudio.editor.onlyread': 'Task has been published, modification is prohibited, please go offline first', diff --git a/dinky-web/src/locales/zh-CN/pages.ts b/dinky-web/src/locales/zh-CN/pages.ts index 1c0dda217a..5417757669 100644 --- a/dinky-web/src/locales/zh-CN/pages.ts +++ b/dinky-web/src/locales/zh-CN/pages.ts @@ -341,6 +341,7 @@ export default { 'pages.datastudio.editor.stop.job': '停止作业', 'pages.datastudio.editor.stop.jobConfirm': '确定停止作业【{jobName}】吗?', 'pages.datastudio.editor.submitting': '新任务【{jobName}】正在执行', + 'pages.datastudio.editor.checking': '任务【{jobName}】正在检查', 'pages.datastudio.editor.debugging': '新任务【{jobName}】正在调试', 'pages.datastudio.editor.onlyread': '任务已发布,禁止修改,请先下线任务', 'pages.datastudio.editor.notsave': '当前修改内容未保存!',