diff --git a/streampark-common/pom.xml b/streampark-common/pom.xml index 64694d9424..32babf65ce 100644 --- a/streampark-common/pom.xml +++ b/streampark-common/pom.xml @@ -115,6 +115,7 @@ true + org.apache.streampark @@ -128,6 +129,7 @@ ${streampark.shaded.version} + dev.zio diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/conf/ConfigConst.scala b/streampark-common/src/main/scala/org/apache/streampark/common/conf/ConfigConst.scala index 9f39a914c9..75c0b4e9c5 100644 --- a/streampark-common/src/main/scala/org/apache/streampark/common/conf/ConfigConst.scala +++ b/streampark-common/src/main/scala/org/apache/streampark/common/conf/ConfigConst.scala @@ -80,18 +80,16 @@ object ConfigConst { val KEY_SPARK_BATCH_DURATION = "spark.batch.duration" // flink - def KEY_APP_CONF(prefix: String = null): String = if (prefix == null) "conf" else s"${prefix}conf" + def KEY_APP_CONF(prefix: String = null): String = s"${Option(prefix).getOrElse("")}conf" - def KEY_FLINK_CONF(prefix: String = null): String = - if (prefix == null) "flink.conf" else s"${prefix}flink.conf" + def KEY_FLINK_CONF(prefix: String = null): String = s"${Option(prefix).getOrElse("")}flink.conf" - def KEY_APP_NAME(prefix: String = null): String = - if (prefix == null) "app.name" else s"${prefix}app.name" + def KEY_APP_NAME(prefix: String = null): String = s"${Option(prefix).getOrElse("")}app.name" - def KEY_FLINK_SQL(prefix: String = null): String = if (prefix == null) "sql" else s"${prefix}sql" + def KEY_FLINK_SQL(prefix: String = null): String = s"${Option(prefix).getOrElse("")}sql" def KEY_FLINK_PARALLELISM(prefix: String = null): String = - if (prefix == null) "parallelism.default" else s"${prefix}parallelism.default" + s"${Option(prefix).getOrElse("")}parallelism.default" val KEY_FLINK_OPTION_PREFIX = "flink.option." diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/util/ClassLoaderUtils.scala b/streampark-common/src/main/scala/org/apache/streampark/common/util/ClassLoaderUtils.scala index 620f680e7f..eb3f3dadcf 100644 --- a/streampark-common/src/main/scala/org/apache/streampark/common/util/ClassLoaderUtils.scala +++ b/streampark-common/src/main/scala/org/apache/streampark/common/util/ClassLoaderUtils.scala @@ -16,9 +16,11 @@ */ package org.apache.streampark.common.util -import java.io.File +import java.io.{File, IOException} import java.net.{URL, URLClassLoader} -import java.util.function.Supplier +import java.util.function.{Consumer, Supplier} + +import scala.collection.mutable.ArrayBuffer object ClassLoaderUtils extends Logger { @@ -61,6 +63,15 @@ object ClassLoaderUtils extends Logger { Thread.currentThread.setContextClassLoader(originalClassLoader) } } + @throws[IOException] + def cloneClassLoader(): ClassLoader = { + val urls = originalClassLoader.getResources(".") + val buffer = ArrayBuffer[URL]() + while (urls.hasMoreElements) { + buffer += urls.nextElement() + } + new URLClassLoader(buffer.toArray[URL], originalClassLoader) + } def loadJar(jarFilePath: String): Unit = { val jarFile = new File(jarFilePath) diff --git a/streampark-console/streampark-console-service/pom.xml b/streampark-console/streampark-console-service/pom.xml index f88f57aa31..4202d7c968 100644 --- a/streampark-console/streampark-console-service/pom.xml +++ b/streampark-console/streampark-console-service/pom.xml @@ -380,6 +380,7 @@ ${project.version} + com.fasterxml.jackson.module jackson-module-scala_${scala.binary.version} @@ -436,11 +437,17 @@ provided + + org.apache.flink + flink-table-common + ${flink.version} + + dev.zio zio-logging_${scala.binary.version} - + dev.zio zio-streams_${scala.binary.version} diff --git a/streampark-console/streampark-console-service/src/main/assembly/script/schema/mysql-schema.sql b/streampark-console/streampark-console-service/src/main/assembly/script/schema/mysql-schema.sql index 7783e6c66f..52b885c7d9 100644 --- a/streampark-console/streampark-console-service/src/main/assembly/script/schema/mysql-schema.sql +++ b/streampark-console/streampark-console-service/src/main/assembly/script/schema/mysql-schema.sql @@ -531,4 +531,27 @@ create table `t_yarn_queue` ( unique key `unq_team_id_queue_label` (`team_id`, `queue_label`) using btree ) engine = innodb default charset = utf8mb4 collate = utf8mb4_general_ci; + + +-- ---------------------------- +-- Table of t_resource +-- ---------------------------- +drop table if exists `t_resource`; +create table if not exists `t_resource` ( +`id` bigint not null auto_increment primary key, +`resource_name` varchar(128) not null comment 'The name of the resource', +`resource_type` int not null comment '0:app 1:common 2:connector 3:format 4:udf', +`resource_path` varchar(255) default null, +`resource` text, +`engine_type` int not null comment 'compute engine type, 0:apache flink 1:apache spark', +`main_class` varchar(255) default null, +`description` text default null comment 'More detailed description of resource', +`creator_id` bigint not null comment 'user id of creator', +`connector_required_options` text default null, +`connector_optional_options` text default null, +`team_id` bigint not null comment 'team id', +`create_time` datetime not null default current_timestamp comment 'create time', +`modify_time` datetime not null default current_timestamp on update current_timestamp comment 'modify time' +); + set foreign_key_checks = 1; diff --git a/streampark-console/streampark-console-service/src/main/assembly/script/upgrade/mysql/2.2.0.sql b/streampark-console/streampark-console-service/src/main/assembly/script/upgrade/mysql/2.2.0.sql index 757ac6cd7d..f428f92163 100644 --- a/streampark-console/streampark-console-service/src/main/assembly/script/upgrade/mysql/2.2.0.sql +++ b/streampark-console/streampark-console-service/src/main/assembly/script/upgrade/mysql/2.2.0.sql @@ -76,14 +76,14 @@ alter table `t_user` modify column `login_type` tinyint default 0 comment 'login -- ---------------------------- drop table if exists `t_flink_gateway`; create table `t_flink_gateway` ( - `id` bigint not null auto_increment, - `gateway_name` varchar(128) collate utf8mb4_general_ci not null comment 'The name of the gateway', - `description` text collate utf8mb4_general_ci default null comment 'More detailed description of resource', - `gateway_type` int not null comment 'The type of the gateway', - `address` varchar(150) default null comment 'url address of gateway endpoint', - `create_time` datetime not null default current_timestamp comment 'create time', - `modify_time` datetime not null default current_timestamp on update current_timestamp comment 'modify time', - primary key (`id`) using btree +`id` bigint not null auto_increment, +`gateway_name` varchar(128) collate utf8mb4_general_ci not null comment 'The name of the gateway', +`description` text collate utf8mb4_general_ci default null comment 'More detailed description of resource', +`gateway_type` int not null comment 'The type of the gateway', +`address` varchar(150) default null comment 'url address of gateway endpoint', +`create_time` datetime not null default current_timestamp comment 'create time', +`modify_time` datetime not null default current_timestamp on update current_timestamp comment 'modify time', +primary key (`id`) using btree ) engine=innodb auto_increment=100000 default charset=utf8mb4 collate=utf8mb4_general_ci; -- menu level 2 diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/domain/RestRequest.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/domain/RestRequest.java index 42fe5084af..7534584040 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/domain/RestRequest.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/domain/RestRequest.java @@ -27,7 +27,7 @@ @Data public class RestRequest implements Serializable { - private static final long serialVersionUID = -4869594085374385813L; + private static final long serialVersionUID = 1L; @Schema(example = "10", required = true) private int pageSize = 10; diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/domain/RestResponse.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/domain/RestResponse.java index f15fdc4102..e5a2610e3e 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/domain/RestResponse.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/domain/RestResponse.java @@ -24,7 +24,7 @@ public class RestResponse extends HashMap { public static final String STATUS_SUCCESS = "success"; public static final String STATUS_FAIL = "error"; - private static final long serialVersionUID = -8713837118340960775L; + private static final long serialVersionUID = 1L; public static RestResponse success(Object data) { RestResponse resp = new RestResponse(); diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/domain/router/RouterMeta.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/domain/router/RouterMeta.java index 5753b6f640..1939e99db1 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/domain/router/RouterMeta.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/domain/router/RouterMeta.java @@ -29,7 +29,7 @@ @JsonInclude(JsonInclude.Include.NON_NULL) public class RouterMeta implements Serializable { - private static final long serialVersionUID = 5499925008927195914L; + private static final long serialVersionUID = 1L; private Boolean closeable; diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/domain/router/VueRouter.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/domain/router/VueRouter.java index 45688413c4..da3dffd041 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/domain/router/VueRouter.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/domain/router/VueRouter.java @@ -30,7 +30,7 @@ @JsonInclude(JsonInclude.Include.NON_NULL) public class VueRouter implements Serializable { - private static final long serialVersionUID = -3327478146308500708L; + private static final long serialVersionUID = 1L; @JsonIgnore private String id; diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/util/CommonUtils.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/util/CommonUtils.java index 5b80ba623e..04b90c6219 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/util/CommonUtils.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/util/CommonUtils.java @@ -47,7 +47,7 @@ public final class CommonUtils implements Serializable { private CommonUtils() {} - private static final long serialVersionUID = 6458428317155311192L; + private static final long serialVersionUID = 1L; private static final String OS = System.getProperty("os.name").toLowerCase(); @@ -199,7 +199,7 @@ public static boolean contains(Iterator iterator, Object element) { if (iterator != null) { while (iterator.hasNext()) { Object candidate = iterator.next(); - if (ObjectUtils.safeEquals(candidate, element)) { + if (ObjectUtils.equals(candidate, element)) { return true; } } @@ -218,7 +218,7 @@ public static boolean contains(Enumeration enumeration, Object element) { if (enumeration != null) { while (enumeration.hasMoreElements()) { Object candidate = enumeration.nextElement(); - if (ObjectUtils.safeEquals(candidate, element)) { + if (ObjectUtils.equals(candidate, element)) { return true; } } diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/util/JacksonUtils.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/util/JacksonUtils.java index 2d99c7bbf5..8fe562759c 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/util/JacksonUtils.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/util/JacksonUtils.java @@ -19,7 +19,6 @@ import org.apache.streampark.common.util.DateUtils; -import com.fasterxml.jackson.annotation.JsonInclude; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.DeserializationFeature; @@ -41,7 +40,6 @@ private JacksonUtils() {} MAPPER.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); MAPPER.configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, false); MAPPER.configure(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS, false); - MAPPER.setSerializationInclusion(JsonInclude.Include.NON_NULL); MAPPER.setDateFormat(new SimpleDateFormat(DateUtils.fullFormat())); } diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/util/ObjectUtils.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/util/ObjectUtils.java index f375dad52e..c16d2a42f1 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/util/ObjectUtils.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/util/ObjectUtils.java @@ -104,7 +104,7 @@ public static boolean containsElement(Object[] array, Object element) { return false; } for (Object arrayEle : array) { - if (safeEquals(arrayEle, element)) { + if (equals(arrayEle, element)) { return true; } } @@ -238,7 +238,7 @@ public static Object[] toObjectArray(Object source) { * @return whether the given objects are equal * @see Arrays#equals */ - public static boolean safeEquals(Object o1, Object o2) { + public static boolean equals(Object o1, Object o2) { if (o1 == null || o2 == null) { return false; } @@ -282,8 +282,8 @@ public static boolean safeEquals(Object o1, Object o2) { return false; } - public static boolean safeTrimEquals(Object o1, Object o2) { - boolean equals = safeEquals(o1, o2); + public static boolean trimEquals(Object o1, Object o2) { + boolean equals = equals(o1, o2); if (!equals) { if (o1 != null && o2 != null) { if (o1 instanceof String && o2 instanceof String) { @@ -294,6 +294,10 @@ public static boolean safeTrimEquals(Object o1, Object o2) { return equals; } + public static boolean trimNoEquals(Object o1, Object o2) { + return !trimEquals(o1, o2); + } + /** * Return as hash code for the given object; typically the value of * {@link Object#hashCode()}. If the object is an array, this method will delegate to any diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/bean/Dependency.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/bean/Dependency.java index 6e15e62f58..413a2299ab 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/bean/Dependency.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/bean/Dependency.java @@ -52,7 +52,7 @@ public boolean isEmpty() { return pom.isEmpty() && jar.isEmpty(); } - public boolean eq(Dependency other) { + public boolean equals(Dependency other) { if (other == null) { return false; } @@ -76,20 +76,20 @@ public boolean eq(Dependency other) { } public DependencyInfo toJarPackDeps() { - List mvnArts = - this.pom.stream() - .map( - pom -> - new Artifact( - pom.getGroupId(), - pom.getArtifactId(), - pom.getVersion(), - pom.getClassifier())) - .collect(Collectors.toList()); + List mvnArts = toArtifact(); List extJars = this.jar.stream() .map(jar -> Workspace.local().APP_UPLOADS() + "/" + jar) .collect(Collectors.toList()); return new DependencyInfo(mvnArts, extJars); } + + public List toArtifact() { + return this.pom.stream() + .map( + pom -> + new Artifact( + pom.getGroupId(), pom.getArtifactId(), pom.getVersion(), pom.getClassifier())) + .collect(Collectors.toList()); + } } diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/bean/FlinkConnectorResource.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/bean/FlinkConnectorResource.java new file mode 100644 index 0000000000..b6dc9db684 --- /dev/null +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/bean/FlinkConnectorResource.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.streampark.console.core.bean; + +import lombok.Data; + +import java.util.Map; + +@Data +public class FlinkConnectorResource { + private String className; + private String factoryIdentifier; + Map requiredOptions; + Map optionalOptions; +} diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/bean/Pom.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/bean/Pom.java index da10a529db..479195915c 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/bean/Pom.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/bean/Pom.java @@ -48,10 +48,11 @@ public int hashCode() { @Override public String toString() { - return groupId + ":" + artifactId + ":" + version + getClassifier(":"); - } - - private String getClassifier(String joiner) { - return StringUtils.isEmpty(classifier) ? "" : joiner + classifier; + return String.format( + "%s:%s:%s%s", + groupId, + artifactId, + version, + StringUtils.isEmpty(classifier) ? "" : ":".concat(classifier)); } } diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ApplicationController.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ApplicationController.java index 12486ef8a0..0f9825f3fb 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ApplicationController.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ApplicationController.java @@ -36,6 +36,7 @@ import org.apache.streampark.console.core.service.ApplicationBackUpService; import org.apache.streampark.console.core.service.ApplicationLogService; import org.apache.streampark.console.core.service.ApplicationService; +import org.apache.streampark.console.core.service.ResourceService; import org.apache.streampark.flink.packer.pipeline.PipelineStatus; import org.apache.shiro.authz.annotation.RequiresPermissions; @@ -80,6 +81,8 @@ public class ApplicationController { @Autowired private AppBuildPipeService appBuildPipeService; + @Autowired private ResourceService resourceService; + @Operation(summary = "Get application") @ApiAccess @PostMapping("get") @@ -397,7 +400,7 @@ public RestResponse checkjar(String jar) { @PostMapping("upload") @RequiresPermissions("app:create") public RestResponse upload(MultipartFile file) throws Exception { - String uploadPath = applicationService.upload(file); + String uploadPath = resourceService.upload(file); return RestResponse.success(uploadPath); } diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ResourceController.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ResourceController.java index 27c95b5550..a4608d134a 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ResourceController.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ResourceController.java @@ -36,6 +36,7 @@ import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.RestController; +import org.springframework.web.multipart.MultipartFile; import javax.validation.Valid; @@ -53,11 +54,17 @@ public class ResourceController { @Operation(summary = "add resource") @PostMapping("add") @RequiresPermissions("resource:add") - public RestResponse addResource(@Valid Resource resource) { + public RestResponse addResource(@Valid Resource resource) throws Exception { this.resourceService.addResource(resource); return RestResponse.success(); } + @Operation(summary = "check resource") + @PostMapping("check") + public RestResponse checkResource(@Valid Resource resource) throws Exception { + return this.resourceService.checkResource(resource); + } + @Operation(summary = "List resources") @PostMapping("page") public RestResponse page(RestRequest restRequest, Resource resource) { @@ -87,4 +94,12 @@ public RestResponse listResource(@RequestParam Long teamId) { List resourceList = resourceService.findByTeamId(teamId); return RestResponse.success(resourceList); } + + @Operation(summary = "Upload the resource jar") + @PostMapping("upload") + @RequiresPermissions("resource:add") + public RestResponse upload(MultipartFile file) throws Exception { + String uploadPath = resourceService.upload(file); + return RestResponse.success(uploadPath); + } } diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java index 4fc0cf1475..7bbeffccbf 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java @@ -27,7 +27,6 @@ import org.apache.streampark.common.enums.StorageType; import org.apache.streampark.common.fs.FsOperator; import org.apache.streampark.console.base.util.JacksonUtils; -import org.apache.streampark.console.base.util.ObjectUtils; import org.apache.streampark.console.core.bean.AppControl; import org.apache.streampark.console.core.bean.Dependency; import org.apache.streampark.console.core.enums.FlinkAppState; @@ -252,14 +251,6 @@ public String getIngressTemplate() { return ingressTemplate; } - public void setIngressTemplate(String ingressTemplate) { - this.ingressTemplate = ingressTemplate; - } - - public String getDefaultModeIngress() { - return defaultModeIngress; - } - public void setDefaultModeIngress(String defaultModeIngress) { this.defaultModeIngress = defaultModeIngress; } @@ -357,11 +348,6 @@ public DevelopmentMode getDevelopmentMode() { return DevelopmentMode.of(jobType); } - @JsonIgnore - public void setDevelopmentMode(DevelopmentMode mode) { - this.jobType = mode.getValue(); - } - @JsonIgnore public FlinkAppState getFlinkAppStateEnum() { return FlinkAppState.of(state); @@ -386,7 +372,7 @@ public boolean cpFailedTrigger() { public boolean eqFlinkJob(Application other) { if (this.isFlinkSqlJob() && other.isFlinkSqlJob()) { if (this.getFlinkSql().trim().equals(other.getFlinkSql().trim())) { - return this.getDependencyObject().eq(other.getDependencyObject()); + return this.getDependencyObject().equals(other.getDependencyObject()); } } return false; @@ -509,75 +495,6 @@ public boolean isNeedRestartOnFailed() { return false; } - /** - * Parameter comparison, mainly to compare whether the parameters related to Flink runtime have - * changed - */ - public boolean eqJobParam(Application other) { - // 1) Resolve Order has it changed - // 2) flink Version has it changed - // 3) Execution Mode has it changed - // 4) Parallelism has it changed - // 5) Task Slots has it changed - // 6) Options has it changed - // 7) properties has it changed - // 8) Program Args has it changed - // 9) Flink Version has it changed - - if (!ObjectUtils.safeEquals(this.getVersionId(), other.getVersionId())) { - return false; - } - - if (!ObjectUtils.safeEquals(this.getResolveOrder(), other.getResolveOrder()) - || !ObjectUtils.safeEquals(this.getExecutionMode(), other.getExecutionMode()) - || !ObjectUtils.safeEquals(this.getK8sRestExposedType(), other.getK8sRestExposedType())) { - return false; - } - - if (this.getOptions() != null) { - if (other.getOptions() != null) { - if (!this.getOptions().trim().equals(other.getOptions().trim())) { - Map optMap = this.getOptionMap(); - Map otherMap = other.getOptionMap(); - if (optMap.size() != otherMap.size()) { - return false; - } - for (Map.Entry entry : optMap.entrySet()) { - if (!entry.getValue().equals(otherMap.get(entry.getKey()))) { - return false; - } - } - } - } else { - return false; - } - } else if (other.getOptions() != null) { - return false; - } - - if (this.getDynamicProperties() != null) { - if (other.getDynamicProperties() != null) { - if (!this.getDynamicProperties().trim().equals(other.getDynamicProperties().trim())) { - return false; - } - } else { - return false; - } - } else if (other.getDynamicProperties() != null) { - return false; - } - - if (this.getArgs() != null) { - if (other.getArgs() != null) { - return this.getArgs().trim().equals(other.getArgs().trim()); - } else { - return false; - } - } else { - return other.getArgs() == null; - } - } - @JsonIgnore public StorageType getStorageType() { return getStorageType(getExecutionMode()); diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkSql.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkSql.java index cb3fc4f97b..60fc07dbdf 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkSql.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkSql.java @@ -18,7 +18,6 @@ package org.apache.streampark.console.core.entity; import org.apache.streampark.common.util.DeflaterUtils; -import org.apache.streampark.console.base.util.ObjectUtils; import org.apache.streampark.console.core.bean.Dependency; import org.apache.streampark.console.core.enums.ChangedType; @@ -30,6 +29,7 @@ import java.util.Base64; import java.util.Date; +import java.util.Objects; @Data @TableName("t_flink_sql") @@ -89,10 +89,10 @@ public ChangedType checkChange(FlinkSql target) { // 2) determine if dependency has changed Dependency thisDependency = Dependency.toDependency(this.getDependency()); Dependency targetDependency = Dependency.toDependency(target.getDependency()); - boolean depDifference = !thisDependency.eq(targetDependency); + boolean depDifference = !thisDependency.equals(targetDependency); // 3) determine if team resource has changed - boolean teamResDifference = - !ObjectUtils.safeEquals(this.teamResource, target.getTeamResource()); + boolean teamResDifference = !Objects.equals(this.teamResource, target.getTeamResource()); + if (sqlDifference && depDifference && teamResDifference) { return ChangedType.ALL; } diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Resource.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Resource.java index ac5eea6a5d..22a1fb3e09 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Resource.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Resource.java @@ -35,13 +35,17 @@ @TableName("t_resource") public class Resource implements Serializable { - private static final long serialVersionUID = -7720746591258904369L; + private static final long serialVersionUID = 1L; @TableId(type = IdType.AUTO) private Long id; + // resourceName unique private String resourceName; + // resource path + private String resourcePath; + private String resource; @Size(max = 100, message = "{noMoreThan}") @@ -54,8 +58,15 @@ public class Resource implements Serializable { private EngineType engineType; + // for flink app private String mainClass; + // for flink connector + private String connectorRequiredOptions; + + // for flink connector + private String connectorOptionalOptions; + /** user name of creator */ private transient String creatorName; @@ -69,4 +80,6 @@ public class Resource implements Serializable { private transient String sortField; private transient String sortOrder; + + private transient String connector; } diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Variable.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Variable.java index 75571f53a3..458c74d3db 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Variable.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Variable.java @@ -35,7 +35,7 @@ @TableName("t_variable") public class Variable implements Serializable { - private static final long serialVersionUID = -7720746591258904369L; + private static final long serialVersionUID = 1L; @TableId(type = IdType.AUTO) private Long id; diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/ApplicationService.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/ApplicationService.java index cba588c659..8e51c155f6 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/ApplicationService.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/ApplicationService.java @@ -25,7 +25,6 @@ import com.baomidou.mybatisplus.core.metadata.IPage; import com.baomidou.mybatisplus.extension.service.IService; -import org.springframework.web.multipart.MultipartFile; import java.io.IOException; import java.io.Serializable; @@ -75,8 +74,6 @@ public interface ApplicationService extends IService { Map dashboard(Long teamId); - String upload(MultipartFile file) throws Exception; - /** set the latest to Effective, it will really become the current effective */ void toEffective(Application application); diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/ResourceService.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/ResourceService.java index d59df4a59e..4a47819117 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/ResourceService.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/ResourceService.java @@ -18,11 +18,14 @@ package org.apache.streampark.console.core.service; import org.apache.streampark.console.base.domain.RestRequest; +import org.apache.streampark.console.base.domain.RestResponse; import org.apache.streampark.console.core.entity.Resource; import com.baomidou.mybatisplus.core.metadata.IPage; import com.baomidou.mybatisplus.extension.service.IService; +import org.springframework.web.multipart.MultipartFile; +import java.io.IOException; import java.util.List; public interface ResourceService extends IService { @@ -49,7 +52,7 @@ public interface ResourceService extends IService { * * @param resource resource */ - void addResource(Resource resource); + void addResource(Resource resource) throws Exception; /** * @param teamId team id @@ -87,4 +90,8 @@ public interface ResourceService extends IService { * @param targetUserId target user id */ void changeOwnership(Long userId, Long targetUserId); + + String upload(MultipartFile file) throws IOException; + + RestResponse checkResource(Resource resource) throws Exception; } diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java index 2a38308cbe..b7c37eee6d 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java @@ -108,6 +108,8 @@ import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; +import static org.apache.streampark.console.core.enums.Operation.RELEASE; + @Service @Slf4j @Transactional(propagation = Propagation.SUPPORTS, rollbackFor = Exception.class) @@ -170,8 +172,7 @@ public boolean buildApplication(Long appId, boolean forceBuild) { Application app = applicationService.getById(appId); ApplicationLog applicationLog = new ApplicationLog(); - applicationLog.setOptionName( - org.apache.streampark.console.core.enums.Operation.RELEASE.getValue()); + applicationLog.setOptionName(RELEASE.getValue()); applicationLog.setAppId(app.getId()); applicationLog.setOptionTime(new Date()); diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java index 7c9140dc1c..ee2d996d50 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java @@ -96,7 +96,6 @@ import org.apache.streampark.flink.packer.pipeline.ShadedBuildResponse; import org.apache.commons.io.FileUtils; -import org.apache.commons.io.FilenameUtils; import org.apache.commons.lang3.StringUtils; import org.apache.flink.api.common.JobID; import org.apache.flink.configuration.CoreOptions; @@ -120,7 +119,6 @@ import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Propagation; import org.springframework.transaction.annotation.Transactional; -import org.springframework.web.multipart.MultipartFile; import javax.annotation.Nonnull; import javax.annotation.PostConstruct; @@ -307,24 +305,6 @@ public Map dashboard(Long teamId) { return map; } - @Override - public String upload(MultipartFile file) throws Exception { - File temp = WebUtils.getAppTempDir(); - String fileName = FilenameUtils.getName(Objects.requireNonNull(file.getOriginalFilename())); - File saveFile = new File(temp, fileName); - // delete when exists - if (saveFile.exists()) { - saveFile.delete(); - } - // save file to temp dir - try { - file.transferTo(saveFile); - } catch (Exception e) { - throw new ApiDetailException(e); - } - return saveFile.getAbsolutePath(); - } - @Override public void toEffective(Application application) { // set latest to Effective @@ -861,57 +841,60 @@ public boolean update(Application appParam) { String.format(ERROR_APP_QUEUE_HINT, appParam.getYarnQueue(), appParam.getTeamId())); application.setRelease(ReleaseState.NEED_RELEASE.get()); + + // 1) jar job jar file changed if (application.isUploadJob()) { - if (!ObjectUtils.safeEquals(application.getJar(), appParam.getJar())) { + if (!Objects.equals(application.getJar(), appParam.getJar())) { application.setBuild(true); } else { File jarFile = new File(WebUtils.getAppTempDir(), appParam.getJar()); if (jarFile.exists()) { - long checkSum = 0; try { - checkSum = FileUtils.checksumCRC32(jarFile); + long checkSum = FileUtils.checksumCRC32(jarFile); + if (!Objects.equals(checkSum, application.getJarCheckSum())) { + application.setBuild(true); + } } catch (IOException e) { log.error("Error in checksumCRC32 for {}.", jarFile); throw new RuntimeException(e); } - if (!ObjectUtils.safeEquals(checkSum, application.getJarCheckSum())) { - application.setBuild(true); - } - } - } - } - - if (!application.getBuild()) { - if (!application.getExecutionMode().equals(appParam.getExecutionMode())) { - if (appParam.getExecutionModeEnum().equals(ExecutionMode.YARN_APPLICATION) - || application.getExecutionModeEnum().equals(ExecutionMode.YARN_APPLICATION)) { - application.setBuild(true); } } } - if (ExecutionMode.isKubernetesMode(appParam.getExecutionMode())) { - if (!ObjectUtils.safeTrimEquals( + // 2) k8s podTemplate changed.. + if (application.getBuild() && ExecutionMode.isKubernetesMode(appParam.getExecutionMode())) { + if (ObjectUtils.trimNoEquals( application.getK8sRestExposedType(), appParam.getK8sRestExposedType()) - || !ObjectUtils.safeTrimEquals( + || ObjectUtils.trimNoEquals( application.getK8sJmPodTemplate(), appParam.getK8sJmPodTemplate()) - || !ObjectUtils.safeTrimEquals( + || ObjectUtils.trimNoEquals( application.getK8sTmPodTemplate(), appParam.getK8sTmPodTemplate()) - || !ObjectUtils.safeTrimEquals( + || ObjectUtils.trimNoEquals( application.getK8sPodTemplates(), appParam.getK8sPodTemplates()) - || !ObjectUtils.safeTrimEquals( + || ObjectUtils.trimNoEquals( application.getK8sHadoopIntegration(), appParam.getK8sHadoopIntegration()) - || !ObjectUtils.safeTrimEquals(application.getFlinkImage(), appParam.getFlinkImage())) { + || ObjectUtils.trimNoEquals(application.getFlinkImage(), appParam.getFlinkImage())) { application.setBuild(true); } } - // when flink version has changed, we should rebuild the application. Otherwise, the shims jar - // may be not suitable for the new flink version. - if (!ObjectUtils.safeEquals(application.getVersionId(), appParam.getVersionId())) { + // 3) flink version changed + if (!application.getBuild() + && !Objects.equals(application.getVersionId(), appParam.getVersionId())) { application.setBuild(true); } + // 4) yarn application mode change + if (!application.getBuild()) { + if (!application.getExecutionMode().equals(appParam.getExecutionMode())) { + if (appParam.getExecutionModeEnum().equals(ExecutionMode.YARN_APPLICATION) + || application.getExecutionModeEnum().equals(ExecutionMode.YARN_APPLICATION)) { + application.setBuild(true); + } + } + } + appParam.setJobType(application.getJobType()); // changes to the following parameters need to be re-release to take effect application.setJobName(appParam.getJobName()); @@ -959,15 +942,16 @@ public boolean update(Application appParam) { // Flink Sql job... if (application.isFlinkSqlJob()) { updateFlinkSqlJob(application, appParam); + return true; + } + + if (application.isStreamParkJob()) { + configService.update(appParam, application.isRunning()); } else { - if (application.isStreamParkJob()) { - configService.update(appParam, application.isRunning()); - } else { - application.setJar(appParam.getJar()); - application.setMainClass(appParam.getMainClass()); - } + application.setJar(appParam.getJar()); + application.setMainClass(appParam.getMainClass()); } - baseMapper.updateById(application); + this.updateById(application); return true; } @@ -1039,6 +1023,7 @@ private void updateFlinkSqlJob(Application application, Application appParam) { } } } + this.updateById(application); this.configService.update(appParam, application.isRunning()); } diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ResourceServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ResourceServiceImpl.java index f702ad0042..de3ec905c5 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ResourceServiceImpl.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ResourceServiceImpl.java @@ -19,11 +19,16 @@ import org.apache.streampark.common.conf.Workspace; import org.apache.streampark.common.fs.FsOperator; +import org.apache.streampark.common.util.Utils; import org.apache.streampark.console.base.domain.RestRequest; +import org.apache.streampark.console.base.domain.RestResponse; import org.apache.streampark.console.base.exception.ApiAlertException; +import org.apache.streampark.console.base.exception.ApiDetailException; import org.apache.streampark.console.base.mybatis.pager.MybatisPager; +import org.apache.streampark.console.base.util.JacksonUtils; import org.apache.streampark.console.base.util.WebUtils; import org.apache.streampark.console.core.bean.Dependency; +import org.apache.streampark.console.core.bean.FlinkConnectorResource; import org.apache.streampark.console.core.bean.Pom; import org.apache.streampark.console.core.entity.Application; import org.apache.streampark.console.core.entity.FlinkSql; @@ -34,25 +39,47 @@ import org.apache.streampark.console.core.service.CommonService; import org.apache.streampark.console.core.service.FlinkSqlService; import org.apache.streampark.console.core.service.ResourceService; +import org.apache.streampark.flink.packer.maven.Artifact; +import org.apache.streampark.flink.packer.maven.MavenTool; import org.apache.commons.collections.CollectionUtils; -import org.apache.commons.lang3.StringUtils; +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.table.factories.Factory; +import org.apache.hadoop.shaded.org.apache.commons.codec.digest.DigestUtils; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper; import com.baomidou.mybatisplus.core.metadata.IPage; import com.baomidou.mybatisplus.extension.plugins.pagination.Page; import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; +import com.fasterxml.jackson.core.JsonProcessingException; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Propagation; import org.springframework.transaction.annotation.Transactional; +import org.springframework.web.multipart.MultipartFile; import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.io.Serializable; +import java.net.MalformedURLException; +import java.net.URL; +import java.net.URLClassLoader; +import java.time.Duration; import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Optional; +import java.util.Scanner; +import java.util.ServiceLoader; +import java.util.jar.JarEntry; +import java.util.jar.JarFile; +import java.util.jar.Manifest; import java.util.stream.Collectors; @Slf4j @@ -65,6 +92,8 @@ public class ResourceServiceImpl extends ServiceImpl @Autowired private CommonService commonService; @Autowired private FlinkSqlService flinkSqlService; + public ResourceServiceImpl() {} + @Override public IPage page(Resource resource, RestRequest restRequest) { if (resource.getTeamId() == null) { @@ -86,50 +115,53 @@ public boolean existsByUserId(Long userId) { } @Override - public void addResource(Resource resource) { + public void addResource(Resource resource) throws Exception { String resourceStr = resource.getResource(); ApiAlertException.throwIfNull(resourceStr, "Please add pom or jar resource."); - if (resource.getResourceType() == ResourceType.GROUP) { - ApiAlertException.throwIfNull( - resource.getResourceName(), "The name of resource group is required."); + // check + Dependency dependency = Dependency.toDependency(resourceStr); + List jars = dependency.getJar(); + List poms = dependency.getPom(); + + ApiAlertException.throwIfTrue( + jars.isEmpty() && poms.isEmpty(), "Please add pom or jar resource."); + + ApiAlertException.throwIfTrue( + resource.getResourceType() == ResourceType.FLINK_APP && jars.isEmpty(), + "Please upload jar for Flink_App resource"); + + ApiAlertException.throwIfTrue( + jars.size() + poms.size() > 1, "Please do not add multi dependency at one time."); + + if (resource.getResourceType() != ResourceType.CONNECTOR) { + ApiAlertException.throwIfNull(resource.getResourceName(), "The resourceName is required."); } else { - Dependency dependency = Dependency.toDependency(resourceStr); - List jars = dependency.getJar(); - List poms = dependency.getPom(); - - ApiAlertException.throwIfTrue( - jars.isEmpty() && poms.isEmpty(), "Please add pom or jar resource."); - ApiAlertException.throwIfTrue( - jars.size() + poms.size() > 1, "Please do not add multi dependency at one time."); - ApiAlertException.throwIfTrue( - resource.getResourceType() == ResourceType.FLINK_APP && jars.isEmpty(), - "Please upload jar for Flink_App resource"); - - Long teamId = resource.getTeamId(); - String resourceName = null; - - if (poms.isEmpty()) { - resourceName = jars.get(0); - ApiAlertException.throwIfTrue( - this.findByResourceName(teamId, resourceName) != null, - String.format("Sorry, the resource %s already exists.", resourceName)); - - // copy jar to team upload directory - transferTeamResource(teamId, resourceName); - } else { - Pom pom = poms.get(0); - resourceName = - String.format("%s:%s:%s", pom.getGroupId(), pom.getArtifactId(), pom.getVersion()); - if (StringUtils.isNotBlank(pom.getClassifier())) { - resourceName = resourceName + ":" + pom.getClassifier(); - } - ApiAlertException.throwIfTrue( - this.findByResourceName(teamId, resourceName) != null, - String.format("Sorry, the resource %s already exists.", resourceName)); + String connector = resource.getConnector(); + ApiAlertException.throwIfTrue(connector == null, "the flink connector is null."); + FlinkConnectorResource connectorResource = + JacksonUtils.read(connector, FlinkConnectorResource.class); + resource.setResourceName(connectorResource.getFactoryIdentifier()); + if (connectorResource.getRequiredOptions() != null) { + resource.setConnectorRequiredOptions( + JacksonUtils.write(connectorResource.getRequiredOptions())); } + if (connectorResource.getOptionalOptions() != null) { + resource.setConnectorOptionalOptions( + JacksonUtils.write(connectorResource.getOptionalOptions())); + } + } - resource.setResourceName(resourceName); + ApiAlertException.throwIfTrue( + this.findByResourceName(resource.getTeamId(), resource.getResourceName()) != null, + String.format("Sorry, the resource %s already exists.", resource.getResourceName())); + + if (!jars.isEmpty()) { + String resourcePath = jars.get(0); + resource.setResourcePath(resourcePath); + // copy jar to team upload directory + String upFile = resourcePath.split(":")[1]; + transferTeamResource(resource.getTeamId(), upFile); } resource.setCreatorId(commonService.getUserId()); @@ -155,7 +187,12 @@ public void updateResource(Resource resource) { ApiAlertException.throwIfFalse( resourceName.equals(findResource.getResourceName()), "Please make sure the resource name is not changed."); - transferTeamResource(findResource.getTeamId(), resourceName); + + Dependency dependency = Dependency.toDependency(resource.getResource()); + if (!dependency.getJar().isEmpty()) { + String jarFile = dependency.getJar().get(0).split(":")[1]; + transferTeamResource(findResource.getTeamId(), jarFile); + } } findResource.setDescription(resource.getDescription()); @@ -199,15 +236,234 @@ public void changeOwnership(Long userId, Long targetUserId) { this.baseMapper.update(null, updateWrapper); } - private void transferTeamResource(Long teamId, String resourceName) { + /** + * @param file + * @return + */ + @Override + public String upload(MultipartFile file) throws IOException { + File temp = WebUtils.getAppTempDir(); + + String name = file.getOriginalFilename(); + String suffix = name.substring(name.lastIndexOf(".")); + + String sha256Hex = DigestUtils.sha256Hex(file.getInputStream()); + String fileName = sha256Hex.concat(suffix); + + File saveFile = new File(temp, fileName); + + if (!saveFile.exists()) { + // save file to temp dir + try { + file.transferTo(saveFile); + } catch (Exception e) { + throw new ApiDetailException(e); + } + } + + return saveFile.getAbsolutePath(); + } + + @Override + public RestResponse checkResource(Resource resourceParam) throws JsonProcessingException { + ResourceType type = resourceParam.getResourceType(); + Map resp = new HashMap<>(0); + resp.put("state", 0); + switch (type) { + case FLINK_APP: + // check main. + File jarFile; + try { + jarFile = getResourceJar(resourceParam); + } catch (Exception e) { + // get jarFile error + resp.put("state", 1); + resp.put("exception", Utils.stringifyException(e)); + return RestResponse.success().data(resp); + } + Manifest manifest = Utils.getJarManifest(jarFile); + String mainClass = manifest.getMainAttributes().getValue("Main-Class"); + + if (mainClass == null) { + // main class is null + resp.put("state", 2); + return RestResponse.success().data(resp); + } + return RestResponse.success().data(resp); + case CONNECTOR: + // 1) get connector id + FlinkConnectorResource connectorResource; + + ApiAlertException.throwIfFalse( + ResourceType.CONNECTOR.equals(resourceParam.getResourceType()), + "getConnectorId method error, resource not flink connector."); + + List jars; + File connector = null; + List factories; + + Dependency dependency = Dependency.toDependency(resourceParam.getResource()); + + // 1) get connector jar + if (!dependency.getPom().isEmpty()) { + Artifact artifact = dependency.toArtifact().get(0); + try { + jars = MavenTool.resolveArtifacts(artifact); + } catch (Exception e) { + // connector download is null + resp.put("state", 1); + resp.put("exception", Utils.stringifyException(e)); + return RestResponse.success().data(resp); + } + String fileName = String.format("%s-%s.jar", artifact.artifactId(), artifact.version()); + Optional file = jars.stream().filter(x -> x.getName().equals(fileName)).findFirst(); + if (file.isPresent()) { + connector = file.get(); + } + } else { + // 2) jar + String jar = dependency.getJar().get(0).split(":")[1]; + File file = new File(jar); + connector = file; + jars = Collections.singletonList(file); + } + + // 2) parse connector Factory + try { + factories = getConnectorFactory(connector); + } catch (Exception e) { + // flink connector invalid + resp.put("state", 2); + resp.put("exception", Utils.stringifyException(e)); + return RestResponse.success().data(resp); + } + + // 3) get connector resource + connectorResource = getConnectorResource(jars, factories); + if (connectorResource == null) { + // connector is null + resp.put("state", 3); + return RestResponse.success().data(resp); + } + + // 2) check connector exists + boolean exists = + existsFlinkConnector(resourceParam.getId(), connectorResource.getFactoryIdentifier()); + if (exists) { + resp.put("state", 4); + resp.put("name", connectorResource.getFactoryIdentifier()); + return RestResponse.success(resp); + } + + if (resourceParam.getId() != null) { + Resource resource = getById(resourceParam.getId()); + if (!resource.getResourceName().equals(connectorResource.getFactoryIdentifier())) { + resp.put("state", 5); + return RestResponse.success().data(resp); + } + } + resp.put("state", 0); + resp.put("connector", JacksonUtils.write(connectorResource)); + return RestResponse.success().data(resp); + } + return RestResponse.success().data(resp); + } + + private boolean existsFlinkConnector(Long id, String connectorId) { + LambdaQueryWrapper lambdaQueryWrapper = + new LambdaQueryWrapper().eq(Resource::getResourceName, connectorId); + if (id != null) { + lambdaQueryWrapper.ne(Resource::getId, id); + } + return getBaseMapper().exists(lambdaQueryWrapper); + } + + private FlinkConnectorResource getConnectorResource(List jars, List factories) { + Class className = Factory.class; + URL[] array = + jars.stream() + .map( + x -> { + try { + return x.toURI().toURL(); + } catch (MalformedURLException e) { + throw new RuntimeException(e); + } + }) + .toArray(URL[]::new); + + try (URLClassLoader urlClassLoader = URLClassLoader.newInstance(array)) { + ServiceLoader serviceLoader = ServiceLoader.load(className, urlClassLoader); + for (Factory factory : serviceLoader) { + String factoryClassName = factory.getClass().getName(); + if (factories.contains(factoryClassName)) { + FlinkConnectorResource connectorResource = new FlinkConnectorResource(); + try { + connectorResource.setClassName(factoryClassName); + connectorResource.setFactoryIdentifier(factory.factoryIdentifier()); + } catch (Exception ignored) { + } + + try { + Map requiredOptions = new HashMap<>(0); + factory + .requiredOptions() + .forEach(x -> requiredOptions.put(x.key(), getOptionDefaultValue(x))); + connectorResource.setRequiredOptions(requiredOptions); + } catch (Exception ignored) { + + } + + try { + Map optionalOptions = new HashMap<>(0); + factory + .optionalOptions() + .forEach(x -> optionalOptions.put(x.key(), getOptionDefaultValue(x))); + connectorResource.setOptionalOptions(optionalOptions); + } catch (Exception ignored) { + } + return connectorResource; + } + } + return null; + } catch (Exception e) { + log.error("getConnectorResource failed. " + e); + } + return null; + } + + private File getResourceJar(Resource resource) throws Exception { + Dependency dependency = Dependency.toDependency(resource.getResource()); + if (dependency.isEmpty()) { + return null; + } + if (!dependency.getJar().isEmpty()) { + String jar = dependency.getJar().get(0).split(":")[1]; + return new File(jar); + } else { + Artifact artifact = dependency.toArtifact().get(0); + List files = MavenTool.resolveArtifacts(artifact); + if (!files.isEmpty()) { + String fileName = String.format("%s-%s.jar", artifact.artifactId(), artifact.version()); + Optional jarFile = + files.stream().filter(x -> x.getName().equals(fileName)).findFirst(); + if (jarFile.isPresent()) { + return jarFile.get(); + } + } + return null; + } + } + + private void transferTeamResource(Long teamId, String resourcePath) { String teamUploads = String.format("%s/%d", Workspace.local().APP_UPLOADS(), teamId); if (!FsOperator.lfs().exists(teamUploads)) { FsOperator.lfs().mkdirs(teamUploads); } - File localJar = new File(WebUtils.getAppTempDir(), resourceName); - File teamUploadJar = new File(teamUploads, resourceName); + File localJar = new File(resourcePath); + File teamUploadJar = new File(teamUploads, localJar.getName()); ApiAlertException.throwIfFalse( - localJar.exists(), "Missing file: " + resourceName + ", please upload again"); + localJar.exists(), "Missing file: " + resourcePath + ", please upload again"); FsOperator.lfs() .upload(localJar.getAbsolutePath(), teamUploadJar.getAbsolutePath(), false, true); } @@ -246,4 +502,36 @@ private List getResourceApplicationsById(Resource resource) { return dependApplications; } + + private List getConnectorFactory(File connector) throws Exception { + String configFile = "META-INF/services/org.apache.flink.table.factories.Factory"; + JarFile jarFile = new JarFile(connector); + JarEntry entry = jarFile.getJarEntry(configFile); + if (entry == null) { + throw new IllegalArgumentException("invalid flink connector"); + } + List factories = new ArrayList<>(0); + try (InputStream inputStream = jarFile.getInputStream(entry)) { + Scanner scanner = new Scanner(new InputStreamReader(inputStream)); + while (scanner.hasNextLine()) { + String line = scanner.nextLine().trim(); + if (line.length() > 0 && !line.startsWith("#")) { + factories.add(line); + } + } + scanner.close(); + } + return factories; + } + + private String getOptionDefaultValue(ConfigOption option) { + if (!option.hasDefaultValue()) { + return null; + } + Object value = option.defaultValue(); + if (value instanceof Duration) { + return value.toString().replace("PT", "").toLowerCase(); + } + return value.toString(); + } } diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/authentication/JWTToken.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/authentication/JWTToken.java index b035253f23..f04adad00f 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/authentication/JWTToken.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/authentication/JWTToken.java @@ -25,7 +25,7 @@ @Data public class JWTToken implements AuthenticationToken { - private static final long serialVersionUID = 1282057025599826155L; + private static final long serialVersionUID = 1L; private String token; diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/entity/AccessToken.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/entity/AccessToken.java index cc89f609dc..901b61153d 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/entity/AccessToken.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/entity/AccessToken.java @@ -32,7 +32,7 @@ @TableName("t_access_token") public class AccessToken implements Serializable { - private static final long serialVersionUID = 7187628714679791772L; + private static final long serialVersionUID = 1L; public static final String DEFAULT_EXPIRE_TIME = "9999-01-01 00:00:00"; public static final String IS_API_TOKEN = "is_api_token"; diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/entity/Member.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/entity/Member.java index d3e7e029b3..6c85a3b968 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/entity/Member.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/entity/Member.java @@ -29,7 +29,7 @@ @Data public class Member implements Serializable { - private static final long serialVersionUID = -3166012934498268403L; + private static final long serialVersionUID = 1L; @TableId(type = IdType.AUTO) private Long id; diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/entity/Menu.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/entity/Menu.java index 61533daa2f..17d9e914eb 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/entity/Menu.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/entity/Menu.java @@ -32,7 +32,7 @@ @TableName("t_menu") public class Menu implements Serializable { - private static final long serialVersionUID = 7187628714679791771L; + private static final long serialVersionUID = 1L; public static final String TYPE_MENU = "0"; diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/entity/Role.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/entity/Role.java index 16d853b305..ed412d340f 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/entity/Role.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/entity/Role.java @@ -32,7 +32,7 @@ @TableName("t_role") public class Role implements Serializable { - private static final long serialVersionUID = -1714476694755654924L; + private static final long serialVersionUID = 1L; @TableId(type = IdType.AUTO) private Long roleId; diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/entity/RoleMenu.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/entity/RoleMenu.java index e2f28c6b2c..5658e9c2a5 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/entity/RoleMenu.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/entity/RoleMenu.java @@ -28,7 +28,7 @@ @Data public class RoleMenu implements Serializable { - private static final long serialVersionUID = -7573904024872252113L; + private static final long serialVersionUID = 1L; @TableId(type = IdType.AUTO) private Long id; diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/entity/SysLog.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/entity/SysLog.java index cdfd515723..cdf029eda3 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/entity/SysLog.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/entity/SysLog.java @@ -29,7 +29,7 @@ @TableName("t_log") public class SysLog implements Serializable { - private static final long serialVersionUID = -8878596941954995444L; + private static final long serialVersionUID = 1L; @TableId(type = IdType.AUTO) private Long id; diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/entity/Team.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/entity/Team.java index 415fb43e01..8d7bca18ef 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/entity/Team.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/entity/Team.java @@ -31,7 +31,7 @@ @TableName("t_team") public class Team implements Serializable { - private static final long serialVersionUID = -1714476694755654924L; + private static final long serialVersionUID = 1L; @TableId(type = IdType.AUTO) private Long id; diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/entity/User.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/entity/User.java index 4b3fc33750..7a328a1f9d 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/entity/User.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/entity/User.java @@ -37,7 +37,7 @@ @TableName("t_user") public class User implements Serializable { - private static final long serialVersionUID = -4852732617765810959L; + private static final long serialVersionUID = 1L; /** user status */ public static final String STATUS_VALID = "1"; diff --git a/streampark-console/streampark-console-service/src/main/resources/db/schema-h2.sql b/streampark-console/streampark-console-service/src/main/resources/db/schema-h2.sql index fcb05c25e8..16b6503f19 100644 --- a/streampark-console/streampark-console-service/src/main/resources/db/schema-h2.sql +++ b/streampark-console/streampark-console-service/src/main/resources/db/schema-h2.sql @@ -220,11 +220,14 @@ create table if not exists `t_resource` ( `id` bigint generated by default as identity not null, `resource_name` varchar(128) not null comment 'The name of the resource file', `resource_type` int not null comment '0:app 1:common 2:connector 3:format 4:udf', + `resource_path` varchar(255) default null, `resource` text , `engine_type` int not null comment 'compute engine type, 0:apache flink 1:apache spark', `main_class` varchar(255) default null, `description` text default null comment 'More detailed description of resource', `creator_id` bigint not null comment 'user id of creator', + `connector_required_options` text default null, + `connector_optional_options` text default null, `team_id` bigint not null comment 'team id', `create_time` datetime not null default current_timestamp comment 'create time', `modify_time` datetime not null default current_timestamp on update current_timestamp comment 'modify time', diff --git a/streampark-console/streampark-console-service/src/main/resources/mapper/core/ResourceMapper.xml b/streampark-console/streampark-console-service/src/main/resources/mapper/core/ResourceMapper.xml index c8020991a2..21228ca977 100644 --- a/streampark-console/streampark-console-service/src/main/resources/mapper/core/ResourceMapper.xml +++ b/streampark-console/streampark-console-service/src/main/resources/mapper/core/ResourceMapper.xml @@ -20,11 +20,14 @@ + + + diff --git a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/base/util/DependencyUtilsTest.java b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/base/util/DependencyUtilsTest.java new file mode 100644 index 0000000000..10dc40ccb4 --- /dev/null +++ b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/base/util/DependencyUtilsTest.java @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.streampark.console.base.util; + +import org.apache.streampark.common.conf.CommonConfig; +import org.apache.streampark.common.conf.InternalConfigHolder; +import org.apache.streampark.console.core.bean.FlinkConnectorResource; +import org.apache.streampark.flink.packer.maven.Artifact; +import org.apache.streampark.flink.packer.maven.MavenTool; + +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.table.factories.Factory; + +import lombok.extern.slf4j.Slf4j; +import org.junit.jupiter.api.Test; + +import java.io.File; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.net.MalformedURLException; +import java.net.URL; +import java.net.URLClassLoader; +import java.time.Duration; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Scanner; +import java.util.ServiceLoader; +import java.util.jar.JarEntry; +import java.util.jar.JarFile; + +@Slf4j +class DependencyUtilsTest { + + @Test + public void resolveFlinkConnector() throws Exception { + + Artifact artifact = new Artifact("com.ververica", "flink-connector-mysql-cdc", "2.4.1", null); + + InternalConfigHolder.set(CommonConfig.STREAMPARK_WORKSPACE_LOCAL(), "~/tmp"); + + List files = MavenTool.resolveArtifacts(artifact); + if (files.isEmpty()) { + return; + } + + String fileName = String.format("%s-%s.jar", artifact.artifactId(), artifact.version()); + Optional jarFile = files.stream().filter(x -> x.getName().equals(fileName)).findFirst(); + File connector = jarFile.get(); + + List factories = getConnectorFactory(connector); + + Class className = Factory.class; + URL[] array = + files.stream() + .map( + x -> { + try { + return x.toURI().toURL(); + } catch (MalformedURLException e) { + throw new RuntimeException(e); + } + }) + .toArray(URL[]::new); + + URLClassLoader urlClassLoader = URLClassLoader.newInstance(array); + ServiceLoader serviceLoader = ServiceLoader.load(className, urlClassLoader); + + List connectorResources = new ArrayList<>(); + try { + for (Factory factory : serviceLoader) { + String factoryClassName = factory.getClass().getName(); + if (factories.contains(factoryClassName)) { + FlinkConnectorResource connectorResource = new FlinkConnectorResource(); + connectorResource.setClassName(factoryClassName); + connectorResource.setFactoryIdentifier(factory.factoryIdentifier()); + Map requiredOptions = new HashMap<>(0); + factory + .requiredOptions() + .forEach(x -> requiredOptions.put(x.key(), getOptionDefaultValue(x))); + connectorResource.setRequiredOptions(requiredOptions); + + Map optionalOptions = new HashMap<>(0); + factory + .optionalOptions() + .forEach(x -> optionalOptions.put(x.key(), getOptionDefaultValue(x))); + connectorResource.setOptionalOptions(optionalOptions); + + connectorResources.add(connectorResource); + } + } + } catch (Throwable e) { + e.printStackTrace(); + } + urlClassLoader.close(); + System.out.println(connectorResources); + } + + private String getOptionDefaultValue(ConfigOption option) { + if (!option.hasDefaultValue()) { + return null; + } + Object value = option.defaultValue(); + if (value instanceof Duration) { + return value.toString().replace("PT", "").toLowerCase(); + } + return value.toString(); + } + + @Test + public void testDuration() { + String s = "PT30H"; + Duration duration = Duration.parse(s); + System.out.println(duration.getSeconds()); + } + + private List getConnectorFactory(File connector) throws Exception { + String configFile = "META-INF/services/org.apache.flink.table.factories.Factory"; + JarFile jarFile = new JarFile(connector); + JarEntry entry = jarFile.getJarEntry(configFile); + List factories = new ArrayList<>(0); + try (InputStream inputStream = jarFile.getInputStream(entry)) { + Scanner scanner = new Scanner(new InputStreamReader(inputStream)); + while (scanner.hasNextLine()) { + String line = scanner.nextLine().trim(); + if (line.length() > 0 && !line.startsWith("#")) { + factories.add(line); + } + } + scanner.close(); + } + return factories; + } +} diff --git a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/ApplicationServiceTest.java b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/ApplicationServiceTest.java index aae093e7f8..ea5c345602 100644 --- a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/ApplicationServiceTest.java +++ b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/ApplicationServiceTest.java @@ -23,22 +23,13 @@ import org.apache.streampark.console.core.entity.YarnQueue; import org.apache.streampark.console.core.service.impl.ApplicationServiceImpl; -import org.apache.hc.core5.http.ContentType; - import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; -import org.h2.store.fs.FileUtils; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.io.TempDir; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.mock.web.MockMultipartFile; -import org.springframework.web.multipart.MultipartFile; -import java.io.File; -import java.io.FileInputStream; -import java.nio.file.Path; import java.util.Date; import static org.assertj.core.api.Assertions.assertThat; @@ -105,26 +96,6 @@ void testStart() throws Exception { applicationService.start(application, false); } - @Test - void testUpload(@TempDir Path tempDir) throws Exception { - // specify the file path - File fileToStoreUploadFile = - new File(tempDir.toFile().getAbsolutePath() + "/fileToStoreUploadFile"); - FileUtils.createFile(fileToStoreUploadFile.getAbsolutePath()); - - File fileToUpload = new File(tempDir.toFile().getAbsolutePath() + "/fileToUpload.jar"); - FileUtils.createFile(fileToUpload.getAbsolutePath()); - assertThat(fileToUpload).exists(); - MultipartFile mulFile = - new MockMultipartFile( - "test", // fileName (eg: streampark.jar) - fileToUpload.getAbsolutePath(), // originalFilename (eg: path + fileName = - // /tmp/file/streampark.jar) - ContentType.APPLICATION_OCTET_STREAM.toString(), - new FileInputStream(fileToStoreUploadFile)); - applicationService.upload(mulFile); - } - @Test void testCheckQueueValidationIfNeeded() { ApplicationServiceImpl applicationServiceImpl = (ApplicationServiceImpl) applicationService; diff --git a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/ResourceServiceTest.java b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/ResourceServiceTest.java new file mode 100644 index 0000000000..e6c00ec717 --- /dev/null +++ b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/ResourceServiceTest.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.streampark.console.core.service; + +import org.apache.streampark.console.SpringTestBase; + +import org.apache.hc.core5.http.ContentType; + +import org.h2.store.fs.FileUtils; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.mock.web.MockMultipartFile; +import org.springframework.web.multipart.MultipartFile; + +import java.io.File; +import java.io.FileInputStream; +import java.nio.file.Path; + +import static org.assertj.core.api.Assertions.assertThat; + +/** org.apache.streampark.console.core.service.ResourceServiceTest. */ +class ResourceServiceTest extends SpringTestBase { + + @Autowired private ResourceService resourceService; + + @Test + void testUpload(@TempDir Path tempDir) throws Exception { + // specify the file path + File fileToStoreUploadFile = + new File(tempDir.toFile().getAbsolutePath() + "/fileToStoreUploadFile"); + FileUtils.createFile(fileToStoreUploadFile.getAbsolutePath()); + + File fileToUpload = new File(tempDir.toFile().getAbsolutePath() + "/fileToUpload.jar"); + FileUtils.createFile(fileToUpload.getAbsolutePath()); + assertThat(fileToUpload).exists(); + MultipartFile mulFile = + new MockMultipartFile( + "test", // fileName (eg: streampark.jar) + fileToUpload.getAbsolutePath(), // originalFilename (eg: path + fileName = + // /tmp/file/streampark.jar) + ContentType.APPLICATION_OCTET_STREAM.toString(), + new FileInputStream(fileToStoreUploadFile)); + resourceService.upload(mulFile); + } +} diff --git a/streampark-console/streampark-console-webapp/src/api/flink/resource/index.ts b/streampark-console/streampark-console-webapp/src/api/flink/resource/index.ts index 1b279c7a6a..a0d0452847 100644 --- a/streampark-console/streampark-console-webapp/src/api/flink/resource/index.ts +++ b/streampark-console/streampark-console-webapp/src/api/flink/resource/index.ts @@ -23,13 +23,16 @@ import { ResourceListRecord, ResourceParam, } from './model/resourceModel'; +import { ContentTypeEnum } from '/@/enums/httpEnum'; enum RESOURCE_API { PAGE = '/resource/page', POST = '/resource/add', UPDATE = '/resource/update', + UPLOAD = '/resource/upload', DELETE = '/resource/delete', LIST = '/resource/list', + CHECK = '/resource/check', } /** @@ -76,3 +79,18 @@ export function fetchResourceDelete(data: ResourceDeleteParam): Promise { return defHttp.post({ url: RESOURCE_API.LIST, data }); } + +export function checkResource(data: ResourceParam): Promise> { + return defHttp.post({ url: RESOURCE_API.CHECK, data }); +} + +export function fetchUpload(params) { + return defHttp.post({ + url: RESOURCE_API.UPLOAD, + params, + headers: { + 'Content-Type': ContentTypeEnum.FORM_DATA, + }, + timeout: 1000 * 60 * 10, // Uploading files timed out for 10 minutes + }); +} diff --git a/streampark-console/streampark-console-webapp/src/api/flink/resource/model/resourceModel.ts b/streampark-console/streampark-console-webapp/src/api/flink/resource/model/resourceModel.ts index 4de0f0f5e8..3c720d38a0 100644 --- a/streampark-console/streampark-console-webapp/src/api/flink/resource/model/resourceModel.ts +++ b/streampark-console/streampark-console-webapp/src/api/flink/resource/model/resourceModel.ts @@ -34,6 +34,7 @@ export interface ResourceListRecord { export interface ResourceParam { id?: string; resourceName: string; + connector?: string; engineType: string; description: string; } diff --git a/streampark-console/streampark-console-webapp/src/locales/lang/en/flink/resource.ts b/streampark-console/streampark-console-webapp/src/locales/lang/en/flink/resource.ts index 19ffd1618f..7e031498ca 100644 --- a/streampark-console/streampark-console-webapp/src/locales/lang/en/flink/resource.ts +++ b/streampark-console/streampark-console-webapp/src/locales/lang/en/flink/resource.ts @@ -21,24 +21,32 @@ export default { deleteResource: 'Delete Resource', deletePopConfirm: 'Are you sure delete this resource ?', uploadResource: 'Upload Resource', + resourceName: 'Resource Name', resourceType: 'Resource Type', engineType: 'Engine Type', resourceGroup: 'Resource Group', groupName: 'Group Name', + resourceNamePlaceholder: 'Please input resource name', engineTypePlaceholder: 'Please select compute engine type', resourceGroupPlaceholder: 'Please choose resource', groupNamePlaceholder: 'Please input the group name', groupNameIsRequiredMessage: 'Group Name is required', multiPomTip: 'Do not add multiple dependencies at one time', addResourceTip: 'Please add a resource', + jarFileErrorTip: 'Jar file is null, please try again', + mainNullTip: 'Flink app invalid, main class is null', + connectorExistsTip: 'this connector already exists', + connectorInvalidTip: 'flink connector invalid, please check', + connectorInfoErrorTip: 'get flink connector information error.', + connectorModifyTip: 'this connector cannot be modified, because factoryIdentifier has changed', add: 'Add', success: ' successful', fail: ' failed', table: { title: 'Resource List', resourceName: 'Resource Name', - resourceNamePlaceholder: 'Please enter the resource name to search', - descriptionPlaceholder: 'Please enter description to search', + resourceNamePlaceholder: 'Please enter the resource name', + descriptionPlaceholder: 'Please enter description', createUser: 'Create User', createTime: 'Create Time', modifyTime: 'Modify Time', @@ -46,6 +54,7 @@ export default { }, form: { descriptionMessage: 'exceeds maximum length limit of 100 characters', + resourceNameIsRequiredMessage: 'resource name is required', engineTypeIsRequiredMessage: 'compute engine type is required', resourceTypeIsRequiredMessage: 'resource type is required', resourceTypePlaceholder: 'Please select resource type', diff --git a/streampark-console/streampark-console-webapp/src/locales/lang/zh-CN/flink/resource.ts b/streampark-console/streampark-console-webapp/src/locales/lang/zh-CN/flink/resource.ts index 023d0f36b9..5b0dc99e5a 100644 --- a/streampark-console/streampark-console-webapp/src/locales/lang/zh-CN/flink/resource.ts +++ b/streampark-console/streampark-console-webapp/src/locales/lang/zh-CN/flink/resource.ts @@ -21,16 +21,24 @@ export default { deleteResource: '删除资源', deletePopConfirm: '你确定要删除这个资源?', uploadResource: '上传资源', + resourceName: '资源名称', resourceType: '资源类型', engineType: '计算引擎类型', resourceGroup: '资源组', groupName: '资源组名称', resourceGroupPlaceholder: '请选择组资源', + resourceNamePlaceholder: '请输入资源名称', groupNamePlaceholder: '请输入资源组名称', groupNameIsRequiredMessage: '资源组名称必填', engineTypePlaceholder: '请选择计算引擎类型', multiPomTip: '不支持同时添加多个依赖', addResourceTip: '请添加资源', + jarFileErrorTip: 'Jar 文件为空,请重试', + mainNullTip: 'Flink app 无效,主类为空', + connectorInvalidTip: '该连接器无效,请检查', + connectorExistsTip: '该连接器已经存在', + connectorModifyTip: '该连接器无法修改,factoryIdentifier 不能更改', + connectorInfoErrorTip: '获取改连接器信息出错', add: '添加', success: '成功', fail: '失败', @@ -46,8 +54,9 @@ export default { }, form: { descriptionMessage: '超过 100 个字符的最大长度限制', - engineTypeIsRequiredMessage: '计算引擎类型必选', - resourceTypeIsRequiredMessage: '资源类型必选', + resourceNameIsRequiredMessage: '资源名称为必填项', + engineTypeIsRequiredMessage: '计算引擎类型为必填项', + resourceTypeIsRequiredMessage: '资源类型为必填项', resourceTypePlaceholder: '请选择资源类型', exists: '资源已存在', empty: '资源不能为空', diff --git a/streampark-console/streampark-console-webapp/src/views/flink/app/styles/Add.less b/streampark-console/streampark-console-webapp/src/views/flink/app/styles/Add.less index b6acf41cd8..a45c73ed2f 100644 --- a/streampark-console/streampark-console-webapp/src/views/flink/app/styles/Add.less +++ b/streampark-console/streampark-console-webapp/src/views/flink/app/styles/Add.less @@ -186,27 +186,26 @@ } .dependency-box { - margin-right: 10px; margin-top: 15px; - margin-bottom: -10px; + width: 100%; border-radius: 5px; - background-color: @background-color-base; + background-color: #e6f4ff; + border: 1px solid #91caff; display: inline-block; + margin-bottom: -25px; .dependency-item { position: relative; border: unset; + background-color: unset; line-height: 35px; padding: 0 6px; width: unset; float: left; - margin: 2px 4px 0; - .ant-alert-close-icon { position: relative; left: 5px; top: 6px; - color: @background-color-base; } } } @@ -267,8 +266,14 @@ } [data-theme='dark'] { - .app_controller .icon-close { - color: #ffffff73; + .app_controller { + .icon-close { + color: #ffffff73; + } + .dependency-box { + background-color: #111b26; + border: 1px solid #153450; + } } } diff --git a/streampark-console/streampark-console-webapp/src/views/flink/resource/View.vue b/streampark-console/streampark-console-webapp/src/views/flink/resource/View.vue index 0cae4e79e3..bdb7ae75ac 100644 --- a/streampark-console/streampark-console-webapp/src/views/flink/resource/View.vue +++ b/streampark-console/streampark-console-webapp/src/views/flink/resource/View.vue @@ -26,45 +26,48 @@