Skip to content

Commit

Permalink
[Fix-2672] [core] Fix the issue of cluster configuration deploy sessi…
Browse files Browse the repository at this point in the history
…on cluster
  • Loading branch information
aiwenmo committed Dec 17, 2023
1 parent 98891a5 commit 70d3dd8
Show file tree
Hide file tree
Showing 6 changed files with 28 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.dinky.job.handler;

import org.dinky.assertion.Asserts;
import org.dinky.data.model.ClusterInstance;
import org.dinky.data.model.job.History;
import org.dinky.data.model.job.JobInstance;
Expand Down Expand Up @@ -81,22 +82,25 @@ public void clearJobHistory(Integer maxRetainDays, Integer maxRetainCount) {

// Retrieve the list of job instances to be deleted
List<JobInstance> deleteList = jobInstanceService.list(deleteWrapper);

List<Integer> historyDeleteIds =
deleteList.stream().map(JobInstance::getHistoryId).collect(Collectors.toList());

// Delete the cluster from the instance to be deleted, but filter the manually registered clusters
QueryWrapper<ClusterInstance> clusterDeleteWrapper = new QueryWrapper<>();
List<Integer> clusterDeleteIds =
deleteList.stream().map(JobInstance::getClusterId).collect(Collectors.toList());
clusterDeleteWrapper
.lambda()
.in(true, ClusterInstance::getId, clusterDeleteIds)
.eq(ClusterInstance::getAutoRegisters, true);

jobInstanceService.remove(deleteWrapper);
jobHistoryService.removeBatchByIds(historyDeleteIds);
clusterService.remove(clusterDeleteWrapper);
if (Asserts.isNotNullCollection(deleteList)) {
jobInstanceService.remove(deleteWrapper);
}
if (Asserts.isNotNullCollection(historyDeleteIds)) {
jobHistoryService.removeBatchByIds(historyDeleteIds);
}
if (Asserts.isNotNullCollection(clusterDeleteIds)) {
// Delete the cluster from the instance to be deleted, but filter the manually registered clusters
QueryWrapper<ClusterInstance> clusterDeleteWrapper = new QueryWrapper<>();
clusterDeleteWrapper
.lambda()
.in(true, ClusterInstance::getId, clusterDeleteIds)
.eq(ClusterInstance::getAutoRegisters, true);
clusterService.remove(clusterDeleteWrapper);
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,7 @@ public ClusterInstance deploySessionCluster(Integer id) {
}
GatewayConfig gatewayConfig =
GatewayConfig.build(FlinkClusterConfig.create(clusterCfg.getType(), clusterCfg.getConfigJson()));
gatewayConfig.setType(gatewayConfig.getType().getSessionType());
GatewayResult gatewayResult = JobManager.deploySessionCluster(gatewayConfig);
return registersCluster(ClusterInstanceDTO.autoRegistersClusterDTO(
gatewayResult.getWebURL().replace("http://", ""),
Expand Down
1 change: 1 addition & 0 deletions dinky-core/src/main/java/org/dinky/job/JobManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,7 @@ public ObjectNode getJarStreamGraphJson(String statement) {
@ProcessStep(type = ProcessStepType.SUBMIT_EXECUTE)
public JobResult executeJarSql(String statement) throws Exception {
job = Job.build(runMode, config, executorConfig, executor, statement, useGateway);
ready();
StreamGraph streamGraph =
JobJarStreamGraphBuilder.build(this).getJarStreamGraph(statement, getDinkyClassLoader());
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,14 @@ public static GatewayType get(String value) {
}

public static GatewayType getSessionType(String value) {
if (value.equals("Kubernetes")) {
if (value.contains("kubernetes")) {
return GatewayType.KUBERNETES_SESSION;
}
return GatewayType.YARN_SESSION;
}

public GatewayType getSessionType() {
if (longValue.contains("kubernetes")) {
return GatewayType.KUBERNETES_SESSION;
}
return GatewayType.YARN_SESSION;
Expand Down
1 change: 1 addition & 0 deletions dinky-web/src/locales/en-US/pages.ts
Original file line number Diff line number Diff line change
Expand Up @@ -351,6 +351,7 @@ export default {
'pages.datastudio.editor.stop.job': 'Stop job',
'pages.datastudio.editor.stop.jobConfirm': 'Are you sure to stop the job [{jobName}]? ',
'pages.datastudio.editor.submitting': 'The new task [{jobName}] is executing',
'pages.datastudio.editor.checking': 'The task [{jobName}] is checking',
'pages.datastudio.editor.debugging': 'The new task [{jobName}] is debugging',
'pages.datastudio.editor.onlyread':
'Task has been published, modification is prohibited, please go offline first',
Expand Down
1 change: 1 addition & 0 deletions dinky-web/src/locales/zh-CN/pages.ts
Original file line number Diff line number Diff line change
Expand Up @@ -341,6 +341,7 @@ export default {
'pages.datastudio.editor.stop.job': '停止作业',
'pages.datastudio.editor.stop.jobConfirm': '确定停止作业【{jobName}】吗?',
'pages.datastudio.editor.submitting': '新任务【{jobName}】正在执行',
'pages.datastudio.editor.checking': '任务【{jobName}】正在检查',
'pages.datastudio.editor.debugging': '新任务【{jobName}】正在调试',
'pages.datastudio.editor.onlyread': '任务已发布,禁止修改,请先下线任务',
'pages.datastudio.editor.notsave': '当前修改内容未保存!',
Expand Down

0 comments on commit 70d3dd8

Please sign in to comment.