From 2cfc96275921be6732189bf88bcc167c991c8bb9 Mon Sep 17 00:00:00 2001 From: ouyangwulink Date: Thu, 22 Aug 2024 19:19:27 +0800 Subject: [PATCH] squash all committer --- build.sh | 2 +- pom.xml | 7 +- .../streampark/common/enums/CatalogType.java | 3 + .../streampark-console-service/pom.xml | 48 +++ .../console/base/util/JacksonUtils.java | 5 + .../console/base/util/WebUtils.java | 3 + .../console/core/bean/DatabaseParam.java | 37 ++ .../console/core/bean/FlinkCatalogParams.java | 53 ++- .../console/core/bean/FlinkDataType.java | 39 +++ .../console/core/bean/TableColumn.java | 50 +++ .../console/core/bean/TableParams.java | 55 +++ .../core/controller/CatalogController.java | 13 +- .../core/controller/DatabaseController.java | 57 ++- .../core/controller/TableController.java | 136 ++++++++ .../console/core/entity/Database.java | 38 ++ .../console/core/entity/FlinkCatalog.java | 3 + .../console/core/mapper/CatalogMapper.java | 2 + .../console/core/mapper/DatabaseMapper.java | 28 ++ .../console/core/service/CatalogService.java | 5 +- .../console/core/service/DatabaseService.java | 60 ++++ .../core/service/FlinkCatalogService.java | 293 ++++++++++++++++ .../console/core/service/TableService.java | 122 +++++++ .../impl/ApplicationActionServiceImpl.java | 168 +++++---- .../core/service/impl/CatalogServiceImpl.java | 12 +- .../service/impl/DatabaseServiceImpl.java | 121 +++++++ .../core/service/impl/TableServiceImpl.java | 306 ++++++++++++++++ .../core/util/CatalogServiceUtils.java | 81 +++++ .../core/util/DataTypeConverterUtils.java | 196 +++++++++++ .../src/main/resources/config.yaml | 19 + .../resources/mapper/core/CatalogMapper.xml | 8 + .../console/base/util/JacksonUtilTest.java | 44 +++ .../core/service/DatabaseServiceTest.java | 151 ++++++++ .../core/service/TableServiceTest.java | 328 ++++++++++++++++++ .../service/container/MySqlContainer.java | 167 +++++++++ .../service/container/MysqlBaseITCASE.java | 62 ++++ .../application-integration-test.yml | 16 + 
.../src/test/resources/application-test.yml | 16 + .../src/test/resources/docker/server/my.cnf | 64 ++++ .../src/test/resources/docker/setup.sql | 41 +++ streampark-flink/pom.xml | 6 + .../streampark-flink-catalog-store/pom.xml | 34 +- .../streampark-flink-client-core/pom.xml | 4 +- .../streampark-flink-connector-plugin/pom.xml | 23 +- .../org/apache/streampark/package-info.java} | 11 +- .../org/apache/streampark/package-info.java} | 16 +- .../pom.xml | 4 +- .../pom.xml | 4 +- .../streampark-flink-connector-kafka/pom.xml | 4 +- .../streampark-flink-core/pom.xml | 6 +- .../streampark-flink-kubernetes/pom.xml | 4 +- .../resources/CompKubernetesDeployment.java | 2 +- .../CompatibleKubernetesWatcher.java | 2 +- .../kubernetes/KubernetesRetriever.scala | 2 +- .../helper/KubernetesDeploymentHelper.scala | 8 +- .../ingress/IngressController.scala | 4 +- .../kubernetes/ingress/IngressStrategy.scala | 4 +- .../ingress/IngressStrategyV1.scala | 4 +- .../ingress/IngressStrategyV1beta1.scala | 6 +- .../model/K8sDeploymentEventCV.scala | 4 +- .../watcher/FlinkK8sEventWatcher.scala | 4 +- .../streampark-flink-shims-base/pom.xml | 8 +- .../flink/core/FlinkClientTrait.scala | 7 +- .../core/FlinkKubernetesClientTrait.scala | 2 +- .../flink/core/FlinkSqlValidator.scala | 2 +- .../streampark-flink-shims-test/pom.xml | 13 +- .../streampark-flink-shims_flink-1.18/pom.xml | 6 + .../streampark-flink-shims_flink-1.19/pom.xml | 6 + .../streampark-flink-sqlclient/pom.xml | 7 - tools/dependencies/known-dependencies.txt | 46 ++- 69 files changed, 2921 insertions(+), 191 deletions(-) create mode 100644 streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/bean/DatabaseParam.java create mode 100644 streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/bean/FlinkDataType.java create mode 100644 streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/bean/TableColumn.java 
create mode 100644 streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/bean/TableParams.java create mode 100644 streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/TableController.java create mode 100644 streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Database.java create mode 100644 streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/DatabaseMapper.java create mode 100644 streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/DatabaseService.java create mode 100644 streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/FlinkCatalogService.java create mode 100644 streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/TableService.java create mode 100644 streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/DatabaseServiceImpl.java create mode 100644 streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/TableServiceImpl.java create mode 100644 streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/util/CatalogServiceUtils.java create mode 100644 streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/util/DataTypeConverterUtils.java create mode 100644 streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/base/util/JacksonUtilTest.java create mode 100644 streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/DatabaseServiceTest.java create mode 100644 
streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/TableServiceTest.java create mode 100644 streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/container/MySqlContainer.java create mode 100644 streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/container/MysqlBaseITCASE.java create mode 100644 streampark-console/streampark-console-service/src/test/resources/docker/server/my.cnf create mode 100644 streampark-console/streampark-console-service/src/test/resources/docker/setup.sql rename streampark-flink/streampark-flink-connector-plugin/src/{test/java/org/apache/streampark/DummyTest.java => main/java/org/apache/streampark/package-info.java} (80%) rename streampark-flink/streampark-flink-connector-plugin/src/{main/java/org/apache/streampark/Dummy.java => test/java/org/apache/streampark/package-info.java} (74%) diff --git a/build.sh b/build.sh index 50eecf0482..551b5c3548 100755 --- a/build.sh +++ b/build.sh @@ -172,7 +172,7 @@ print_logo() { build() { if [[ -x "$PRG_DIR/mvnw" ]]; then echo_g "Apache StreamPark, building..." - "$PRG_DIR/mvnw" -Pshaded,webapp,dist -DskipTests clean install -Drat.skip=true + "$PRG_DIR/mvnw" -Pshaded,webapp,dist -DskipTests clean install if [[ $? -eq 0 ]]; then printf '\n' echo_g """StreamPark project build successful! 
diff --git a/pom.xml b/pom.xml index 331f7d2859..6aa731be8c 100644 --- a/pom.xml +++ b/pom.xml @@ -91,8 +91,9 @@ 1.3.0 3.2.9 _${scala.binary.version} - - 1.14.4 + 3.2.0-1.18 + 3.0.1-1.17 + 1.18.1 1.8.1 1.0.0 1.14 @@ -139,13 +140,13 @@ 10.0.2 3.3.0 org.apache.streampark.shaded - flink-table-uber_${scala.binary.version} 5.1 1.18.24 5.9.1 3.4.6 1.17.14 3.23.1 + 1.19.8 false false diff --git a/streampark-common/src/main/java/org/apache/streampark/common/enums/CatalogType.java b/streampark-common/src/main/java/org/apache/streampark/common/enums/CatalogType.java index fb2caab555..d3baa95d4f 100644 --- a/streampark-common/src/main/java/org/apache/streampark/common/enums/CatalogType.java +++ b/streampark-common/src/main/java/org/apache/streampark/common/enums/CatalogType.java @@ -19,6 +19,9 @@ /** catalog type */ public enum CatalogType { + MYSQL, + PGSQL, + ORACLE, JDBC, HIVE, PAIMON, diff --git a/streampark-console/streampark-console-service/pom.xml b/streampark-console/streampark-console-service/pom.xml index ec4d16044f..66a9c87681 100644 --- a/streampark-console/streampark-console-service/pom.xml +++ b/streampark-console/streampark-console-service/pom.xml @@ -209,6 +209,10 @@ org.pac4j pac4j-core + + org.apache.shiro + shiro-web + @@ -367,12 +371,32 @@ org.apache.streampark streampark-spark-client-api_${scala.binary.version} ${project.version} + + + org.xerial.snappy + snappy-java + + + com.github.luben + zstd-jni + + + com.google.protobuf + protobuf-java + + org.apache.streampark streampark-flink-kubernetes_${scala.binary.version} ${project.version} + + + org.xerial.snappy + snappy-java + + @@ -449,6 +473,30 @@ ${flink.version} + + org.apache.flink + flink-table-api-java + ${flink.version} + + + + org.testcontainers + mysql + ${testcontainers.version} + test + + + org.apache.streampark + streampark-flink-connector-plugin + 2.2.0-SNAPSHOT + test + + + org.xerial.snappy + snappy-java + + + diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/util/JacksonUtils.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/util/JacksonUtils.java index ee136c1b19..9ce940ba27 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/util/JacksonUtils.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/util/JacksonUtils.java @@ -29,6 +29,7 @@ import java.io.IOException; import java.text.SimpleDateFormat; +import java.util.Map; /** Serialization utils */ public final class JacksonUtils { @@ -67,4 +68,8 @@ public static boolean isValidJson(String jsonStr) { return false; } } + + public static Map toMap(String jsonStr) throws JsonProcessingException { + return (Map) MAPPER.readValue(jsonStr, Map.class); + } } diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/util/WebUtils.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/util/WebUtils.java index 9a2a4a8de7..57d8b53419 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/util/WebUtils.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/util/WebUtils.java @@ -87,4 +87,7 @@ public static File getAppClientDir() { return getAppDir(CLIENT); } + public static File getPluginDir() { + return getAppDir(PLUGINS); + } } diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/bean/DatabaseParam.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/bean/DatabaseParam.java new file mode 100644 index 0000000000..8b7e82dd25 --- /dev/null +++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/bean/DatabaseParam.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.streampark.console.core.bean; + +import lombok.Data; + +import javax.validation.constraints.NotBlank; + +@Data +public class DatabaseParam { + + @NotBlank(message = "invalid.databaseName") + private String name; + + @NotBlank(message = "invalid.catalogId") + private Long catalogId; + + private String catalogName; + private boolean ignoreIfExits; + private boolean cascade; + private String description; +} diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/bean/FlinkCatalogParams.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/bean/FlinkCatalogParams.java index 46a8210d80..50c8f9cb6e 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/bean/FlinkCatalogParams.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/bean/FlinkCatalogParams.java @@ -21,10 +21,10 @@ import 
org.apache.streampark.console.base.util.JacksonUtils; import org.apache.streampark.console.core.entity.FlinkCatalog; +import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.core.JsonProcessingException; import lombok.Data; import lombok.extern.slf4j.Slf4j; -import org.codehaus.jackson.annotate.JsonProperty; import org.springframework.beans.BeanUtils; import javax.validation.constraints.NotBlank; @@ -67,6 +67,9 @@ public static FlinkCatalogParams of(FlinkCatalog flinkCatalog) { BeanUtils.copyProperties(flinkCatalog, flinkCatalogParams, "configuration"); try { switch (flinkCatalog.getCatalogType()) { + case MYSQL: + case PGSQL: + case ORACLE: case JDBC: flinkCatalogParams.setFlinkJDBCCatalog( JacksonUtils.read(flinkCatalog.getConfiguration(), FlinkJDBCCatalog.class)); @@ -130,6 +133,54 @@ public static class FlinkHiveCatalog implements Serializable { @JsonProperty("hadoop-conf-dir") private String hadoopConfDir; + + public String getType() { + return type; + } + + public void setType(String type) { + this.type = type; + } + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + + public String getHiveConfDir() { + return hiveConfDir; + } + + public void setHiveConfDir(String hiveConfDir) { + this.hiveConfDir = hiveConfDir; + } + + public String getDefaultDatabase() { + return defaultDatabase; + } + + public void setDefaultDatabase(String defaultDatabase) { + this.defaultDatabase = defaultDatabase; + } + + public String getHiveVersion() { + return hiveVersion; + } + + public void setHiveVersion(String hiveVersion) { + this.hiveVersion = hiveVersion; + } + + public String getHadoopConfDir() { + return hadoopConfDir; + } + + public void setHadoopConfDir(String hadoopConfDir) { + this.hadoopConfDir = hadoopConfDir; + } } @Data diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/bean/FlinkDataType.java 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/bean/FlinkDataType.java new file mode 100644 index 0000000000..084fdfe905 --- /dev/null +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/bean/FlinkDataType.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.streampark.console.core.bean; + +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +/** flink table datatype */ +@Data +@Builder +@NoArgsConstructor +@AllArgsConstructor +public class FlinkDataType { + + private String type; + + private boolean isNullable; + + private Integer precision; + + private Integer scale; +} diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/bean/TableColumn.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/bean/TableColumn.java new file mode 100644 index 0000000000..87f9869df6 --- /dev/null +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/bean/TableColumn.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.streampark.console.core.bean; + +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +import javax.annotation.Nullable; + +/** TableColumn model. 
*/ +@Data +@Builder +@NoArgsConstructor +@AllArgsConstructor +public class TableColumn { + + private Integer id; + + private String field; + + private FlinkDataType dataType; + + @Nullable + private String comment; + + @Nullable + private boolean isPk; + + @Nullable + private String defaultValue; + + private Integer sort; +} diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/bean/TableParams.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/bean/TableParams.java new file mode 100644 index 0000000000..dbc518f0a9 --- /dev/null +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/bean/TableParams.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.streampark.console.core.bean; + +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +import javax.validation.constraints.NotBlank; +import javax.validation.constraints.NotNull; + +import java.util.List; +import java.util.Map; + +@Data +@Builder +@NoArgsConstructor +@AllArgsConstructor +public class TableParams { + + @NotNull(message = "invalid.catalogId") + private Long catalogId; + + private String catalogName; + + @NotBlank(message = "invalid.databaseName") + private String databaseName; + + @NotBlank(message = "invalid.tableName") + private String name; + + private String description; + + private List tableColumns; + + private List partitionKey; + + private Map tableOptions; +} diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/CatalogController.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/CatalogController.java index 78847211f6..212639ed62 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/CatalogController.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/CatalogController.java @@ -21,6 +21,7 @@ import org.apache.streampark.console.base.domain.RestResponse; import org.apache.streampark.console.core.annotation.Permission; import org.apache.streampark.console.core.bean.FlinkCatalogParams; +import org.apache.streampark.console.core.entity.FlinkCatalog; import org.apache.streampark.console.core.service.CatalogService; import org.apache.streampark.console.core.util.ServiceHelper; @@ -30,6 +31,8 @@ import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.validation.annotation.Validated; +import org.springframework.web.bind.annotation.GetMapping; 
+import org.springframework.web.bind.annotation.PathVariable; import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; @@ -62,6 +65,14 @@ public RestResponse list(FlinkCatalogParams catalog, RestRequest request) { return RestResponse.success(catalogList); } + @GetMapping("get/{catalogName}") + @Permission(team = "#teamId") + @RequiresPermissions("catalog:view") + public RestResponse get(@PathVariable String catalogName, Long teamId) { + FlinkCatalog catalog = catalogService.getCatalog(catalogName); + return RestResponse.success(FlinkCatalogParams.of(catalog)); + } + @PostMapping("delete") @Permission(team = "#app.teamId") @RequiresPermissions("catalog:delete") @@ -73,7 +84,7 @@ public RestResponse remove(FlinkCatalogParams catalog, RestRequest request) { @PostMapping("update") @Permission(team = "#app.teamId") @RequiresPermissions("catalog:update") - public RestResponse remove(FlinkCatalogParams catalog) { + public RestResponse update(FlinkCatalogParams catalog) { Long userId = ServiceHelper.getUserId(); boolean updated = catalogService.update(catalog, userId); return RestResponse.success(updated); diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/DatabaseController.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/DatabaseController.java index f87781acdd..c4c4e9cebf 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/DatabaseController.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/DatabaseController.java @@ -1,12 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.streampark.console.core.controller; +import org.apache.streampark.console.base.domain.RestRequest; +import org.apache.streampark.console.base.domain.RestResponse; +import org.apache.streampark.console.core.bean.DatabaseParam; +import org.apache.streampark.console.core.service.DatabaseService; + +import org.apache.shiro.authz.annotation.RequiresPermissions; + import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.validation.annotation.Validated; +import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; +import java.io.IOException; +import java.util.List; + @Slf4j @Validated @RestController @RequestMapping("flink/database") -public class DatabaseController {} +public class DatabaseController { + + @Autowired + DatabaseService databaseService; + + @PostMapping("create") + @RequiresPermissions("database:create") + public RestResponse create(DatabaseParam databaseParam) throws IOException { + boolean saved = databaseService.createDatabase(databaseParam); + return RestResponse.success(saved); + } + + @PostMapping("list") + @RequiresPermissions("database:view") + 
public RestResponse list(DatabaseParam databaseParam, RestRequest request) { + List databaseParamList = + databaseService.listDatabases(databaseParam.getCatalogId()); + return RestResponse.success(databaseParamList); + } + + @PostMapping("delete") + @RequiresPermissions("database:delete") + public RestResponse remove(DatabaseParam databaseParam) { + boolean deleted = databaseService.dropDatabase(databaseParam); + return RestResponse.success(deleted); + } +} diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/TableController.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/TableController.java new file mode 100644 index 0000000000..581cff82e1 --- /dev/null +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/TableController.java @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.streampark.console.core.controller; + +import org.apache.streampark.console.base.domain.RestResponse; +import org.apache.streampark.console.core.annotation.Permission; +import org.apache.streampark.console.core.bean.TableParams; +import org.apache.streampark.console.core.service.TableService; + +import org.apache.shiro.authz.annotation.RequiresPermissions; + +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.validation.annotation.Validated; +import org.springframework.web.bind.annotation.DeleteMapping; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.PathVariable; +import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RequestParam; +import org.springframework.web.bind.annotation.RestController; + +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.TreeMap; +import java.util.stream.Collectors; + +@Slf4j +@Validated +@RestController +@RequestMapping("flink/table") +public class TableController { + + @Autowired + private TableService tableService; + + @PostMapping("create") + @RequiresPermissions("table:create") + public RestResponse createTable(TableParams table) { + boolean saved = tableService.createTable(table); + return RestResponse.success(saved); + } + + @PostMapping("column/add") + @RequiresPermissions("table:column:add") + public RestResponse addColumn(TableParams table) { + boolean saved = tableService.addColumn(table); + return RestResponse.success(saved); + } + + @Permission(team = "#table.teamId") + @GetMapping("column/list") + @RequiresPermissions("table:column:list") + public RestResponse listColumns( + @RequestParam String catalogName, + @RequestParam String databaseName, + @RequestParam String tableName) { + TableParams tableParams = 
tableService.listColumns(catalogName, databaseName, tableName); + return RestResponse.success(tableParams); + } + + @DeleteMapping("column/drop/{catalogName}/{databaseName}/{tableName}/{columnName}") + @RequiresPermissions("table:column:drop") + public RestResponse dropColumns( + @PathVariable String catalogName, + @PathVariable String databaseName, + @PathVariable String tableName, + @PathVariable String columnName) { + boolean dropped = tableService.dropColumn(catalogName, databaseName, tableName, columnName); + return RestResponse.success(dropped); + } + + @PostMapping("option/add") + @RequiresPermissions("option:add") + public RestResponse addOption(TableParams table) { + boolean addedOption = tableService.addOption(table); + return RestResponse.success(addedOption); + } + + @PostMapping("option/remove") + @RequiresPermissions("option:remove") + public RestResponse removeOption( + @RequestParam String catalogName, + @RequestParam String databaseName, + @RequestParam String tableName, + @RequestParam String key) { + boolean removedOption = tableService.removeOption(catalogName, databaseName, tableName, key); + return RestResponse.success(removedOption); + } + + @PostMapping("rename") + @RequiresPermissions("table:update") + public RestResponse renameTable( + @RequestParam String catalogName, + @RequestParam String databaseName, + @RequestParam String fromTableName, + @RequestParam String toTableName) { + boolean renamedOption = + tableService.renameTable(catalogName, databaseName, fromTableName, toTableName); + return RestResponse.success(renamedOption); + } + + @PostMapping("list") + @RequiresPermissions("table:list") + public RestResponse listTable(TableParams table) { + List tableParamsList = tableService.listTables(table); + if (Objects.nonNull(table.getCatalogId()) && Objects.nonNull(table.getDatabaseName())) { + return RestResponse.success(tableParamsList); + } else { + TreeMap>> collect = + tableParamsList.stream() + .collect( + Collectors.groupingBy( + 
TableParams::getCatalogId, + TreeMap::new, + Collectors.groupingBy(TableParams::getDatabaseName))); + return RestResponse.success(collect); + } + } +} diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Database.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Database.java new file mode 100644 index 0000000000..fcd507c3dd --- /dev/null +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Database.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.streampark.console.core.entity; + +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +@Data +@Builder +@NoArgsConstructor +@AllArgsConstructor +public class Database { + + private String name; + + private Integer catalogId; + + private String catalogName; + + private String description; +} diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkCatalog.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkCatalog.java index b8de5a2e55..79025147ac 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkCatalog.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkCatalog.java @@ -74,6 +74,9 @@ public static FlinkCatalog of(FlinkCatalogParams flinkCatalogParams) { try { switch (flinkCatalogParams.getCatalogType()) { + case MYSQL: + case PGSQL: + case ORACLE: case JDBC: flinkCatalog.setConfiguration( JacksonUtils.write(flinkCatalogParams.getFlinkJDBCCatalog())); diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/CatalogMapper.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/CatalogMapper.java index 7053bee5bc..2914fd5e88 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/CatalogMapper.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/CatalogMapper.java @@ -30,5 +30,7 @@ public interface CatalogMapper extends BaseMapper { boolean existsByCatalogName(@Param("catalogName") String catalogName); + FlinkCatalog selectByCatalogName(@Param("catalogName") String catalogName); + IPage 
selectPage(Page page, @Param("catalog") FlinkCatalog catalog); } diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/DatabaseMapper.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/DatabaseMapper.java new file mode 100644 index 0000000000..d8d26eb061 --- /dev/null +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/DatabaseMapper.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.streampark.console.core.mapper; + +import org.apache.streampark.console.core.entity.Database; + +import org.apache.ibatis.annotations.Mapper; + +import com.baomidou.mybatisplus.core.mapper.BaseMapper; + +@Mapper +public interface DatabaseMapper extends BaseMapper { +} diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/CatalogService.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/CatalogService.java index c19d467b41..fa825fa46f 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/CatalogService.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/CatalogService.java @@ -49,10 +49,13 @@ public interface CatalogService extends IService { */ IPage page(FlinkCatalogParams catalog, RestRequest request); + FlinkCatalog getCatalog(Long catalogId); + + FlinkCatalog getCatalog(String catalogName); /** * update Catalog * * @param catalog The {@link FlinkCatalogParams} object containing the search criteria. */ - boolean update(FlinkCatalogParams catalog, long userId); + boolean update(FlinkCatalogParams catalog, Long userId); } diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/DatabaseService.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/DatabaseService.java new file mode 100644 index 0000000000..0a8c749c86 --- /dev/null +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/DatabaseService.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.streampark.console.core.service; + +import org.apache.streampark.console.core.bean.DatabaseParam; +import org.apache.streampark.console.core.entity.Database; + +import com.baomidou.mybatisplus.extension.service.IService; + +import java.util.List; + +public interface DatabaseService extends IService { + + /** + * Checks if the specified database exists. + * + * @param databaseParam The database to check + * @return true if the database exists, false otherwise + */ + boolean databaseExists(DatabaseParam databaseParam); + + /** + * Creates a new database given {@link Database}. + * + * @param databaseParam The {@link DatabaseParam} object that contains the detail of the created + * database + * @return true if the operation is successful, false otherwise + */ + boolean createDatabase(DatabaseParam databaseParam); + + /** + * Lists databases given catalog id. + * + * @return The list of databases of given catalog + */ + List listDatabases(Long catalogId); + + /** + * Drops database given database name. 
+ * + * @param databaseParam The dropping database + * @return true if the operation is successful, false otherwise + */ + boolean dropDatabase(DatabaseParam databaseParam); +} diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/FlinkCatalogService.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/FlinkCatalogService.java new file mode 100644 index 0000000000..5f75b94904 --- /dev/null +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/FlinkCatalogService.java @@ -0,0 +1,293 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.streampark.console.core.service; + +import org.apache.streampark.console.core.util.CatalogServiceUtils; + +import org.apache.flink.table.api.Schema; +import org.apache.flink.table.catalog.Catalog; +import org.apache.flink.table.catalog.CatalogBaseTable; +import org.apache.flink.table.catalog.CatalogDatabase; +import org.apache.flink.table.catalog.CatalogTable; +import org.apache.flink.table.catalog.CommonCatalogOptions; +import org.apache.flink.table.catalog.ObjectPath; +import org.apache.flink.table.catalog.TableChange; +import org.apache.flink.table.catalog.exceptions.CatalogException; +import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException; +import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException; +import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException; +import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException; +import org.apache.flink.table.catalog.exceptions.TableNotExistException; + +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Service; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; + +@Slf4j +@Service +public class FlinkCatalogService { + + @Value("${table.catalog-store.kind:jdbc}") + private String storeKind; + + @Value("${table.catalog-store.jdbc.url:jdbc:mysql://127.0.0.1:3306/flink-test}") + private String jdbcUrl; + + @Value("${table.catalog-store.jdbc.driver:com.mysql.cj.jdbc.Driver}") + private String jdbcDriver; + + @Value("${table.catalog-store.jdbc.username:flinkuser}") + private String jdbcUserName; + + @Value("${table.catalog-store.jdbc.password:flinkpw}") + private String jdbcPassword; + + @Value("${table.catalog-store.jdbc.max-retry-timeout:600}") + private String jdbcMaxRetryTimeout; + + private final Map catalogMap = new
ConcurrentHashMap<>(0); + + public List listDatabases(String catalogName, Map options) { + Catalog catalog = getCatalog(catalogName, options); + return catalog.listDatabases(); + } + + public boolean databaseExists(String catalogName, Map options, String database) { + Catalog catalog = getCatalog(catalogName, options); + return catalog.databaseExists(database); + } + + public boolean createDatabase( + String catalogName, + Map options, + String databaseName, + CatalogDatabase catalogDatabase, + boolean ignoreIfExists) { + Catalog catalog = getCatalog(catalogName, options); + try { + catalog.createDatabase(databaseName, catalogDatabase, ignoreIfExists); + return true; + } catch (CatalogException | DatabaseAlreadyExistException e) { + log.error("create database {} failed.", databaseName, e); + throw new CatalogException( + String.format("The database '%s' already exists in the catalog.", databaseName)); + } + } + + public void dropDatabase( + String catalogName, + Map options, + String databaseName, + boolean cascade, + boolean ignoreIfExists) { + Catalog catalog = getCatalog(catalogName, options); + try { + catalog.dropDatabase(databaseName, ignoreIfExists, cascade); + } catch (CatalogException | DatabaseNotEmptyException | DatabaseNotExistException e) { + log.error("Drop database {} failed.", databaseName, e); + throw new CatalogException( + String.format("Drop database '%s' failed, the database does not exist or is not empty.", databaseName)); + } + } + + public boolean tableExists( + String catalogName, Map options, String databaseName, String tableName) { + Catalog catalog = getCatalog(catalogName, options); + try { + return catalog.tableExists(new ObjectPath(databaseName, tableName)); + } catch (CatalogException e) { + log.error("Table exists {}.{} failed.", databaseName, tableName, e); + throw new CatalogException( + String.format("The table '%s.%s' not exists in the catalog.", databaseName, tableName)); + } + } + + public boolean createTable( + String catalogName, + Map options, +
String databaseName, + String tableName, + CatalogTable catalogTable, + boolean ignoreIfExists) { + Catalog catalog = getCatalog(catalogName, options); + try { + catalog.createTable(new ObjectPath(databaseName, tableName), catalogTable, ignoreIfExists); + return true; + } catch (CatalogException e) { + log.error("Table {}.{} create failed.", databaseName, tableName, e); + throw new CatalogException( + String.format("Table '%s.%s' create failed.", databaseName, tableName)); + } catch (TableAlreadyExistException | DatabaseNotExistException e) { + log.error( + "Table {}.{} create failed.because table is exits or database not exist", + databaseName, + tableName, + e); + throw new RuntimeException(e); + } + } + + public boolean alterTable( + String catalogName, + Map options, + String databaseName, + String tableName, + List changes, + boolean ignoreIfExists) { + Catalog catalog = getCatalog(catalogName, options); + try { + CatalogBaseTable originTable = getTable(catalogName, options, databaseName, tableName); + CatalogTable currentTable = (CatalogTable) originTable; + + Schema currentSchema = currentTable.getUnresolvedSchema(); + final Schema schema = applyChangesToSchema(currentSchema, changes); + Map newOptions = new ConcurrentHashMap<>(); + for (TableChange change : changes) { + if (change instanceof TableChange.SetOption) { + newOptions.put( + ((TableChange.SetOption) change).getKey(), + ((TableChange.SetOption) change).getValue()); + } + if (change instanceof TableChange.ResetOption) { + newOptions.remove(((TableChange.ResetOption) change).getKey()); + } + } + + final CatalogTable newTable = + CatalogTable.of( + schema, currentTable.getComment(), currentTable.getPartitionKeys(), newOptions); + catalog.alterTable(new ObjectPath(databaseName, tableName), newTable, ignoreIfExists); + return true; + } catch (TableNotExistException e) { + throw new RuntimeException(e); + } + } + + public boolean dropTable( + String catalogName, + Map options, + String databaseName, + 
String tableName, + boolean ignoreIfExists) { + Catalog catalog = getCatalog(catalogName, options); + try { + catalog.dropTable(new ObjectPath(databaseName, tableName), ignoreIfExists); + return true; + } catch (TableNotExistException e) { + throw new RuntimeException(e); + } + } + + public boolean renameTable( + String catalogName, + Map options, + String databaseName, + String fromTableName, + String toTableName) { + Catalog catalog = getCatalog(catalogName, options); + try { + catalog.renameTable(new ObjectPath(databaseName, fromTableName), toTableName, true); + return true; + } catch (TableNotExistException | TableAlreadyExistException e) { + throw new RuntimeException(e); + } + } + + public List listTable( + String catalogName, Map options, String databaseName) { + Catalog catalog = getCatalog(catalogName, options); + try { + return catalog.listTables(databaseName); + } catch (DatabaseNotExistException e) { + throw new RuntimeException(e); + } + } + + public CatalogBaseTable getTable( + String catalogName, Map options, String databaseName, + String tableName) { + Catalog catalog = getCatalog(catalogName, options); + try { + return catalog.getTable(new ObjectPath(databaseName, tableName)); + } catch (TableNotExistException e) { + throw new RuntimeException(e); + } + } + + private Schema applyChangesToSchema(Schema currentSchema, List changes) { + // Clone the current schema to avoid modifying the original + Schema.Builder schemaBuilder = Schema.newBuilder().fromSchema(currentSchema); + + // Iterate over each change and apply it to the schema builder + Set columnsToDrop = new HashSet<>(); + for (TableChange change : changes) { + if (change instanceof TableChange.AddColumn) { + TableChange.AddColumn addColumn = (TableChange.AddColumn) change; + schemaBuilder.column(addColumn.getColumn().getName(), addColumn.getColumn().getDataType()); + } else if (change instanceof TableChange.ModifyColumn) { + TableChange.ModifyColumn modifyColumn = (TableChange.ModifyColumn) 
change; + schemaBuilder.column( + modifyColumn.getNewColumn().getName(), modifyColumn.getNewColumn().getDataType()); + } else if (change instanceof TableChange.DropColumn) { + TableChange.DropColumn removeColumn = (TableChange.DropColumn) change; + columnsToDrop.add(removeColumn.getColumnName()); + } else { + throw new UnsupportedOperationException( + "Unsupported table change type: " + change.getClass().getName()); + } + } + // drop columns + if (!columnsToDrop.isEmpty()) { + for (Schema.UnresolvedColumn column : currentSchema.getColumns()) { + if (column instanceof Schema.UnresolvedPhysicalColumn) { + Schema.UnresolvedPhysicalColumn physicalColumn = (Schema.UnresolvedPhysicalColumn) column; + if (!columnsToDrop.contains(physicalColumn.getName())) { + schemaBuilder.column(physicalColumn.getName(), physicalColumn.getDataType()); + } + } + } + } + // Build the updated schema + return schemaBuilder.build(); + } + + private Catalog getCatalog(String catalogName, Map options) { + if (catalogMap.containsKey(catalogName)) { + return catalogMap.get(catalogName); + } else { + Map configuration = new HashMap<>(); + configuration.put(CommonCatalogOptions.TABLE_CATALOG_STORE_KIND.key(), storeKind); + configuration.put("table.catalog-store.jdbc.url", jdbcUrl); + configuration.put("table.catalog-store.jdbc.driver", jdbcDriver); + configuration.put("table.catalog-store.jdbc.username", jdbcUserName); + configuration.put("table.catalog-store.jdbc.password", jdbcPassword); + configuration.put("table.catalog-store.jdbc.max-retry-timeout", jdbcMaxRetryTimeout); + Catalog catalog = CatalogServiceUtils.getCatalog(catalogName, options, configuration); + catalog.open(); + catalogMap.put(catalogName, catalog); + return catalog; + } + } +} diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/TableService.java 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/TableService.java new file mode 100644 index 0000000000..2dc9bbcf95 --- /dev/null +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/TableService.java @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.streampark.console.core.service; + +import org.apache.streampark.console.core.bean.TableParams; + +import org.springframework.stereotype.Service; + +import java.util.List; + +@Service +public interface TableService { + + /** + * Checks if the specified table exists. + * + * @param tableParams The TableParams object containing information about the table + * @return true if the table exists, false otherwise + */ + boolean tableExists(TableParams tableParams); + + /** + * Creates a table in the database given ${@link TableParams}. + * + * @param tableParams The TableParams object containing information about the table + * @return true if the operation is successful, false otherwise + */ + boolean createTable(TableParams tableParams); + + /** + * Adds a column to the table. 
+ * + * @param tableParams The TableDTO object containing information about the table + * @return true if the operation is successful, false otherwise + */ + boolean addColumn(TableParams tableParams); + + /** + * Drops a column from a table. + * + * @param catalogName The name of the catalog + * @param databaseName The name of the database + * @param tableName The name of the table + * @param columnName The name of the column to be dropped + * @return true if the operation is successful, false otherwise + */ + boolean dropColumn(String catalogName, String databaseName, String tableName, String columnName); + + /** + * Adds options to a table. + * + * @param tableDTO The TableDTO object containing information about the table + * @return true if the operation is successful, false otherwise + */ + boolean addOption(TableParams tableDTO); + + /** + * Removes an option from a table. + * + * @param catalogName The name of the catalog + * @param databaseName The name of the database + * @param tableName The name of the table + * @param key The key of the option to be removed + * @return true if the operation is successful, false otherwise + */ + boolean removeOption(String catalogName, String databaseName, String tableName, String key); + + /** + * Drops a table from the specified database in the given catalog. + * + * @param catalogName The name of the catalog from which the table will be dropped + * @param databaseName The name of the database from which the table will be dropped + * @param tableName The name of the table to be dropped + * @return true if the operation is successful, false otherwise + */ + boolean dropTable(String catalogName, String databaseName, String tableName); + + /** + * Renames a table in the specified database of the given catalog. 
+ * + * @param catalogName The name of the catalog where the table resides + * @param databaseName The name of the database where the table resides + * @param fromTableName The current name of the table to be renamed + * @param toTableName The new name for the table + * @return true if the operation is successful, false otherwise + */ + boolean renameTable( + String catalogName, String databaseName, String fromTableName, String toTableName); + + /** + * Lists tables given {@link TableParams} condition. + * + * @return Response object containing a list of {@link TableParams} representing the tables + */ + List listTables(TableParams tableDTO); + + /** + * Retrieves the column details of a specific table within the specified catalog and database. + * + * @param catalogName The name of the catalog where the table is located + * @param databaseName The name of the database where the table is located + * @param tableName The name of the table whose columns are to be retrieved + * @return A {@link TableParams} object containing the details of the columns of the specified + * table + */ + TableParams listColumns(String catalogName, String databaseName, String tableName); +} diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationActionServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationActionServiceImpl.java index e19976dd4d..c9bfa53217 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationActionServiceImpl.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationActionServiceImpl.java @@ -89,6 +89,7 @@ import org.apache.flink.configuration.JobManagerOptions; import org.apache.flink.configuration.MemorySize; import 
org.apache.flink.configuration.RestOptions; +import org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.client.KubernetesClientException; import org.apache.flink.runtime.jobgraph.SavepointConfigOptions; import org.apache.hadoop.yarn.api.records.ApplicationReport; import org.apache.hadoop.yarn.api.records.YarnApplicationState; @@ -98,7 +99,6 @@ import com.baomidou.mybatisplus.core.toolkit.Wrappers; import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; import com.google.common.collect.Sets; -import io.fabric8.kubernetes.client.KubernetesClientException; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; @@ -181,9 +181,11 @@ public class ApplicationActionServiceImpl extends ServiceImpl> startFutureMap = new ConcurrentHashMap<>(); + private final Map> startFutureMap = + new ConcurrentHashMap<>(); - private final Map> cancelFutureMap = new ConcurrentHashMap<>(); + private final Map> cancelFutureMap = + new ConcurrentHashMap<>(); @Override public void revoke(Long appId) throws ApplicationException { @@ -292,26 +294,28 @@ public void cancel(Application appParam) throws Exception { properties.put(RestOptions.PORT.key(), activeAddress.getPort()); } - Tuple3 clusterIdNamespace = getNamespaceClusterId(application); + Tuple3 clusterIdNamespace = + getNamespaceClusterId(application); String namespace = clusterIdNamespace.t1; String clusterId = clusterIdNamespace.t2; - CancelRequest cancelRequest = new CancelRequest( - application.getId(), - flinkEnv.getFlinkVersion(), - FlinkExecutionMode.of(application.getExecutionMode()), - properties, - clusterId, - application.getJobId(), - appParam.getRestoreOrTriggerSavepoint(), - appParam.getDrain(), - customSavepoint, - appParam.getNativeFormat(), - namespace); + CancelRequest cancelRequest = + new CancelRequest( + application.getId(), + flinkEnv.getFlinkVersion(), + 
FlinkExecutionMode.of(application.getExecutionMode()), + properties, + clusterId, + application.getJobId(), + appParam.getRestoreOrTriggerSavepoint(), + appParam.getDrain(), + customSavepoint, + appParam.getNativeFormat(), + namespace); final Date triggerTime = new Date(); - CompletableFuture cancelFuture = CompletableFuture - .supplyAsync(() -> FlinkClient.cancel(cancelRequest), executorService); + CompletableFuture cancelFuture = + CompletableFuture.supplyAsync(() -> FlinkClient.cancel(cancelRequest), executorService); cancelFutureMap.put(application.getId(), cancelFuture); @@ -431,41 +435,45 @@ public void start(Application appParam, boolean auto) throws Exception { } // Get the args after placeholder replacement - String args = StringUtils.isBlank(appParam.getArgs()) ? application.getArgs() : appParam.getArgs(); + String args = + StringUtils.isBlank(appParam.getArgs()) ? application.getArgs() : appParam.getArgs(); String applicationArgs = variableService.replaceVariable(application.getTeamId(), args); - Tuple3 clusterIdNamespace = getNamespaceClusterId(application); + Tuple3 clusterIdNamespace = + getNamespaceClusterId(application); String k8sNamespace = clusterIdNamespace.t1; String k8sClusterId = clusterIdNamespace.t2; FlinkK8sRestExposedType exposedType = clusterIdNamespace.t3; String dynamicProperties = - StringUtils.isBlank(appParam.getDynamicProperties()) ? application.getDynamicProperties() + StringUtils.isBlank(appParam.getDynamicProperties()) + ? 
application.getDynamicProperties() : appParam.getDynamicProperties(); - SubmitRequest submitRequest = new SubmitRequest( - flinkEnv.getFlinkVersion(), - FlinkExecutionMode.of(application.getExecutionMode()), - getProperties(application, dynamicProperties), - flinkEnv.getFlinkConf(), - FlinkDevelopmentMode.of(application.getJobType()), - application.getId(), - new JobID().toHexString(), - application.getJobName(), - appConf, - application.getApplicationType(), - getSavepointPath(appParam), - FlinkRestoreMode.of(appParam.getRestoreMode()), - applicationArgs, - k8sClusterId, - application.getHadoopUser(), - buildResult, - extraParameter, - k8sNamespace, - exposedType); - - CompletableFuture future = CompletableFuture - .supplyAsync(() -> FlinkClient.submit(submitRequest), executorService); + SubmitRequest submitRequest = + new SubmitRequest( + flinkEnv.getFlinkVersion(), + FlinkExecutionMode.of(application.getExecutionMode()), + getProperties(application, dynamicProperties), + flinkEnv.getFlinkConf(), + FlinkDevelopmentMode.of(application.getJobType()), + application.getId(), + new JobID().toHexString(), + application.getJobName(), + appConf, + application.getApplicationType(), + getSavepointPath(appParam), + FlinkRestoreMode.of(appParam.getRestoreMode()), + applicationArgs, + k8sClusterId, + application.getHadoopUser(), + buildResult, + extraParameter, + k8sNamespace, + exposedType); + + CompletableFuture future = + CompletableFuture.supplyAsync(() -> FlinkClient.submit(submitRequest), executorService); startFutureMap.put(application.getId(), future); @@ -594,10 +602,11 @@ private void processForException( private boolean checkAppRepeatInYarn(String jobName) { try { YarnClient yarnClient = HadoopUtils.yarnClient(); - Set types = Sets.newHashSet( - ApplicationType.STREAMPARK_FLINK.getName(), ApplicationType.APACHE_FLINK.getName()); - EnumSet states = EnumSet.of(YarnApplicationState.RUNNING, - YarnApplicationState.ACCEPTED); + Set types = + Sets.newHashSet( + 
ApplicationType.STREAMPARK_FLINK.getName(), ApplicationType.APACHE_FLINK.getName()); + EnumSet states = + EnumSet.of(YarnApplicationState.RUNNING, YarnApplicationState.ACCEPTED); List applications = yarnClient.getApplications(types, states); for (ApplicationReport report : applications) { if (report.getName().equals(jobName)) { @@ -633,9 +642,10 @@ private Tuple2 getUserJarAndAppConf(FlinkEnv flinkEnv, Applicati // 1) dist_userJar String sqlDistJar = ServiceHelper.getFlinkSqlClientJar(flinkEnv); // 2) appConfig - appConf = applicationConfig == null - ? null - : String.format("yaml://%s", applicationConfig.getContent()); + appConf = + applicationConfig == null + ? null + : String.format("yaml://%s", applicationConfig.getContent()); // 3) client if (FlinkExecutionMode.YARN_APPLICATION == executionModeEnum) { String clientPath = Workspace.remote().APP_CLIENT(); @@ -644,7 +654,8 @@ private Tuple2 getUserJarAndAppConf(FlinkEnv flinkEnv, Applicati break; case PYFLINK: - Resource resource = resourceService.findByResourceName(application.getTeamId(), application.getJar()); + Resource resource = + resourceService.findByResourceName(application.getTeamId(), application.getJar()); ApiAlertException.throwIfNull( resource, "pyflink file can't be null, start application failed."); @@ -661,25 +672,28 @@ private Tuple2 getUserJarAndAppConf(FlinkEnv flinkEnv, Applicati case CUSTOM_CODE: if (application.isUploadJob()) { - appConf = String.format( - "json://{\"%s\":\"%s\"}", - ConfigKeys.KEY_FLINK_APPLICATION_MAIN_CLASS(), application.getMainClass()); + appConf = + String.format( + "json://{\"%s\":\"%s\"}", + ConfigKeys.KEY_FLINK_APPLICATION_MAIN_CLASS(), application.getMainClass()); } else { switch (application.getApplicationType()) { case STREAMPARK_FLINK: ConfigFileTypeEnum fileType = ConfigFileTypeEnum.of(applicationConfig.getFormat()); if (fileType != null && ConfigFileTypeEnum.UNKNOWN != fileType) { - appConf = String.format( - "%s://%s", fileType.getTypeName(), 
applicationConfig.getContent()); + appConf = + String.format( + "%s://%s", fileType.getTypeName(), applicationConfig.getContent()); } else { throw new IllegalArgumentException( "application' config type error,must be ( yaml| properties| hocon )"); } break; case APACHE_FLINK: - appConf = String.format( - "json://{\"%s\":\"%s\"}", - ConfigKeys.KEY_FLINK_APPLICATION_MAIN_CLASS(), application.getMainClass()); + appConf = + String.format( + "json://{\"%s\":\"%s\"}", + ConfigKeys.KEY_FLINK_APPLICATION_MAIN_CLASS(), application.getMainClass()); break; default: throw new IllegalArgumentException( @@ -690,21 +704,22 @@ private Tuple2 getUserJarAndAppConf(FlinkEnv flinkEnv, Applicati if (FlinkExecutionMode.YARN_APPLICATION == executionModeEnum) { switch (application.getApplicationType()) { case STREAMPARK_FLINK: - flinkUserJar = String.format( - "%s/%s", - application.getAppLib(), - application.getModule().concat(Constant.JAR_SUFFIX)); + flinkUserJar = + String.format( + "%s/%s", + application.getAppLib(), application.getModule().concat(Constant.JAR_SUFFIX)); break; case APACHE_FLINK: flinkUserJar = String.format("%s/%s", application.getAppHome(), application.getJar()); if (!FsOperator.hdfs().exists(flinkUserJar)) { - resource = resourceService.findByResourceName( - application.getTeamId(), application.getJar()); + resource = + resourceService.findByResourceName( + application.getTeamId(), application.getJar()); if (resource != null && StringUtils.isNotBlank(resource.getFilePath())) { - flinkUserJar = String.format( - "%s/%s", - application.getAppHome(), - new File(resource.getFilePath()).getName()); + flinkUserJar = + String.format( + "%s/%s", + application.getAppHome(), new File(resource.getFilePath()).getName()); } } break; @@ -742,8 +757,10 @@ private Map getProperties(Application application, String runtim application.getFlinkClusterId())); properties.put(ConfigKeys.KEY_YARN_APP_ID(), cluster.getClusterId()); } else { - String yarnQueue = (String) 
application.getHotParamsMap().get(ConfigKeys.KEY_YARN_APP_QUEUE()); - String yarnLabelExpr = (String) application.getHotParamsMap().get(ConfigKeys.KEY_YARN_APP_NODE_LABEL()); + String yarnQueue = + (String) application.getHotParamsMap().get(ConfigKeys.KEY_YARN_APP_QUEUE()); + String yarnLabelExpr = + (String) application.getHotParamsMap().get(ConfigKeys.KEY_YARN_APP_NODE_LABEL()); Optional.ofNullable(yarnQueue) .ifPresent(yq -> properties.put(ConfigKeys.KEY_YARN_APP_QUEUE(), yq)); Optional.ofNullable(yarnLabelExpr) @@ -761,7 +778,8 @@ private Map getProperties(Application application, String runtim properties.put(SavepointConfigOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE.key(), true); } - Map dynamicProperties = PropertiesUtils.extractDynamicPropertiesAsJava(runtimeProperties); + Map dynamicProperties = + PropertiesUtils.extractDynamicPropertiesAsJava(runtimeProperties); properties.putAll(dynamicProperties); ResolveOrder resolveOrder = ResolveOrder.of(application.getResolveOrder()); if (resolveOrder != null) { @@ -789,8 +807,8 @@ private void doAbort(Long id) { // kill application if (FlinkExecutionMode.isYarnMode(application.getFlinkExecutionMode())) { try { - List applications = applicationInfoService - .getYarnAppReport(application.getJobName()); + List applications = + applicationInfoService.getYarnAppReport(application.getJobName()); if (!applications.isEmpty()) { YarnClient yarnClient = HadoopUtils.yarnClient(); yarnClient.killApplication(applications.get(0).getApplicationId()); diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/CatalogServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/CatalogServiceImpl.java index 199220fb06..b78c9f7dcf 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/CatalogServiceImpl.java +++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/CatalogServiceImpl.java @@ -93,7 +93,17 @@ record -> { } @Override - public boolean update(FlinkCatalogParams catalogParam, long userId) { + public FlinkCatalog getCatalog(Long catalogId) { + return this.baseMapper.selectById(catalogId); + } + + @Override + public FlinkCatalog getCatalog(String catalogName) { + return this.baseMapper.selectByCatalogName(catalogName); + } + + @Override + public boolean update(FlinkCatalogParams catalogParam, Long userId) { AlertException.throwIfNull( catalogParam.getTeamId(), "The teamId can't be null. List catalog failed."); FlinkCatalog catalog = getById(catalogParam.getId()); diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/DatabaseServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/DatabaseServiceImpl.java new file mode 100644 index 0000000000..ebd8c864e8 --- /dev/null +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/DatabaseServiceImpl.java @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.streampark.console.core.service.impl; + +import org.apache.streampark.console.base.exception.AlertException; +import org.apache.streampark.console.core.bean.DatabaseParam; +import org.apache.streampark.console.core.entity.Database; +import org.apache.streampark.console.core.entity.FlinkCatalog; +import org.apache.streampark.console.core.mapper.DatabaseMapper; +import org.apache.streampark.console.core.service.CatalogService; +import org.apache.streampark.console.core.service.DatabaseService; +import org.apache.streampark.console.core.service.FlinkCatalogService; + +import org.apache.flink.table.catalog.CatalogDatabase; +import org.apache.flink.table.catalog.CatalogDatabaseImpl; + +import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; +import org.springframework.transaction.annotation.Propagation; +import org.springframework.transaction.annotation.Transactional; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +import static org.apache.streampark.console.core.util.CatalogServiceUtils.getOptions; + +@Service +@Slf4j +@Transactional(propagation = Propagation.SUPPORTS, rollbackFor = Exception.class) +public class DatabaseServiceImpl extends ServiceImpl implements DatabaseService { + + @Autowired + private CatalogService catalogService; + @Autowired + private FlinkCatalogService flinkCatalogService; + + @Override + public boolean databaseExists(DatabaseParam databaseParam) { + AlertException.throwIfNull(databaseParam.getName(), "Database name can not be null."); + FlinkCatalog flinkCatalog = catalogService.getCatalog(databaseParam.getCatalogId()); + 
AlertException.throwIfNull(flinkCatalog, "Catalog is not exit in database."); + return flinkCatalogService.databaseExists( + flinkCatalog.getCatalogName(), + getOptions(flinkCatalog.getConfiguration()), + databaseParam.getName()); + } + + @Override + public boolean createDatabase(DatabaseParam databaseParam) { + AlertException.throwIfNull(databaseParam.getName(), "Database name can not be null."); + FlinkCatalog flinkCatalog = catalogService.getCatalog(databaseParam.getCatalogId()); + AlertException.throwIfNull(flinkCatalog, "Catalog is not exit in database."); + Map dbMap = new ConcurrentHashMap<>(); + dbMap.put("cascade", String.valueOf(databaseParam.isCascade())); + CatalogDatabase catalogDatabase = + new CatalogDatabaseImpl(dbMap, databaseParam.getDescription()); + return flinkCatalogService.createDatabase( + flinkCatalog.getCatalogName(), + getOptions(flinkCatalog.getConfiguration()), + databaseParam.getName(), + catalogDatabase, + databaseParam.isIgnoreIfExits()); + } + + @Override + public List listDatabases(Long catalogId) { + FlinkCatalog flinkCatalog = catalogService.getCatalog(catalogId); + AlertException.throwIfNull( + flinkCatalog, "The catalog can't be null. 
get catalog from database failed."); + List databases = + flinkCatalogService.listDatabases( + flinkCatalog.getCatalogName(), getOptions(flinkCatalog.getConfiguration())); + if (databases == null || databases.isEmpty()) { + return Collections.emptyList(); + } + List databaseList = new ArrayList<>(); + databases.forEach( + dbName -> { + DatabaseParam dbParam = new DatabaseParam(); + dbParam.setCatalogId(catalogId); + dbParam.setCatalogName(flinkCatalog.getCatalogName()); + dbParam.setName(dbName); + databaseList.add(dbParam); + }); + return databaseList; + } + + @Override + public boolean dropDatabase(DatabaseParam databaseParam) { + AlertException.throwIfNull(databaseParam.getName(), "Database name can not be null."); + FlinkCatalog flinkCatalog = catalogService.getCatalog(databaseParam.getCatalogId()); + AlertException.throwIfNull(flinkCatalog, "Catalog is not exit in database."); + flinkCatalogService.dropDatabase( + flinkCatalog.getCatalogName(), + getOptions(flinkCatalog.getConfiguration()), + databaseParam.getName(), + databaseParam.isCascade(), + databaseParam.isIgnoreIfExits()); + return true; + } +} diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/TableServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/TableServiceImpl.java new file mode 100644 index 0000000000..6501bc4a30 --- /dev/null +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/TableServiceImpl.java @@ -0,0 +1,306 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.streampark.console.core.service.impl;

import org.apache.streampark.console.base.exception.AlertException;
import org.apache.streampark.console.core.bean.TableColumn;
import org.apache.streampark.console.core.bean.TableParams;
import org.apache.streampark.console.core.entity.FlinkCatalog;
import org.apache.streampark.console.core.service.CatalogService;
import org.apache.streampark.console.core.service.FlinkCatalogService;
import org.apache.streampark.console.core.service.TableService;
import org.apache.streampark.console.core.util.DataTypeConverterUtils;

import org.apache.flink.table.api.Schema;
import org.apache.flink.table.catalog.CatalogBaseTable;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.Column;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.catalog.TableChange;
import org.apache.flink.table.catalog.UniqueConstraint;

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

import static org.apache.streampark.console.core.util.CatalogServiceUtils.getOptions;

/**
 * Table management backed by a registered Flink catalog: create / alter / drop / rename tables
 * and list tables or columns. All catalog access is delegated to {@link FlinkCatalogService};
 * catalog connection options are deserialized from the stored JSON configuration.
 */
@Service
@Slf4j
@Transactional(propagation = Propagation.SUPPORTS, rollbackFor = Exception.class)
public class TableServiceImpl implements TableService {

    @Autowired
    private CatalogService catalogService;

    @Autowired
    private FlinkCatalogService flinkCatalogService;

    /**
     * Checks whether the table named in {@code tableParams} exists in its database.
     *
     * @throws AlertException if the table name is null or the catalog is not registered
     */
    @Override
    public boolean tableExists(TableParams tableParams) {
        AlertException.throwIfNull(tableParams.getName(), "Table name can not be null.");
        FlinkCatalog flinkCatalog = catalogService.getCatalog(tableParams.getCatalogId());
        AlertException.throwIfNull(flinkCatalog, "Catalog does not exist.");
        return flinkCatalogService.tableExists(
            flinkCatalog.getCatalogName(),
            getOptions(flinkCatalog.getConfiguration()),
            tableParams.getDatabaseName(),
            tableParams.getName());
    }

    /**
     * Creates a table from the column / option definitions in {@code tableParams}.
     *
     * <p>Columns flagged as primary key are collected into a single unique constraint whose name
     * is "uk" followed by the concatenated key column names. When no column is flagged, no
     * primary-key constraint is declared (fix: the original built a primary key over an empty
     * column list, which the catalog would reject).
     *
     * @throws AlertException if name, columns or options are missing, or the catalog is unknown
     */
    @Override
    public boolean createTable(TableParams tableParams) {
        AlertException.throwIfNull(tableParams.getName(), "Table name can not be null.");
        FlinkCatalog flinkCatalog = catalogService.getCatalog(tableParams.getCatalogId());
        AlertException.throwIfNull(flinkCatalog, "Catalog does not exist.");
        AlertException.throwIfNull(tableParams.getTableColumns(), "Table column can not be null.");
        AlertException.throwIfNull(tableParams.getTableOptions(), "Table options can not be null.");

        List<Column> columns = new ArrayList<>();
        List<String> ukColumns = new ArrayList<>();
        StringBuilder ukName = new StringBuilder("uk");
        for (TableColumn tc : tableParams.getTableColumns()) {
            columns.add(
                Column.physical(
                    tc.getField(), DataTypeConverterUtils.convertToDataType(tc.getDataType())));
            if (tc.isPk()) {
                ukColumns.add(tc.getField());
                ukName.append(tc.getField());
            }
        }
        UniqueConstraint primaryKey =
            ukColumns.isEmpty() ? null : UniqueConstraint.primaryKey(ukName.toString(), ukColumns);
        Schema schema =
            Schema.newBuilder()
                .fromResolvedSchema(
                    new ResolvedSchema(columns, Collections.emptyList(), primaryKey))
                .build();
        CatalogTable originTable =
            CatalogTable.of(
                schema,
                tableParams.getDescription(),
                tableParams.getPartitionKey(),
                tableParams.getTableOptions());
        return flinkCatalogService.createTable(
            flinkCatalog.getCatalogName(),
            getOptions(flinkCatalog.getConfiguration()),
            tableParams.getDatabaseName(),
            tableParams.getName(),
            originTable,
            true);
    }

    /**
     * Adds the columns listed in {@code tableParams} to an existing table.
     *
     * @throws AlertException if name or columns are missing, or the catalog is unknown
     */
    @Override
    public boolean addColumn(TableParams tableParams) {
        AlertException.throwIfNull(tableParams.getName(), "Table name can not be null.");
        FlinkCatalog flinkCatalog = catalogService.getCatalog(tableParams.getCatalogId());
        AlertException.throwIfNull(flinkCatalog, "Catalog does not exist.");
        AlertException.throwIfNull(tableParams.getTableColumns(), "Table column can not be null.");

        List<TableChange> tableChanges = new ArrayList<>();
        for (TableColumn tableColumn : tableParams.getTableColumns()) {
            Column column =
                Column.physical(
                    tableColumn.getField(),
                    DataTypeConverterUtils.convertToDataType(tableColumn.getDataType()))
                    .withComment(tableColumn.getComment());
            tableChanges.add(TableChange.add(column));
        }
        return alterTable(flinkCatalog, tableParams.getDatabaseName(), tableParams.getName(), tableChanges);
    }

    /**
     * Drops a single column from a table.
     *
     * @throws AlertException if table or column name is null, or the catalog is unknown
     */
    @Override
    public boolean dropColumn(
                              String catalogName, String databaseName, String tableName, String columnName) {
        AlertException.throwIfNull(tableName, "Table name can not be null.");
        FlinkCatalog flinkCatalog = catalogService.getCatalog(catalogName);
        AlertException.throwIfNull(flinkCatalog, "Catalog does not exist.");
        AlertException.throwIfNull(columnName, "Table column name can not be null.");

        List<TableChange> tableChanges = new ArrayList<>();
        tableChanges.add(TableChange.dropColumn(columnName));
        return alterTable(flinkCatalog, databaseName, tableName, tableChanges);
    }

    /**
     * Sets (adds or overwrites) every option in {@code tableParams.getTableOptions()} on a table.
     *
     * @throws AlertException if name or options are missing, or the catalog is unknown
     */
    @Override
    public boolean addOption(TableParams tableParams) {
        AlertException.throwIfNull(tableParams.getName(), "Table name can not be null.");
        FlinkCatalog flinkCatalog = catalogService.getCatalog(tableParams.getCatalogId());
        AlertException.throwIfNull(flinkCatalog, "Catalog does not exist.");
        AlertException.throwIfNull(tableParams.getTableOptions(), "Table options can not be null.");

        List<TableChange> tableChanges = new ArrayList<>();
        tableParams.getTableOptions().forEach((key, value) -> tableChanges.add(TableChange.set(key, value)));
        return alterTable(flinkCatalog, tableParams.getDatabaseName(), tableParams.getName(), tableChanges);
    }

    /**
     * Resets (removes) a single option from a table.
     *
     * @throws AlertException if table name or option key is null, or the catalog is unknown
     */
    @Override
    public boolean removeOption(
                                String catalogName, String databaseName, String tableName, String key) {
        AlertException.throwIfNull(tableName, "Table name can not be null.");
        FlinkCatalog flinkCatalog = catalogService.getCatalog(catalogName);
        AlertException.throwIfNull(flinkCatalog, "Catalog does not exist.");
        AlertException.throwIfNull(key, "Table options key can not be null.");

        List<TableChange> tableChanges = new ArrayList<>();
        tableChanges.add(new TableChange.ResetOption(key));
        return alterTable(flinkCatalog, databaseName, tableName, tableChanges);
    }

    /**
     * Drops a table (ignore-if-not-exists semantics are delegated with {@code true}).
     *
     * @throws AlertException if the table name is null or the catalog is unknown
     */
    @Override
    public boolean dropTable(String catalogName, String databaseName, String tableName) {
        AlertException.throwIfNull(tableName, "Table name can not be null.");
        FlinkCatalog flinkCatalog = catalogService.getCatalog(catalogName);
        AlertException.throwIfNull(flinkCatalog, "Catalog does not exist.");
        return flinkCatalogService.dropTable(
            catalogName, getOptions(flinkCatalog.getConfiguration()), databaseName, tableName, true);
    }

    /**
     * Renames a table within the same database.
     *
     * @throws AlertException if either name is null or the catalog is unknown
     */
    @Override
    public boolean renameTable(
                               String catalogName, String databaseName, String fromTableName, String toTableName) {
        AlertException.throwIfNull(fromTableName, "From table name can not be null.");
        AlertException.throwIfNull(toTableName, "To table name can not be null.");
        FlinkCatalog flinkCatalog = catalogService.getCatalog(catalogName);
        AlertException.throwIfNull(flinkCatalog, "Catalog does not exist.");
        return flinkCatalogService.renameTable(
            catalogName,
            getOptions(flinkCatalog.getConfiguration()),
            databaseName,
            fromTableName,
            toTableName);
    }

    /**
     * Lists every table of {@code tableParams.getDatabaseName()} with its column metadata.
     *
     * <p>Fix: each table's metadata is now fetched for the iterated table name — the original
     * passed {@code tableParams.getName()} on every iteration, so all entries described the same
     * (possibly unset) table. Also returns an empty list instead of {@code null} when the
     * database has no tables, and hoists the per-call options parsing out of the loop.
     *
     * @throws AlertException if the database name is null or the catalog is unknown
     */
    @Override
    public List<TableParams> listTables(TableParams tableParams) {
        AlertException.throwIfNull(tableParams.getDatabaseName(), "Database name can not be null.");
        FlinkCatalog flinkCatalog = catalogService.getCatalog(tableParams.getCatalogId());
        AlertException.throwIfNull(flinkCatalog, "Catalog does not exist.");
        Map<String, String> options = getOptions(flinkCatalog.getConfiguration());
        List<String> tables =
            flinkCatalogService.listTable(
                flinkCatalog.getCatalogName(), options, tableParams.getDatabaseName());
        if (tables == null || tables.isEmpty()) {
            return Collections.emptyList();
        }
        List<TableParams> tableParamsList = new ArrayList<>(tables.size());
        for (String tableName : tables) {
            TableParams params =
                convertToTableParams(
                    flinkCatalogService.getTable(
                        flinkCatalog.getCatalogName(),
                        options,
                        tableParams.getDatabaseName(),
                        tableName));
            params.setName(tableName);
            params.setDatabaseName(tableParams.getDatabaseName());
            params.setCatalogName(flinkCatalog.getCatalogName());
            params.setCatalogId(flinkCatalog.getId());
            tableParamsList.add(params);
        }
        return tableParamsList;
    }

    /**
     * Returns the column metadata of one table.
     *
     * <p>Fix: the database name is now set from {@code databaseName} — the original wrote the
     * catalog name into that field.
     *
     * @throws AlertException if the database name is null or the catalog is unknown
     */
    @Override
    public TableParams listColumns(String catalogName, String databaseName, String tableName) {
        AlertException.throwIfNull(databaseName, "Database name can not be null.");
        FlinkCatalog flinkCatalog = catalogService.getCatalog(catalogName);
        AlertException.throwIfNull(flinkCatalog, "Catalog does not exist.");

        CatalogBaseTable originTable =
            flinkCatalogService.getTable(
                catalogName, getOptions(flinkCatalog.getConfiguration()), databaseName, tableName);
        TableParams tableParams = convertToTableParams(originTable);
        tableParams.setName(tableName);
        tableParams.setCatalogName(catalogName);
        tableParams.setDatabaseName(databaseName);
        tableParams.setCatalogId(flinkCatalog.getId());
        return tableParams;
    }

    /** Shared dispatch for all alter-table style changes. */
    private boolean alterTable(
                               FlinkCatalog flinkCatalog,
                               String databaseName,
                               String tableName,
                               List<TableChange> tableChanges) {
        return flinkCatalogService.alterTable(
            flinkCatalog.getCatalogName(),
            getOptions(flinkCatalog.getConfiguration()),
            databaseName,
            tableName,
            tableChanges,
            true);
    }

    /**
     * Maps a catalog table's unresolved schema to a {@link TableParams} holding one
     * {@link TableColumn} per column, with primary-key columns flagged.
     */
    private TableParams convertToTableParams(CatalogBaseTable catalogBaseTable) {
        List<TableColumn> tableColumns = new ArrayList<>();
        catalogBaseTable
            .getUnresolvedSchema()
            .getColumns()
            .forEach(
                unresolvedColumn -> {
                    TableColumn tableColumn = new TableColumn();
                    tableColumn.setField(unresolvedColumn.getName());
                    unresolvedColumn.getComment().ifPresent(tableColumn::setComment);
                    tableColumns.add(tableColumn);
                });
        catalogBaseTable
            .getUnresolvedSchema()
            .getPrimaryKey()
            .ifPresent(
                unresolvedPrimaryKey -> {
                    for (String primary : unresolvedPrimaryKey.getColumnNames()) {
                        for (TableColumn column : tableColumns) {
                            if (column.getField().equals(primary)) {
                                column.setPk(true);
                            }
                        }
                    }
                });
        return TableParams.builder().tableColumns(tableColumns).build();
    }
}
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.streampark.console.core.util;

import org.apache.streampark.console.base.util.JacksonUtils;
import org.apache.streampark.console.base.util.WebUtils;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.catalog.Catalog;
import org.apache.flink.table.factories.FactoryUtil;

import com.fasterxml.jackson.core.JsonProcessingException;

import java.io.File;
import java.net.MalformedURLException;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;

/** Helpers for instantiating Flink catalogs with the connector-plugin class loader. */
public class CatalogServiceUtils {

    /**
     * Matches connector-plugin jar names like
     * {@code streampark-flink-connector-plugin-<name>-<version>.jar}.
     *
     * <p>Fix: the original pattern ended in {@code $\n} (with DOTALL), so no real file name —
     * which never contains a newline — could ever match and plugin jars were silently skipped.
     * NOTE(review): {@code [^-.]+} rejects dots, so versions such as "1.0" will not match —
     * confirm the actual plugin jar naming convention.
     */
    private static final Pattern PATTERN_FLINK_CONNECTOR_PLUGIN =
        Pattern.compile(
            "^streampark-flink-connector-plugin-([^-.]+)-([^-.]+)\\.jar$",
            Pattern.CASE_INSENSITIVE);

    /** Utility class: no instances. */
    private CatalogServiceUtils() {
    }

    /**
     * Creates a Flink {@link Catalog} through {@link FactoryUtil#createCatalog}, using a class
     * loader that additionally exposes the connector-plugin jars.
     *
     * @param catalogName name of the catalog to create
     * @param options catalog factory options
     * @param configurations extra Flink configuration entries
     */
    public static Catalog getCatalog(
                                     String catalogName, Map<String, String> options, Map<String, String> configurations) {
        ClassLoader classLoader =
            getCatalogClassLoader(Thread.currentThread().getContextClassLoader());
        return FactoryUtil.createCatalog(
            catalogName, options, Configuration.fromMap(configurations), classLoader);
    }

    /**
     * Builds a class loader that prepends every streampark-flink-connector-plugin jar found in
     * the plugin directory to {@code classLoader}; returns {@code classLoader} unchanged when the
     * directory cannot be listed.
     */
    protected static ClassLoader getCatalogClassLoader(ClassLoader classLoader) {
        File pluginDir = WebUtils.getPluginDir();
        // Fix: use the compiled Pattern directly; String.matches(PATTERN.pattern()) discarded
        // the CASE_INSENSITIVE flag.
        File[] pluginFiles =
            pluginDir.listFiles(
                pathname -> PATTERN_FLINK_CONNECTOR_PLUGIN.matcher(pathname.getName()).matches());
        if (pluginFiles == null) {
            // listFiles returns null when the path is not a readable directory.
            return classLoader;
        }
        List<URL> pluginUrls = new ArrayList<>(pluginFiles.length);
        for (File file : pluginFiles) {
            try {
                pluginUrls.add(file.toURI().toURL());
            } catch (MalformedURLException e) {
                throw new RuntimeException(e);
            }
        }
        return new URLClassLoader(pluginUrls.toArray(new URL[0]), classLoader);
    }

    /**
     * Deserializes a catalog's stored JSON configuration string into an options map.
     *
     * @throws RuntimeException wrapping the {@link JsonProcessingException} on malformed JSON
     */
    public static Map<String, String> getOptions(String configuration) {
        try {
            return JacksonUtils.toMap(configuration);
        } catch (JsonProcessingException e) {
            throw new RuntimeException(e);
        }
    }
}
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.streampark.console.core.util;

import org.apache.streampark.console.core.bean.FlinkDataType;

import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.CharType;
import org.apache.flink.table.types.logical.DecimalType;
import org.apache.flink.table.types.logical.LocalZonedTimestampType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.TimestampType;
import org.apache.flink.table.types.logical.VarCharType;

import java.util.Locale;

/** Conversions between the console's {@link FlinkDataType} bean and Flink {@link DataType}s. */
public class DataTypeConverterUtils {

    /** Utility class: no instances. */
    private DataTypeConverterUtils() {
    }

    /**
     * Maps a {@link FlinkDataType} (SQL-ish type name plus nullability / precision / scale) to a
     * Flink {@link DataType}.
     *
     * @param flinkDataType source type description; its type name is matched case-insensitively
     * @return the corresponding Flink data type with nullability applied
     * @throws IllegalArgumentException for type names without a mapping
     */
    public static DataType convertToDataType(FlinkDataType flinkDataType) {
        DataType dataType;

        // Locale.ROOT keeps the lookup stable regardless of the JVM default locale.
        String type = flinkDataType.getType().toLowerCase(Locale.ROOT);
        boolean isNullable = flinkDataType.isNullable();
        Integer precision = flinkDataType.getPrecision();
        Integer scale = flinkDataType.getScale();

        switch (type) {
            // Integer types
            case "tinyint":
            case "int1":
                dataType = DataTypes.TINYINT();
                break;
            case "smallint":
            case "int2":
                dataType = DataTypes.SMALLINT();
                break;
            case "int":
            case "integer":
            case "int4":
                dataType = DataTypes.INT();
                break;
            case "bigint":
            case "int8":
                dataType = DataTypes.BIGINT();
                break;

            // Floating-point types
            case "float":
            case "real":
                dataType = DataTypes.FLOAT();
                break;
            case "double":
            case "float8":
                dataType = DataTypes.DOUBLE();
                break;

            // Decimal and numeric types
            case "decimal":
            case "numeric":
                if (precision != null && scale != null) {
                    dataType = DataTypes.DECIMAL(precision, scale);
                } else {
                    // Default precision and scale when the caller supplied none.
                    dataType = DataTypes.DECIMAL(38, 18);
                }
                break;

            // Character types
            case "char":
                dataType = precision != null ? DataTypes.CHAR(precision) : DataTypes.CHAR(1);
                break;
            case "varchar":
            case "string":
            case "text":
                dataType = DataTypes.STRING();
                break;

            // Binary data types
            case "binary":
            case "varbinary":
            case "blob":
                dataType = DataTypes.BYTES();
                break;

            // Date and time types
            case "date":
                dataType = DataTypes.DATE();
                break;
            case "timestamp":
                // Default precision 3 (millisecond) when none is supplied.
                dataType = precision != null ? DataTypes.TIMESTAMP(precision) : DataTypes.TIMESTAMP(3);
                break;
            case "time":
                dataType = DataTypes.TIME();
                break;

            // Boolean type
            case "boolean":
            case "bool":
                dataType = DataTypes.BOOLEAN();
                break;

            // Types without a native Flink equivalent are stored as STRING.
            case "json":
            case "uuid":
                dataType = DataTypes.STRING();
                break;

            default:
                throw new IllegalArgumentException("Unsupported type: " + type);
        }

        return isNullable ? dataType.nullable() : dataType.notNull();
    }

    /**
     * Maps a Flink {@link DataType} back to a {@link FlinkDataType} bean, extracting length /
     * precision / scale where the logical type carries them.
     *
     * <p>Fix: TIMESTAMP precision is now extracted ({@link TimestampType} /
     * {@link LocalZonedTimestampType}) — the original dropped it, so a round trip lost the
     * timestamp precision. NOTE(review): the returned type name is the {@code LogicalTypeRoot}
     * name (e.g. "timestamp_without_time_zone"), which {@link #convertToDataType} does not
     * accept — confirm whether round-tripping through both methods is expected.
     *
     * @throws IllegalArgumentException for logical type roots without a mapping
     */
    public static FlinkDataType convertorToFlinkDataType(DataType dataType) {
        LogicalType logicalType = dataType.getLogicalType();
        boolean isNullable = logicalType.isNullable();
        String typeName = logicalType.getTypeRoot().name().toLowerCase(Locale.ROOT);
        Integer precision = null;
        Integer scale = null;

        switch (logicalType.getTypeRoot()) {
            case CHAR:
            case VARCHAR:
                if (logicalType instanceof CharType) {
                    precision = ((CharType) logicalType).getLength();
                } else if (logicalType instanceof VarCharType) {
                    precision = ((VarCharType) logicalType).getLength();
                }
                break;
            case DECIMAL:
                if (logicalType instanceof DecimalType) {
                    precision = ((DecimalType) logicalType).getPrecision();
                    scale = ((DecimalType) logicalType).getScale();
                }
                break;
            case TIMESTAMP_WITHOUT_TIME_ZONE:
                if (logicalType instanceof TimestampType) {
                    precision = ((TimestampType) logicalType).getPrecision();
                }
                break;
            case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
                if (logicalType instanceof LocalZonedTimestampType) {
                    precision = ((LocalZonedTimestampType) logicalType).getPrecision();
                }
                break;
            case TINYINT:
            case SMALLINT:
            case INTEGER:
            case BIGINT:
            case FLOAT:
            case DOUBLE:
            case DATE:
            case TIME_WITHOUT_TIME_ZONE:
            case BOOLEAN:
            case BINARY:
            case VARBINARY:
            case ARRAY:
            case MULTISET:
            case MAP:
            case ROW:
            case RAW:
            case NULL:
            case SYMBOL:
            case UNRESOLVED:
            case INTERVAL_YEAR_MONTH:
            case INTERVAL_DAY_TIME:
            case TIMESTAMP_WITH_TIME_ZONE:
            case DISTINCT_TYPE:
            case STRUCTURED_TYPE:
                // These roots carry no precision or scale we report.
                break;
            default:
                throw new IllegalArgumentException("Unsupported type: " + logicalType);
        }

        return new FlinkDataType(typeName, isNullable, precision, scale);
    }
}