Skip to content

Commit

Permalink
[Improve] resource improvement (#2860)
Browse files Browse the repository at this point in the history
  • Loading branch information
wolfboys authored Aug 6, 2023
1 parent b2ec2cd commit 279e2d6
Show file tree
Hide file tree
Showing 54 changed files with 1,035 additions and 373 deletions.
2 changes: 2 additions & 0 deletions streampark-common/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@
<optional>true</optional>
</dependency>


<!--logback -->
<dependency>
<groupId>org.apache.streampark</groupId>
Expand All @@ -128,6 +129,7 @@
<version>${streampark.shaded.version}</version>
</dependency>


<!-- ZIO -->
<dependency>
<groupId>dev.zio</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,18 +80,16 @@ object ConfigConst {
val KEY_SPARK_BATCH_DURATION = "spark.batch.duration"

// flink
// Flink/app configuration key builders. Each accepts an optional key prefix;
// a null prefix yields the bare key (e.g. "conf"), a non-null prefix is
// prepended verbatim (callers are expected to include the trailing dot).
// NOTE(review): the diff here interleaved the pre- and post-change definitions;
// this is the de-duplicated post-change form.
def KEY_APP_CONF(prefix: String = null): String = s"${Option(prefix).getOrElse("")}conf"

def KEY_FLINK_CONF(prefix: String = null): String = s"${Option(prefix).getOrElse("")}flink.conf"

def KEY_APP_NAME(prefix: String = null): String = s"${Option(prefix).getOrElse("")}app.name"

def KEY_FLINK_SQL(prefix: String = null): String = s"${Option(prefix).getOrElse("")}sql"

def KEY_FLINK_PARALLELISM(prefix: String = null): String =
  s"${Option(prefix).getOrElse("")}parallelism.default"

val KEY_FLINK_OPTION_PREFIX = "flink.option."

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,11 @@
*/
package org.apache.streampark.common.util

import java.io.File
import java.io.{File, IOException}
import java.net.{URL, URLClassLoader}
import java.util.function.Supplier
import java.util.function.{Consumer, Supplier}

import scala.collection.mutable.ArrayBuffer

object ClassLoaderUtils extends Logger {

Expand Down Expand Up @@ -61,6 +63,15 @@ object ClassLoaderUtils extends Logger {
Thread.currentThread.setContextClassLoader(originalClassLoader)
}
}
/**
 * Builds a new [[URLClassLoader]] exposing the root (".") resources visible to
 * `originalClassLoader`, with `originalClassLoader` as its parent, so callers
 * get an isolated loader backed by the same classpath roots.
 */
@throws[IOException]
def cloneClassLoader(): ClassLoader = {
  val rootResources = originalClassLoader.getResources(".")
  val collected = ArrayBuffer.empty[URL]
  // Enumeration has no Scala collection conversion in scope here; drain it manually.
  while (rootResources.hasMoreElements) {
    collected += rootResources.nextElement()
  }
  new URLClassLoader(collected.toArray, originalClassLoader)
}

def loadJar(jarFilePath: String): Unit = {
val jarFile = new File(jarFilePath)
Expand Down
9 changes: 8 additions & 1 deletion streampark-console/streampark-console-service/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -380,6 +380,7 @@
<version>${project.version}</version>
</dependency>


<dependency>
<groupId>com.fasterxml.jackson.module</groupId>
<artifactId>jackson-module-scala_${scala.binary.version}</artifactId>
Expand Down Expand Up @@ -436,11 +437,17 @@
<scope>provided</scope>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
<version>${flink.version}</version>
</dependency>

<dependency>
<groupId>dev.zio</groupId>
<artifactId>zio-logging_${scala.binary.version}</artifactId>
</dependency>

<dependency>
<groupId>dev.zio</groupId>
<artifactId>zio-streams_${scala.binary.version}</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -531,4 +531,27 @@ create table `t_yarn_queue` (
unique key `unq_team_id_queue_label` (`team_id`, `queue_label`) using btree
) engine = innodb default charset = utf8mb4 collate = utf8mb4_general_ci;



-- ----------------------------
-- Table of t_resource
-- ----------------------------
drop table if exists `t_resource`;
create table if not exists `t_resource` (
  `id` bigint not null auto_increment primary key,
  `resource_name` varchar(128) not null comment 'The name of the resource',
  `resource_type` int not null comment '0:app 1:common 2:connector 3:format 4:udf',
  `resource_path` varchar(255) default null,
  `resource` text,
  `engine_type` int not null comment 'compute engine type, 0:apache flink 1:apache spark',
  `main_class` varchar(255) default null,
  -- MySQL forbids DEFAULT clauses on TEXT/BLOB columns; absent a default they
  -- are implicitly nullable, which is the intended behavior here.
  `description` text comment 'More detailed description of resource',
  `creator_id` bigint not null comment 'user id of creator',
  `connector_required_options` text,
  `connector_optional_options` text,
  `team_id` bigint not null comment 'team id',
  `create_time` datetime not null default current_timestamp comment 'create time',
  `modify_time` datetime not null default current_timestamp on update current_timestamp comment 'modify time'
) engine = innodb default charset = utf8mb4 collate = utf8mb4_general_ci;

set foreign_key_checks = 1;
Original file line number Diff line number Diff line change
Expand Up @@ -76,14 +76,14 @@ alter table `t_user` modify column `login_type` tinyint default 0 comment 'login
-- ----------------------------
drop table if exists `t_flink_gateway`;
create table `t_flink_gateway` (
`id` bigint not null auto_increment,
`gateway_name` varchar(128) collate utf8mb4_general_ci not null comment 'The name of the gateway',
`description` text collate utf8mb4_general_ci default null comment 'More detailed description of resource',
`gateway_type` int not null comment 'The type of the gateway',
`address` varchar(150) default null comment 'url address of gateway endpoint',
`create_time` datetime not null default current_timestamp comment 'create time',
`modify_time` datetime not null default current_timestamp on update current_timestamp comment 'modify time',
primary key (`id`) using btree
`id` bigint not null auto_increment,
`gateway_name` varchar(128) collate utf8mb4_general_ci not null comment 'The name of the gateway',
`description` text collate utf8mb4_general_ci default null comment 'More detailed description of resource',
`gateway_type` int not null comment 'The type of the gateway',
`address` varchar(150) default null comment 'url address of gateway endpoint',
`create_time` datetime not null default current_timestamp comment 'create time',
`modify_time` datetime not null default current_timestamp on update current_timestamp comment 'modify time',
primary key (`id`) using btree
) engine=innodb auto_increment=100000 default charset=utf8mb4 collate=utf8mb4_general_ci;

-- menu level 2
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
@Data
public class RestRequest implements Serializable {

private static final long serialVersionUID = -4869594085374385813L;
private static final long serialVersionUID = 1L;

@Schema(example = "10", required = true)
private int pageSize = 10;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ public class RestResponse extends HashMap<String, Object> {
public static final String STATUS_SUCCESS = "success";
public static final String STATUS_FAIL = "error";

private static final long serialVersionUID = -8713837118340960775L;
private static final long serialVersionUID = 1L;

public static RestResponse success(Object data) {
RestResponse resp = new RestResponse();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
@JsonInclude(JsonInclude.Include.NON_NULL)
public class RouterMeta implements Serializable {

private static final long serialVersionUID = 5499925008927195914L;
private static final long serialVersionUID = 1L;

private Boolean closeable;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
@JsonInclude(JsonInclude.Include.NON_NULL)
public class VueRouter<T> implements Serializable {

private static final long serialVersionUID = -3327478146308500708L;
private static final long serialVersionUID = 1L;

@JsonIgnore private String id;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ public final class CommonUtils implements Serializable {

private CommonUtils() {}

private static final long serialVersionUID = 6458428317155311192L;
private static final long serialVersionUID = 1L;

private static final String OS = System.getProperty("os.name").toLowerCase();

Expand Down Expand Up @@ -199,7 +199,7 @@ public static boolean contains(Iterator iterator, Object element) {
if (iterator != null) {
while (iterator.hasNext()) {
Object candidate = iterator.next();
if (ObjectUtils.safeEquals(candidate, element)) {
if (ObjectUtils.equals(candidate, element)) {
return true;
}
}
Expand All @@ -218,7 +218,7 @@ public static boolean contains(Enumeration enumeration, Object element) {
if (enumeration != null) {
while (enumeration.hasMoreElements()) {
Object candidate = enumeration.nextElement();
if (ObjectUtils.safeEquals(candidate, element)) {
if (ObjectUtils.equals(candidate, element)) {
return true;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

import org.apache.streampark.common.util.DateUtils;

import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.DeserializationFeature;
Expand All @@ -41,7 +40,6 @@ private JacksonUtils() {}
MAPPER.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
MAPPER.configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, false);
MAPPER.configure(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS, false);
MAPPER.setSerializationInclusion(JsonInclude.Include.NON_NULL);
MAPPER.setDateFormat(new SimpleDateFormat(DateUtils.fullFormat()));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ public static boolean containsElement(Object[] array, Object element) {
return false;
}
for (Object arrayEle : array) {
if (safeEquals(arrayEle, element)) {
if (equals(arrayEle, element)) {
return true;
}
}
Expand Down Expand Up @@ -238,7 +238,7 @@ public static Object[] toObjectArray(Object source) {
* @return whether the given objects are equal
* @see Arrays#equals
*/
public static boolean safeEquals(Object o1, Object o2) {
public static boolean equals(Object o1, Object o2) {
if (o1 == null || o2 == null) {
return false;
}
Expand Down Expand Up @@ -282,8 +282,8 @@ public static boolean safeEquals(Object o1, Object o2) {
return false;
}

public static boolean safeTrimEquals(Object o1, Object o2) {
boolean equals = safeEquals(o1, o2);
public static boolean trimEquals(Object o1, Object o2) {
boolean equals = equals(o1, o2);
if (!equals) {
if (o1 != null && o2 != null) {
if (o1 instanceof String && o2 instanceof String) {
Expand All @@ -294,6 +294,10 @@ public static boolean safeTrimEquals(Object o1, Object o2) {
return equals;
}

/**
 * Negation of {@code trimEquals}: returns {@code true} when the two objects are
 * not equal even after trimming String operands.
 *
 * @param o1 first object to compare, may be null
 * @param o2 second object to compare, may be null
 * @return true if the objects are not trim-equal
 */
public static boolean trimNoEquals(Object o1, Object o2) {
return !trimEquals(o1, o2);
}

/**
* Return as hash code for the given object; typically the value of <code>
* {@link Object#hashCode()}</code>. If the object is an array, this method will delegate to any
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ public boolean isEmpty() {
return pom.isEmpty() && jar.isEmpty();
}

public boolean eq(Dependency other) {
public boolean equals(Dependency other) {
if (other == null) {
return false;
}
Expand All @@ -76,20 +76,20 @@ public boolean eq(Dependency other) {
}

public DependencyInfo toJarPackDeps() {
List<Artifact> mvnArts =
this.pom.stream()
.map(
pom ->
new Artifact(
pom.getGroupId(),
pom.getArtifactId(),
pom.getVersion(),
pom.getClassifier()))
.collect(Collectors.toList());
List<Artifact> mvnArts = toArtifact();
List<String> extJars =
this.jar.stream()
.map(jar -> Workspace.local().APP_UPLOADS() + "/" + jar)
.collect(Collectors.toList());
return new DependencyInfo(mvnArts, extJars);
}

/**
 * Maps each declared pom dependency onto an {@link Artifact} coordinate
 * (groupId, artifactId, version, classifier).
 */
public List<Artifact> toArtifact() {
  // Name the lambda parameter "dep" so it does not shadow the "pom" field.
  return this.pom.stream()
      .map(
          dep ->
              new Artifact(
                  dep.getGroupId(), dep.getArtifactId(), dep.getVersion(), dep.getClassifier()))
      .collect(Collectors.toList());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.streampark.console.core.bean;

import lombok.Data;

import java.util.Map;

/**
 * Bean describing a Flink connector resource: the connector's class name, its
 * identifier, and the connector options it declares.
 */
@Data
public class FlinkConnectorResource {

  /** Fully qualified class name of the connector implementation. */
  private String className;

  // NOTE(review): presumably the value of the connector factory's
  // factoryIdentifier() (e.g. "kafka") — confirm against the populating code.
  private String factoryIdentifier;

  /**
   * Required connector options. Made private for consistency: the original left
   * the two Map fields package-private while the String fields were private;
   * with Lombok's @Data generating accessors, private is the intended visibility.
   */
  private Map<String, String> requiredOptions;

  /** Optional connector options. */
  private Map<String, String> optionalOptions;
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,11 @@ public int hashCode() {

@Override
public String toString() {
return groupId + ":" + artifactId + ":" + version + getClassifier(":");
}

private String getClassifier(String joiner) {
return StringUtils.isEmpty(classifier) ? "" : joiner + classifier;
return String.format(
"%s:%s:%s%s",
groupId,
artifactId,
version,
StringUtils.isEmpty(classifier) ? "" : ":".concat(classifier));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.apache.streampark.console.core.service.ApplicationBackUpService;
import org.apache.streampark.console.core.service.ApplicationLogService;
import org.apache.streampark.console.core.service.ApplicationService;
import org.apache.streampark.console.core.service.ResourceService;
import org.apache.streampark.flink.packer.pipeline.PipelineStatus;

import org.apache.shiro.authz.annotation.RequiresPermissions;
Expand Down Expand Up @@ -80,6 +81,8 @@ public class ApplicationController {

@Autowired private AppBuildPipeService appBuildPipeService;

@Autowired private ResourceService resourceService;

@Operation(summary = "Get application")
@ApiAccess
@PostMapping("get")
Expand Down Expand Up @@ -397,7 +400,7 @@ public RestResponse checkjar(String jar) {
/**
 * Uploads a file and returns the server-side path where it was stored.
 * Requires the {@code app:create} permission.
 *
 * @param file the multipart file submitted by the client
 * @return success response carrying the stored file's path
 * @throws Exception if persisting the upload fails
 */
@PostMapping("upload")
@RequiresPermissions("app:create")
public RestResponse upload(MultipartFile file) throws Exception {
// Upload handling is delegated to ResourceService (moved off ApplicationService).
String uploadPath = resourceService.upload(file);
return RestResponse.success(uploadPath);
}

Expand Down
Loading

0 comments on commit 279e2d6

Please sign in to comment.