Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Improve] resource improvement #2860

Merged
merged 22 commits into from
Aug 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions streampark-common/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@
<optional>true</optional>
</dependency>


<!--logback -->
<dependency>
<groupId>org.apache.streampark</groupId>
Expand All @@ -128,6 +129,7 @@
<version>${streampark.shaded.version}</version>
</dependency>


<!-- ZIO -->
<dependency>
<groupId>dev.zio</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,18 +80,16 @@ object ConfigConst {
val KEY_SPARK_BATCH_DURATION = "spark.batch.duration"

// flink
def KEY_APP_CONF(prefix: String = null): String = if (prefix == null) "conf" else s"${prefix}conf"
def KEY_APP_CONF(prefix: String = null): String = s"${Option(prefix).getOrElse("")}conf"

def KEY_FLINK_CONF(prefix: String = null): String =
if (prefix == null) "flink.conf" else s"${prefix}flink.conf"
def KEY_FLINK_CONF(prefix: String = null): String = s"${Option(prefix).getOrElse("")}flink.conf"

def KEY_APP_NAME(prefix: String = null): String =
if (prefix == null) "app.name" else s"${prefix}app.name"
def KEY_APP_NAME(prefix: String = null): String = s"${Option(prefix).getOrElse("")}app.name"

def KEY_FLINK_SQL(prefix: String = null): String = if (prefix == null) "sql" else s"${prefix}sql"
def KEY_FLINK_SQL(prefix: String = null): String = s"${Option(prefix).getOrElse("")}sql"

def KEY_FLINK_PARALLELISM(prefix: String = null): String =
if (prefix == null) "parallelism.default" else s"${prefix}parallelism.default"
s"${Option(prefix).getOrElse("")}parallelism.default"

val KEY_FLINK_OPTION_PREFIX = "flink.option."

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,11 @@
*/
package org.apache.streampark.common.util

import java.io.File
import java.io.{File, IOException}
import java.net.{URL, URLClassLoader}
import java.util.function.Supplier
import java.util.function.{Consumer, Supplier}

import scala.collection.mutable.ArrayBuffer

object ClassLoaderUtils extends Logger {

Expand Down Expand Up @@ -61,6 +63,15 @@ object ClassLoaderUtils extends Logger {
Thread.currentThread.setContextClassLoader(originalClassLoader)
}
}
@throws[IOException]
def cloneClassLoader(): ClassLoader = {
val urls = originalClassLoader.getResources(".")
val buffer = ArrayBuffer[URL]()
while (urls.hasMoreElements) {
buffer += urls.nextElement()
}
new URLClassLoader(buffer.toArray[URL], originalClassLoader)
}

def loadJar(jarFilePath: String): Unit = {
val jarFile = new File(jarFilePath)
Expand Down
9 changes: 8 additions & 1 deletion streampark-console/streampark-console-service/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -380,6 +380,7 @@
<version>${project.version}</version>
</dependency>


<dependency>
<groupId>com.fasterxml.jackson.module</groupId>
<artifactId>jackson-module-scala_${scala.binary.version}</artifactId>
Expand Down Expand Up @@ -436,11 +437,17 @@
<scope>provided</scope>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
<version>${flink.version}</version>
</dependency>

<dependency>
<groupId>dev.zio</groupId>
<artifactId>zio-logging_${scala.binary.version}</artifactId>
</dependency>

<dependency>
<groupId>dev.zio</groupId>
<artifactId>zio-streams_${scala.binary.version}</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -531,4 +531,27 @@ create table `t_yarn_queue` (
unique key `unq_team_id_queue_label` (`team_id`, `queue_label`) using btree
) engine = innodb default charset = utf8mb4 collate = utf8mb4_general_ci;



-- ----------------------------
-- Table of t_resource
-- ----------------------------
drop table if exists `t_resource`;
create table if not exists `t_resource` (
`id` bigint not null auto_increment primary key,
`resource_name` varchar(128) not null comment 'The name of the resource',
`resource_type` int not null comment '0:app 1:common 2:connector 3:format 4:udf',
`resource_path` varchar(255) default null,
`resource` text,
`engine_type` int not null comment 'compute engine type, 0:apache flink 1:apache spark',
`main_class` varchar(255) default null,
`description` text default null comment 'More detailed description of resource',
`creator_id` bigint not null comment 'user id of creator',
`connector_required_options` text default null,
`connector_optional_options` text default null,
`team_id` bigint not null comment 'team id',
`create_time` datetime not null default current_timestamp comment 'create time',
`modify_time` datetime not null default current_timestamp on update current_timestamp comment 'modify time'
);

set foreign_key_checks = 1;
Original file line number Diff line number Diff line change
Expand Up @@ -76,14 +76,14 @@ alter table `t_user` modify column `login_type` tinyint default 0 comment 'login
-- ----------------------------
drop table if exists `t_flink_gateway`;
create table `t_flink_gateway` (
`id` bigint not null auto_increment,
`gateway_name` varchar(128) collate utf8mb4_general_ci not null comment 'The name of the gateway',
`description` text collate utf8mb4_general_ci default null comment 'More detailed description of resource',
`gateway_type` int not null comment 'The type of the gateway',
`address` varchar(150) default null comment 'url address of gateway endpoint',
`create_time` datetime not null default current_timestamp comment 'create time',
`modify_time` datetime not null default current_timestamp on update current_timestamp comment 'modify time',
primary key (`id`) using btree
`id` bigint not null auto_increment,
`gateway_name` varchar(128) collate utf8mb4_general_ci not null comment 'The name of the gateway',
`description` text collate utf8mb4_general_ci default null comment 'More detailed description of resource',
`gateway_type` int not null comment 'The type of the gateway',
`address` varchar(150) default null comment 'url address of gateway endpoint',
`create_time` datetime not null default current_timestamp comment 'create time',
`modify_time` datetime not null default current_timestamp on update current_timestamp comment 'modify time',
primary key (`id`) using btree
) engine=innodb auto_increment=100000 default charset=utf8mb4 collate=utf8mb4_general_ci;

-- menu level 2
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
@Data
public class RestRequest implements Serializable {

private static final long serialVersionUID = -4869594085374385813L;
private static final long serialVersionUID = 1L;

@Schema(example = "10", required = true)
private int pageSize = 10;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ public class RestResponse extends HashMap<String, Object> {
public static final String STATUS_SUCCESS = "success";
public static final String STATUS_FAIL = "error";

private static final long serialVersionUID = -8713837118340960775L;
private static final long serialVersionUID = 1L;

public static RestResponse success(Object data) {
RestResponse resp = new RestResponse();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
@JsonInclude(JsonInclude.Include.NON_NULL)
public class RouterMeta implements Serializable {

private static final long serialVersionUID = 5499925008927195914L;
private static final long serialVersionUID = 1L;

private Boolean closeable;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
@JsonInclude(JsonInclude.Include.NON_NULL)
public class VueRouter<T> implements Serializable {

private static final long serialVersionUID = -3327478146308500708L;
private static final long serialVersionUID = 1L;

@JsonIgnore private String id;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ public final class CommonUtils implements Serializable {

private CommonUtils() {}

private static final long serialVersionUID = 6458428317155311192L;
private static final long serialVersionUID = 1L;

private static final String OS = System.getProperty("os.name").toLowerCase();

Expand Down Expand Up @@ -199,7 +199,7 @@ public static boolean contains(Iterator iterator, Object element) {
if (iterator != null) {
while (iterator.hasNext()) {
Object candidate = iterator.next();
if (ObjectUtils.safeEquals(candidate, element)) {
if (ObjectUtils.equals(candidate, element)) {
return true;
}
}
Expand All @@ -218,7 +218,7 @@ public static boolean contains(Enumeration enumeration, Object element) {
if (enumeration != null) {
while (enumeration.hasMoreElements()) {
Object candidate = enumeration.nextElement();
if (ObjectUtils.safeEquals(candidate, element)) {
if (ObjectUtils.equals(candidate, element)) {
return true;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

import org.apache.streampark.common.util.DateUtils;

import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.DeserializationFeature;
Expand All @@ -41,7 +40,6 @@ private JacksonUtils() {}
MAPPER.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
MAPPER.configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, false);
MAPPER.configure(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS, false);
MAPPER.setSerializationInclusion(JsonInclude.Include.NON_NULL);
MAPPER.setDateFormat(new SimpleDateFormat(DateUtils.fullFormat()));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ public static boolean containsElement(Object[] array, Object element) {
return false;
}
for (Object arrayEle : array) {
if (safeEquals(arrayEle, element)) {
if (equals(arrayEle, element)) {
return true;
}
}
Expand Down Expand Up @@ -238,7 +238,7 @@ public static Object[] toObjectArray(Object source) {
* @return whether the given objects are equal
* @see Arrays#equals
*/
public static boolean safeEquals(Object o1, Object o2) {
public static boolean equals(Object o1, Object o2) {
if (o1 == null || o2 == null) {
return false;
}
Expand Down Expand Up @@ -282,8 +282,8 @@ public static boolean safeEquals(Object o1, Object o2) {
return false;
}

public static boolean safeTrimEquals(Object o1, Object o2) {
boolean equals = safeEquals(o1, o2);
public static boolean trimEquals(Object o1, Object o2) {
boolean equals = equals(o1, o2);
if (!equals) {
if (o1 != null && o2 != null) {
if (o1 instanceof String && o2 instanceof String) {
Expand All @@ -294,6 +294,10 @@ public static boolean safeTrimEquals(Object o1, Object o2) {
return equals;
}

public static boolean trimNoEquals(Object o1, Object o2) {
return !trimEquals(o1, o2);
}

/**
* Return as hash code for the given object; typically the value of <code>
* {@link Object#hashCode()}</code>. If the object is an array, this method will delegate to any
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ public boolean isEmpty() {
return pom.isEmpty() && jar.isEmpty();
}

public boolean eq(Dependency other) {
public boolean equals(Dependency other) {
if (other == null) {
return false;
}
Expand All @@ -76,20 +76,20 @@ public boolean eq(Dependency other) {
}

public DependencyInfo toJarPackDeps() {
List<Artifact> mvnArts =
this.pom.stream()
.map(
pom ->
new Artifact(
pom.getGroupId(),
pom.getArtifactId(),
pom.getVersion(),
pom.getClassifier()))
.collect(Collectors.toList());
List<Artifact> mvnArts = toArtifact();
List<String> extJars =
this.jar.stream()
.map(jar -> Workspace.local().APP_UPLOADS() + "/" + jar)
.collect(Collectors.toList());
return new DependencyInfo(mvnArts, extJars);
}

public List<Artifact> toArtifact() {
return this.pom.stream()
.map(
pom ->
new Artifact(
pom.getGroupId(), pom.getArtifactId(), pom.getVersion(), pom.getClassifier()))
.collect(Collectors.toList());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.streampark.console.core.bean;

import lombok.Data;

import java.util.Map;

@Data
public class FlinkConnectorResource {
private String className;
private String factoryIdentifier;
Map<String, String> requiredOptions;
Map<String, String> optionalOptions;
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,11 @@ public int hashCode() {

@Override
public String toString() {
return groupId + ":" + artifactId + ":" + version + getClassifier(":");
}

private String getClassifier(String joiner) {
return StringUtils.isEmpty(classifier) ? "" : joiner + classifier;
return String.format(
"%s:%s:%s%s",
groupId,
artifactId,
version,
StringUtils.isEmpty(classifier) ? "" : ":".concat(classifier));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.apache.streampark.console.core.service.ApplicationBackUpService;
import org.apache.streampark.console.core.service.ApplicationLogService;
import org.apache.streampark.console.core.service.ApplicationService;
import org.apache.streampark.console.core.service.ResourceService;
import org.apache.streampark.flink.packer.pipeline.PipelineStatus;

import org.apache.shiro.authz.annotation.RequiresPermissions;
Expand Down Expand Up @@ -80,6 +81,8 @@ public class ApplicationController {

@Autowired private AppBuildPipeService appBuildPipeService;

@Autowired private ResourceService resourceService;

@Operation(summary = "Get application")
@ApiAccess
@PostMapping("get")
Expand Down Expand Up @@ -397,7 +400,7 @@ public RestResponse checkjar(String jar) {
@PostMapping("upload")
@RequiresPermissions("app:create")
public RestResponse upload(MultipartFile file) throws Exception {
String uploadPath = applicationService.upload(file);
String uploadPath = resourceService.upload(file);
return RestResponse.success(uploadPath);
}

Expand Down
Loading
Loading