diff --git a/pom.xml b/pom.xml
index 331f7d2859..53c7358694 100644
--- a/pom.xml
+++ b/pom.xml
@@ -91,8 +91,8 @@
1.3.0
3.2.9
_${scala.binary.version}
-
- 1.14.4
+ 3.2.0-1.18
+ 1.18.1
1.8.1
1.0.0
1.14
@@ -139,7 +139,6 @@
10.0.2
3.3.0
org.apache.streampark.shaded
- flink-table-uber_${scala.binary.version}
5.1
1.18.24
5.9.1
diff --git a/streampark-console/streampark-console-service/pom.xml b/streampark-console/streampark-console-service/pom.xml
index ec4d16044f..394f9b1f74 100644
--- a/streampark-console/streampark-console-service/pom.xml
+++ b/streampark-console/streampark-console-service/pom.xml
@@ -449,6 +449,13 @@
${flink.version}
+
+ org.apache.streampark
+ streampark-flink-connector-plugin
+ ${project.version}
+ provided
+
+
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/util/JacksonUtils.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/util/JacksonUtils.java
index ee136c1b19..e356ad3f29 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/util/JacksonUtils.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/util/JacksonUtils.java
@@ -24,11 +24,13 @@
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.PropertyNamingStrategies;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.fasterxml.jackson.module.scala.DefaultScalaModule;
import java.io.IOException;
import java.text.SimpleDateFormat;
+import java.util.Map;
/** Serialization utils */
public final class JacksonUtils {
@@ -41,6 +43,7 @@ private JacksonUtils() {
static {
MAPPER = new ObjectMapper();
MAPPER.registerModule(new DefaultScalaModule());
+ MAPPER.setPropertyNamingStrategy(PropertyNamingStrategies.KEBAB_CASE);
MAPPER.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
MAPPER.configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, false);
MAPPER.configure(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS, false);
@@ -67,4 +70,8 @@ public static boolean isValidJson(String jsonStr) {
return false;
}
}
+
+ public static Map toMap(String jsonStr) throws JsonProcessingException {
+ return (Map) MAPPER.readValue(jsonStr, Map.class);
+ }
}
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/util/WebUtils.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/util/WebUtils.java
index 9a2a4a8de7..57d8b53419 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/util/WebUtils.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/util/WebUtils.java
@@ -87,4 +87,7 @@ public static File getAppClientDir() {
return getAppDir(CLIENT);
}
+ public static File getPluginDir() {
+ return getAppDir(PLUGINS);
+ }
}
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/bean/DatabaseParam.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/bean/DatabaseParam.java
new file mode 100644
index 0000000000..8b7e82dd25
--- /dev/null
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/bean/DatabaseParam.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.console.core.bean;
+
+import lombok.Data;
+
+import javax.validation.constraints.NotBlank;
+
+@Data
+public class DatabaseParam {
+
+ @NotBlank(message = "invalid.databaseName")
+ private String name;
+
+ @javax.validation.constraints.NotNull(message = "invalid.catalogId")
+ private Long catalogId;
+
+ private String catalogName;
+ private boolean ignoreIfExits;
+ private boolean cascade;
+ private String description;
+}
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/DatabaseController.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/DatabaseController.java
index f87781acdd..c4c4e9cebf 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/DatabaseController.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/DatabaseController.java
@@ -1,12 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package org.apache.streampark.console.core.controller;
+import org.apache.streampark.console.base.domain.RestRequest;
+import org.apache.streampark.console.base.domain.RestResponse;
+import org.apache.streampark.console.core.bean.DatabaseParam;
+import org.apache.streampark.console.core.service.DatabaseService;
+
+import org.apache.shiro.authz.annotation.RequiresPermissions;
+
import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.validation.annotation.Validated;
+import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
+import java.io.IOException;
+import java.util.List;
+
@Slf4j
@Validated
@RestController
@RequestMapping("flink/database")
-public class DatabaseController {}
+public class DatabaseController {
+
+ @Autowired
+ DatabaseService databaseService;
+
+ @PostMapping("create")
+ @RequiresPermissions("database:create")
+ public RestResponse create(DatabaseParam databaseParam) throws IOException {
+ boolean saved = databaseService.createDatabase(databaseParam);
+ return RestResponse.success(saved);
+ }
+
+ @PostMapping("list")
+ @RequiresPermissions("database:view")
+ public RestResponse list(DatabaseParam databaseParam, RestRequest request) {
+ List databaseParamList =
+ databaseService.listDatabases(databaseParam.getCatalogId());
+ return RestResponse.success(databaseParamList);
+ }
+
+ @PostMapping("delete")
+ @RequiresPermissions("database:delete")
+ public RestResponse remove(DatabaseParam databaseParam) {
+ boolean deleted = databaseService.dropDatabase(databaseParam);
+ return RestResponse.success(deleted);
+ }
+}
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Database.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Database.java
new file mode 100644
index 0000000000..fcd507c3dd
--- /dev/null
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Database.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.console.core.entity;
+
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+@Data
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+public class Database {
+
+ private String name;
+
+ private Long catalogId;
+
+ private String catalogName;
+
+ private String description;
+}
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/CatalogMapper.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/CatalogMapper.java
index 7053bee5bc..2914fd5e88 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/CatalogMapper.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/CatalogMapper.java
@@ -30,5 +30,7 @@ public interface CatalogMapper extends BaseMapper {
boolean existsByCatalogName(@Param("catalogName") String catalogName);
+ FlinkCatalog selectByCatalogName(@Param("catalogName") String catalogName);
+
IPage selectPage(Page page, @Param("catalog") FlinkCatalog catalog);
}
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/DatabaseMapper.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/DatabaseMapper.java
new file mode 100644
index 0000000000..d8d26eb061
--- /dev/null
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/DatabaseMapper.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.console.core.mapper;
+
+import org.apache.streampark.console.core.entity.Database;
+
+import org.apache.ibatis.annotations.Mapper;
+
+import com.baomidou.mybatisplus.core.mapper.BaseMapper;
+
+@Mapper
+public interface DatabaseMapper extends BaseMapper {
+}
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/CatalogService.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/CatalogService.java
index c19d467b41..ab43582ca0 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/CatalogService.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/CatalogService.java
@@ -49,6 +49,9 @@ public interface CatalogService extends IService {
*/
IPage page(FlinkCatalogParams catalog, RestRequest request);
+ FlinkCatalog getCatalog(Long catalogId);
+
+ FlinkCatalog getCatalog(String catalogName);
/**
* update Catalog
*
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/DatabaseService.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/DatabaseService.java
new file mode 100644
index 0000000000..0a8c749c86
--- /dev/null
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/DatabaseService.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.console.core.service;
+
+import org.apache.streampark.console.core.bean.DatabaseParam;
+import org.apache.streampark.console.core.entity.Database;
+
+import com.baomidou.mybatisplus.extension.service.IService;
+
+import java.util.List;
+
+public interface DatabaseService extends IService {
+
+ /**
+ * Checks if the specified database exists.
+ *
+ * @param databaseParam The database to check
+ * @return true if the database exists, false otherwise
+ */
+ boolean databaseExists(DatabaseParam databaseParam);
+
+ /**
+ * Creates a new database given {@link Database}.
+ *
+ * @param databaseParam The {@link DatabaseParam} object that contains the detail of the created
+ * database
+ * @return true if the operation is successful, false otherwise
+ */
+ boolean createDatabase(DatabaseParam databaseParam);
+
+ /**
+ * Lists databases given catalog id.
+ *
+ * @return The list of databases of given catalog
+ */
+ List listDatabases(Long catalogId);
+
+ /**
+ * Drops database given database name.
+ *
+ * @param databaseParam The dropping database
+ * @return true if the operation is successful, false otherwise
+ */
+ boolean dropDatabase(DatabaseParam databaseParam);
+}
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/FlinkCatalogService.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/FlinkCatalogService.java
new file mode 100644
index 0000000000..5f75925885
--- /dev/null
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/FlinkCatalogService.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.console.core.service;
+
+import org.apache.streampark.console.core.util.CatalogServiceUtils;
+
+import org.apache.flink.table.catalog.Catalog;
+import org.apache.flink.table.catalog.CommonCatalogOptions;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.stereotype.Service;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+@Slf4j
+@Service
+public class FlinkCatalogService {
+
+ @Value("${table.catalog-store.kind}")
+ private String storeKind;
+
+ @Value("${table.catalog-store.jdbc.url}")
+ private String jdbcUrl;
+
+ @Value("${table.catalog-store.jdbc.driver}")
+ private String jdbcDriver;
+
+ @Value("${table.catalog-store.jdbc.username}")
+ private String jdbcUserName;
+
+ @Value("${table.catalog-store.jdbc.password}")
+ private String jdbcPassword;
+
+ @Value("${table.catalog-store.jdbc.max-retry-timeout}")
+ private String jdbcMaxRetryTimeout;
+
+ private Map catalogMap = new ConcurrentHashMap<>(0);
+
+ public List listDatabases(String catalogName, Map options) {
+ Catalog catalog = getCatalog(catalogName, options);
+ return catalog.listDatabases();
+ }
+
+ public boolean databaseExists(String catalogName, Map options, String database) {
+ Catalog catalog = getCatalog(catalogName, options);
+ return catalog.databaseExists(database);
+ }
+
+ public boolean createDatabase(
+ String catalogName,
+ Map options,
+ String databaseName,
+ boolean ignoreIfExists) {
+ Catalog catalog = getCatalog(catalogName, options);
+ try {
+ catalog.createDatabase(databaseName, null, ignoreIfExists);
+ return true;
+ } catch (CatalogException | DatabaseAlreadyExistException e) {
+ log.error("create database {} failed.", databaseName, e);
+ throw new CatalogException(
+ String.format("The database '%s' already exists in the catalog.", databaseName));
+ }
+ }
+
+ public void dropDatabase(
+ String catalogName,
+ Map options,
+ String databaseName,
+ boolean ignoreIfExists) {
+ Catalog catalog = getCatalog(catalogName, options);
+ try {
+ catalog.dropDatabase(databaseName, ignoreIfExists, true);
+ } catch (CatalogException | DatabaseNotEmptyException | DatabaseNotExistException e) {
+ log.error("Drop database {} failed.", databaseName, e);
+ throw new CatalogException(
+ String.format("Drop database '%s' failed.", databaseName));
+ }
+ }
+
+ private Catalog getCatalog(String catalogName, Map options) {
+ if (catalogMap.containsKey(catalogName)) {
+ return catalogMap.get(catalogName);
+ } else {
+ Map configuration = new HashMap<>();
+ configuration.put(CommonCatalogOptions.TABLE_CATALOG_STORE_KIND.key(), storeKind);
+ configuration.put("table.catalog-store.jdbc.url", jdbcUrl);
+ configuration.put("table.catalog-store.jdbc.driver", jdbcDriver);
+ configuration.put("table.catalog-store.jdbc.username", jdbcUserName);
+ configuration.put("table.catalog-store.jdbc.password", jdbcPassword);
+ configuration.put("table.catalog-store.jdbc.max-retry-timeout", jdbcMaxRetryTimeout);
+ Catalog catalog = CatalogServiceUtils.getCatalog(catalogName, options, configuration);
+ catalog.open();
+ catalogMap.put(catalogName, catalog);
+ return catalog;
+ }
+ }
+}
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationActionServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationActionServiceImpl.java
index e19976dd4d..c9bfa53217 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationActionServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationActionServiceImpl.java
@@ -89,6 +89,7 @@
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.client.KubernetesClientException;
import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
@@ -98,7 +99,6 @@
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.google.common.collect.Sets;
-import io.fabric8.kubernetes.client.KubernetesClientException;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
@@ -181,9 +181,11 @@ public class ApplicationActionServiceImpl extends ServiceImpl> startFutureMap = new ConcurrentHashMap<>();
+ private final Map> startFutureMap =
+ new ConcurrentHashMap<>();
- private final Map> cancelFutureMap = new ConcurrentHashMap<>();
+ private final Map> cancelFutureMap =
+ new ConcurrentHashMap<>();
@Override
public void revoke(Long appId) throws ApplicationException {
@@ -292,26 +294,28 @@ public void cancel(Application appParam) throws Exception {
properties.put(RestOptions.PORT.key(), activeAddress.getPort());
}
- Tuple3 clusterIdNamespace = getNamespaceClusterId(application);
+ Tuple3 clusterIdNamespace =
+ getNamespaceClusterId(application);
String namespace = clusterIdNamespace.t1;
String clusterId = clusterIdNamespace.t2;
- CancelRequest cancelRequest = new CancelRequest(
- application.getId(),
- flinkEnv.getFlinkVersion(),
- FlinkExecutionMode.of(application.getExecutionMode()),
- properties,
- clusterId,
- application.getJobId(),
- appParam.getRestoreOrTriggerSavepoint(),
- appParam.getDrain(),
- customSavepoint,
- appParam.getNativeFormat(),
- namespace);
+ CancelRequest cancelRequest =
+ new CancelRequest(
+ application.getId(),
+ flinkEnv.getFlinkVersion(),
+ FlinkExecutionMode.of(application.getExecutionMode()),
+ properties,
+ clusterId,
+ application.getJobId(),
+ appParam.getRestoreOrTriggerSavepoint(),
+ appParam.getDrain(),
+ customSavepoint,
+ appParam.getNativeFormat(),
+ namespace);
final Date triggerTime = new Date();
- CompletableFuture cancelFuture = CompletableFuture
- .supplyAsync(() -> FlinkClient.cancel(cancelRequest), executorService);
+ CompletableFuture cancelFuture =
+ CompletableFuture.supplyAsync(() -> FlinkClient.cancel(cancelRequest), executorService);
cancelFutureMap.put(application.getId(), cancelFuture);
@@ -431,41 +435,45 @@ public void start(Application appParam, boolean auto) throws Exception {
}
// Get the args after placeholder replacement
- String args = StringUtils.isBlank(appParam.getArgs()) ? application.getArgs() : appParam.getArgs();
+ String args =
+ StringUtils.isBlank(appParam.getArgs()) ? application.getArgs() : appParam.getArgs();
String applicationArgs = variableService.replaceVariable(application.getTeamId(), args);
- Tuple3 clusterIdNamespace = getNamespaceClusterId(application);
+ Tuple3 clusterIdNamespace =
+ getNamespaceClusterId(application);
String k8sNamespace = clusterIdNamespace.t1;
String k8sClusterId = clusterIdNamespace.t2;
FlinkK8sRestExposedType exposedType = clusterIdNamespace.t3;
String dynamicProperties =
- StringUtils.isBlank(appParam.getDynamicProperties()) ? application.getDynamicProperties()
+ StringUtils.isBlank(appParam.getDynamicProperties())
+ ? application.getDynamicProperties()
: appParam.getDynamicProperties();
- SubmitRequest submitRequest = new SubmitRequest(
- flinkEnv.getFlinkVersion(),
- FlinkExecutionMode.of(application.getExecutionMode()),
- getProperties(application, dynamicProperties),
- flinkEnv.getFlinkConf(),
- FlinkDevelopmentMode.of(application.getJobType()),
- application.getId(),
- new JobID().toHexString(),
- application.getJobName(),
- appConf,
- application.getApplicationType(),
- getSavepointPath(appParam),
- FlinkRestoreMode.of(appParam.getRestoreMode()),
- applicationArgs,
- k8sClusterId,
- application.getHadoopUser(),
- buildResult,
- extraParameter,
- k8sNamespace,
- exposedType);
-
- CompletableFuture future = CompletableFuture
- .supplyAsync(() -> FlinkClient.submit(submitRequest), executorService);
+ SubmitRequest submitRequest =
+ new SubmitRequest(
+ flinkEnv.getFlinkVersion(),
+ FlinkExecutionMode.of(application.getExecutionMode()),
+ getProperties(application, dynamicProperties),
+ flinkEnv.getFlinkConf(),
+ FlinkDevelopmentMode.of(application.getJobType()),
+ application.getId(),
+ new JobID().toHexString(),
+ application.getJobName(),
+ appConf,
+ application.getApplicationType(),
+ getSavepointPath(appParam),
+ FlinkRestoreMode.of(appParam.getRestoreMode()),
+ applicationArgs,
+ k8sClusterId,
+ application.getHadoopUser(),
+ buildResult,
+ extraParameter,
+ k8sNamespace,
+ exposedType);
+
+ CompletableFuture future =
+ CompletableFuture.supplyAsync(() -> FlinkClient.submit(submitRequest), executorService);
startFutureMap.put(application.getId(), future);
@@ -594,10 +602,11 @@ private void processForException(
private boolean checkAppRepeatInYarn(String jobName) {
try {
YarnClient yarnClient = HadoopUtils.yarnClient();
- Set types = Sets.newHashSet(
- ApplicationType.STREAMPARK_FLINK.getName(), ApplicationType.APACHE_FLINK.getName());
- EnumSet states = EnumSet.of(YarnApplicationState.RUNNING,
- YarnApplicationState.ACCEPTED);
+ Set types =
+ Sets.newHashSet(
+ ApplicationType.STREAMPARK_FLINK.getName(), ApplicationType.APACHE_FLINK.getName());
+ EnumSet states =
+ EnumSet.of(YarnApplicationState.RUNNING, YarnApplicationState.ACCEPTED);
List applications = yarnClient.getApplications(types, states);
for (ApplicationReport report : applications) {
if (report.getName().equals(jobName)) {
@@ -633,9 +642,10 @@ private Tuple2 getUserJarAndAppConf(FlinkEnv flinkEnv, Applicati
// 1) dist_userJar
String sqlDistJar = ServiceHelper.getFlinkSqlClientJar(flinkEnv);
// 2) appConfig
- appConf = applicationConfig == null
- ? null
- : String.format("yaml://%s", applicationConfig.getContent());
+ appConf =
+ applicationConfig == null
+ ? null
+ : String.format("yaml://%s", applicationConfig.getContent());
// 3) client
if (FlinkExecutionMode.YARN_APPLICATION == executionModeEnum) {
String clientPath = Workspace.remote().APP_CLIENT();
@@ -644,7 +654,8 @@ private Tuple2 getUserJarAndAppConf(FlinkEnv flinkEnv, Applicati
break;
case PYFLINK:
- Resource resource = resourceService.findByResourceName(application.getTeamId(), application.getJar());
+ Resource resource =
+ resourceService.findByResourceName(application.getTeamId(), application.getJar());
ApiAlertException.throwIfNull(
resource, "pyflink file can't be null, start application failed.");
@@ -661,25 +672,28 @@ private Tuple2 getUserJarAndAppConf(FlinkEnv flinkEnv, Applicati
case CUSTOM_CODE:
if (application.isUploadJob()) {
- appConf = String.format(
- "json://{\"%s\":\"%s\"}",
- ConfigKeys.KEY_FLINK_APPLICATION_MAIN_CLASS(), application.getMainClass());
+ appConf =
+ String.format(
+ "json://{\"%s\":\"%s\"}",
+ ConfigKeys.KEY_FLINK_APPLICATION_MAIN_CLASS(), application.getMainClass());
} else {
switch (application.getApplicationType()) {
case STREAMPARK_FLINK:
ConfigFileTypeEnum fileType = ConfigFileTypeEnum.of(applicationConfig.getFormat());
if (fileType != null && ConfigFileTypeEnum.UNKNOWN != fileType) {
- appConf = String.format(
- "%s://%s", fileType.getTypeName(), applicationConfig.getContent());
+ appConf =
+ String.format(
+ "%s://%s", fileType.getTypeName(), applicationConfig.getContent());
} else {
throw new IllegalArgumentException(
"application' config type error,must be ( yaml| properties| hocon )");
}
break;
case APACHE_FLINK:
- appConf = String.format(
- "json://{\"%s\":\"%s\"}",
- ConfigKeys.KEY_FLINK_APPLICATION_MAIN_CLASS(), application.getMainClass());
+ appConf =
+ String.format(
+ "json://{\"%s\":\"%s\"}",
+ ConfigKeys.KEY_FLINK_APPLICATION_MAIN_CLASS(), application.getMainClass());
break;
default:
throw new IllegalArgumentException(
@@ -690,21 +704,22 @@ private Tuple2 getUserJarAndAppConf(FlinkEnv flinkEnv, Applicati
if (FlinkExecutionMode.YARN_APPLICATION == executionModeEnum) {
switch (application.getApplicationType()) {
case STREAMPARK_FLINK:
- flinkUserJar = String.format(
- "%s/%s",
- application.getAppLib(),
- application.getModule().concat(Constant.JAR_SUFFIX));
+ flinkUserJar =
+ String.format(
+ "%s/%s",
+ application.getAppLib(), application.getModule().concat(Constant.JAR_SUFFIX));
break;
case APACHE_FLINK:
flinkUserJar = String.format("%s/%s", application.getAppHome(), application.getJar());
if (!FsOperator.hdfs().exists(flinkUserJar)) {
- resource = resourceService.findByResourceName(
- application.getTeamId(), application.getJar());
+ resource =
+ resourceService.findByResourceName(
+ application.getTeamId(), application.getJar());
if (resource != null && StringUtils.isNotBlank(resource.getFilePath())) {
- flinkUserJar = String.format(
- "%s/%s",
- application.getAppHome(),
- new File(resource.getFilePath()).getName());
+ flinkUserJar =
+ String.format(
+ "%s/%s",
+ application.getAppHome(), new File(resource.getFilePath()).getName());
}
}
break;
@@ -742,8 +757,10 @@ private Map getProperties(Application application, String runtim
application.getFlinkClusterId()));
properties.put(ConfigKeys.KEY_YARN_APP_ID(), cluster.getClusterId());
} else {
- String yarnQueue = (String) application.getHotParamsMap().get(ConfigKeys.KEY_YARN_APP_QUEUE());
- String yarnLabelExpr = (String) application.getHotParamsMap().get(ConfigKeys.KEY_YARN_APP_NODE_LABEL());
+ String yarnQueue =
+ (String) application.getHotParamsMap().get(ConfigKeys.KEY_YARN_APP_QUEUE());
+ String yarnLabelExpr =
+ (String) application.getHotParamsMap().get(ConfigKeys.KEY_YARN_APP_NODE_LABEL());
Optional.ofNullable(yarnQueue)
.ifPresent(yq -> properties.put(ConfigKeys.KEY_YARN_APP_QUEUE(), yq));
Optional.ofNullable(yarnLabelExpr)
@@ -761,7 +778,8 @@ private Map getProperties(Application application, String runtim
properties.put(SavepointConfigOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE.key(), true);
}
- Map dynamicProperties = PropertiesUtils.extractDynamicPropertiesAsJava(runtimeProperties);
+ Map dynamicProperties =
+ PropertiesUtils.extractDynamicPropertiesAsJava(runtimeProperties);
properties.putAll(dynamicProperties);
ResolveOrder resolveOrder = ResolveOrder.of(application.getResolveOrder());
if (resolveOrder != null) {
@@ -789,8 +807,8 @@ private void doAbort(Long id) {
// kill application
if (FlinkExecutionMode.isYarnMode(application.getFlinkExecutionMode())) {
try {
- List applications = applicationInfoService
- .getYarnAppReport(application.getJobName());
+ List applications =
+ applicationInfoService.getYarnAppReport(application.getJobName());
if (!applications.isEmpty()) {
YarnClient yarnClient = HadoopUtils.yarnClient();
yarnClient.killApplication(applications.get(0).getApplicationId());
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/CatalogServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/CatalogServiceImpl.java
index 199220fb06..115feed57d 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/CatalogServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/CatalogServiceImpl.java
@@ -92,6 +92,16 @@ record -> {
return paramsPage;
}
+ @Override
+ public FlinkCatalog getCatalog(Long catalogId) {
+ return this.baseMapper.selectById(catalogId);
+ }
+
+ @Override
+ public FlinkCatalog getCatalog(String catalogName) {
+ return this.baseMapper.selectByCatalogName(catalogName);
+ }
+
@Override
public boolean update(FlinkCatalogParams catalogParam, long userId) {
AlertException.throwIfNull(
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/DatabaseImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/DatabaseImpl.java
new file mode 100644
index 0000000000..d2acb93bd4
--- /dev/null
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/DatabaseImpl.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.console.core.service.impl;
+
+import org.apache.streampark.console.base.exception.AlertException;
+import org.apache.streampark.console.base.util.JacksonUtils;
+import org.apache.streampark.console.core.bean.DatabaseParam;
+import org.apache.streampark.console.core.entity.Database;
+import org.apache.streampark.console.core.entity.FlinkCatalog;
+import org.apache.streampark.console.core.mapper.DatabaseMapper;
+import org.apache.streampark.console.core.service.CatalogService;
+import org.apache.streampark.console.core.service.DatabaseService;
+import org.apache.streampark.console.core.service.FlinkCatalogService;
+
+import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+import org.springframework.transaction.annotation.Propagation;
+import org.springframework.transaction.annotation.Transactional;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+@Service
+@Slf4j
+@Transactional(propagation = Propagation.SUPPORTS, rollbackFor = Exception.class)
+public class DatabaseImpl extends ServiceImpl implements DatabaseService {
+
+ @Autowired
+ private CatalogService catalogService;
+ @Autowired
+ private FlinkCatalogService flinkCatalogService;
+
+ @Override
+ public boolean databaseExists(DatabaseParam databaseParam) {
+ AlertException.throwIfNull(databaseParam.getName(), "Database name can not be null.");
+ FlinkCatalog flinkCatalog = catalogService.getCatalog(databaseParam.getCatalogId());
+ AlertException.throwIfNull(flinkCatalog, "Catalog does not exist in database.");
+ return flinkCatalogService.databaseExists(
+ flinkCatalog.getCatalogName(),
+ getOptions(flinkCatalog.getConfiguration()),
+ databaseParam.getName());
+ }
+
+ @Override
+ public boolean createDatabase(DatabaseParam databaseParam) {
+ AlertException.throwIfNull(databaseParam.getName(), "Database name can not be null.");
+ FlinkCatalog flinkCatalog = catalogService.getCatalog(databaseParam.getCatalogId());
+ AlertException.throwIfNull(flinkCatalog, "Catalog does not exist in database.");
+ return flinkCatalogService.createDatabase(
+ flinkCatalog.getCatalogName(),
+ getOptions(flinkCatalog.getConfiguration()),
+ databaseParam.getName(),
+ databaseParam.isIgnoreIfExits());
+ }
+
+ @Override
+ public List listDatabases(Long catalogId) {
+ FlinkCatalog flinkCatalog = catalogService.getCatalog(catalogId);
+ AlertException.throwIfNull(
+ flinkCatalog, "The catalog can't be null; failed to get catalog from database.");
+ List databases =
+ flinkCatalogService.listDatabases(
+ flinkCatalog.getCatalogName(), getOptions(flinkCatalog.getConfiguration()));
+ if (databases == null || databases.isEmpty()) {
+ return Collections.emptyList();
+ }
+ List databaseList = new ArrayList<>();
+ databases.forEach(
+ dbName -> {
+ DatabaseParam dbParam = new DatabaseParam();
+ dbParam.setCatalogId(catalogId);
+ dbParam.setCatalogName(flinkCatalog.getCatalogName());
+ dbParam.setName(dbName);
+ databaseList.add(dbParam);
+ });
+ return databaseList;
+ }
+
+ @Override
+ public boolean dropDatabase(DatabaseParam databaseParam) {
+ AlertException.throwIfNull(databaseParam.getName(), "Database name can not be null.");
+ FlinkCatalog flinkCatalog = catalogService.getCatalog(databaseParam.getCatalogId());
+ AlertException.throwIfNull(flinkCatalog, "Catalog does not exist in database.");
+ flinkCatalogService.dropDatabase(
+ flinkCatalog.getCatalogName(),
+ getOptions(flinkCatalog.getConfiguration()),
+ databaseParam.getName(),
+ databaseParam.isIgnoreIfExits());
+ return true;
+ }
+
+ private Map getOptions(String configuration) {
+ try {
+ return JacksonUtils.toMap(configuration);
+ } catch (JsonProcessingException e) {
+ log.error("Convert configuration {} to Map Failed", configuration, e);
+ throw new RuntimeException(e);
+ }
+ }
+}
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/util/CatalogServiceUtils.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/util/CatalogServiceUtils.java
new file mode 100644
index 0000000000..e81725ef92
--- /dev/null
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/util/CatalogServiceUtils.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.console.core.util;
+
+import org.apache.streampark.console.base.util.WebUtils;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.catalog.Catalog;
+import org.apache.flink.table.factories.FactoryUtil;
+
+import java.io.File;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Pattern;
+
+public class CatalogServiceUtils {
+
+ private static final Pattern PATTERN_FLINK_CONNECTOR_PLUGIN =
+ Pattern.compile(
+ "^streampark-flink-connector-plugin-(.*)-(.*)\\.jar$",
+ Pattern.CASE_INSENSITIVE | Pattern.DOTALL);
+
+ public static Catalog getCatalog(
+ String catalogName, Map options,
+ Map configurations) {
+ ClassLoader classLoader = getCatalogClassLoader(Thread.currentThread().getContextClassLoader());
+ return FactoryUtil.createCatalog(
+ catalogName, options, Configuration.fromMap(configurations), classLoader);
+ }
+
+ /** get catalog classloader, add streampark-flink-connector-plugins */
+ protected static ClassLoader getCatalogClassLoader(ClassLoader classLoader) {
+ File pluginDir = WebUtils.getPluginDir();
+ File[] pluginFiles =
+ pluginDir.listFiles(
+ pathname -> pathname.getName().matches(PATTERN_FLINK_CONNECTOR_PLUGIN.pattern()));
+ if (pluginFiles == null) {
+ return classLoader;
+ }
+ List pluginUrls = new ArrayList();
+ for (File file : pluginFiles) {
+ try {
+ URL pluginUrl = file.toURI().toURL();
+ pluginUrls.add(pluginUrl);
+ } catch (MalformedURLException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ return new URLClassLoader(pluginUrls.toArray(new URL[0]), classLoader);
+ }
+}
diff --git a/streampark-console/streampark-console-service/src/main/resources/config.yaml b/streampark-console/streampark-console-service/src/main/resources/config.yaml
index 49a9ef0e9e..69451a8ea9 100644
--- a/streampark-console/streampark-console-service/src/main/resources/config.yaml
+++ b/streampark-console/streampark-console-service/src/main/resources/config.yaml
@@ -97,3 +97,22 @@ sso:
principalNameAttribute:
# Optional, change by authentication client
# Please replace and fill in your client config below when enabled SSO
+
+## flink catalog store config
+table:
+ catalog-store:
+ kind: jdbc
+ jdbc:
+ url: jdbc:mysql://127.0.0.1:3306/streampark
+ # The JDBC database url.
+ table-name: t_flink_catalog
+ ## catalog store table
+ driver: com.mysql.cj.jdbc.Driver
+ # The class name of the JDBC driver to use to connect to this URL, if not set, it will automatically be derived from the URL.
+ username: root
+ # The JDBC user name. 'username' and 'password' must both be specified if any of them is specified.
+ password: 12345678
+ # The JDBC password.
+ max-retry-timeout: 600
+
+
diff --git a/streampark-console/streampark-console-service/src/main/resources/mapper/core/CatalogMapper.xml b/streampark-console/streampark-console-service/src/main/resources/mapper/core/CatalogMapper.xml
index 9d1ffde45f..f1d7bc1939 100644
--- a/streampark-console/streampark-console-service/src/main/resources/mapper/core/CatalogMapper.xml
+++ b/streampark-console/streampark-console-service/src/main/resources/mapper/core/CatalogMapper.xml
@@ -38,6 +38,14 @@
limit 1
+
+