Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Improve] resource improvement #2860

Merged
merged 22 commits into from
Aug 6, 2023
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion streampark-common/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@
<optional>true</optional>
</dependency>


<!--logback -->
<dependency>
<groupId>org.apache.streampark</groupId>
Expand All @@ -127,7 +128,6 @@
<artifactId>streampark-shaded-jackson</artifactId>
<version>${streampark.shaded.version}</version>
</dependency>

</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,18 +78,16 @@ object ConfigConst {
val KEY_SPARK_BATCH_DURATION = "spark.batch.duration"

// flink
def KEY_APP_CONF(prefix: String = null): String = if (prefix == null) "conf" else s"${prefix}conf"
def KEY_APP_CONF(prefix: String = null): String = s"${Option(prefix).getOrElse("")}conf"

def KEY_FLINK_CONF(prefix: String = null): String =
if (prefix == null) "flink.conf" else s"${prefix}flink.conf"
def KEY_FLINK_CONF(prefix: String = null): String = s"${Option(prefix).getOrElse("")}flink.conf"

def KEY_APP_NAME(prefix: String = null): String =
if (prefix == null) "app.name" else s"${prefix}app.name"
def KEY_APP_NAME(prefix: String = null): String = s"${Option(prefix).getOrElse("")}app.name"

def KEY_FLINK_SQL(prefix: String = null): String = if (prefix == null) "sql" else s"${prefix}sql"
def KEY_FLINK_SQL(prefix: String = null): String = s"${Option(prefix).getOrElse("")}sql"

def KEY_FLINK_PARALLELISM(prefix: String = null): String =
if (prefix == null) "parallelism.default" else s"${prefix}parallelism.default"
s"${Option(prefix).getOrElse("")}parallelism.default"

val KEY_FLINK_OPTION_PREFIX = "flink.option."

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,11 @@
*/
package org.apache.streampark.common.util

import java.io.File
import java.io.{File, IOException}
import java.net.{URL, URLClassLoader}
import java.util.function.Supplier
import java.util.function.{Consumer, Supplier}

import scala.collection.mutable.ArrayBuffer

object ClassLoaderUtils extends Logger {

Expand Down Expand Up @@ -61,6 +63,15 @@ object ClassLoaderUtils extends Logger {
Thread.currentThread.setContextClassLoader(originalClassLoader)
}
}
@throws[IOException]
def cloneClassLoader(): ClassLoader = {
val urls = originalClassLoader.getResources(".")
val buffer = ArrayBuffer[URL]()
while (urls.hasMoreElements) {
buffer += urls.nextElement()
}
new URLClassLoader(buffer.toArray[URL], originalClassLoader)
}

def loadJar(jarFilePath: String): Unit = {
val jarFile = new File(jarFilePath)
Expand Down
8 changes: 8 additions & 0 deletions streampark-console/streampark-console-service/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -383,6 +383,7 @@
<version>${project.version}</version>
</dependency>


<dependency>
<groupId>com.fasterxml.jackson.module</groupId>
<artifactId>jackson-module-scala_${scala.binary.version}</artifactId>
Expand Down Expand Up @@ -450,6 +451,13 @@
<artifactId>force-shading</artifactId>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
<version>${flink.version}</version>
</dependency>

</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ public static boolean contains(Iterator iterator, Object element) {
if (iterator != null) {
while (iterator.hasNext()) {
Object candidate = iterator.next();
if (ObjectUtils.safeEquals(candidate, element)) {
if (ObjectUtils.equals(candidate, element)) {
return true;
}
}
Expand All @@ -218,7 +218,7 @@ public static boolean contains(Enumeration enumeration, Object element) {
if (enumeration != null) {
while (enumeration.hasMoreElements()) {
Object candidate = enumeration.nextElement();
if (ObjectUtils.safeEquals(candidate, element)) {
if (ObjectUtils.equals(candidate, element)) {
return true;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ public static boolean containsElement(Object[] array, Object element) {
return false;
}
for (Object arrayEle : array) {
if (safeEquals(arrayEle, element)) {
if (equals(arrayEle, element)) {
return true;
}
}
Expand Down Expand Up @@ -238,7 +238,7 @@ public static Object[] toObjectArray(Object source) {
* @return whether the given objects are equal
* @see Arrays#equals
*/
public static boolean safeEquals(Object o1, Object o2) {
public static boolean equals(Object o1, Object o2) {
if (o1 == null || o2 == null) {
return false;
}
Expand Down Expand Up @@ -282,8 +282,8 @@ public static boolean safeEquals(Object o1, Object o2) {
return false;
}

public static boolean safeTrimEquals(Object o1, Object o2) {
boolean equals = safeEquals(o1, o2);
public static boolean trimEquals(Object o1, Object o2) {
boolean equals = equals(o1, o2);
if (!equals) {
if (o1 != null && o2 != null) {
if (o1 instanceof String && o2 instanceof String) {
Expand All @@ -294,6 +294,10 @@ public static boolean safeTrimEquals(Object o1, Object o2) {
return equals;
}

public static boolean trimNoEquals(Object o1, Object o2) {
return !trimEquals(o1, o2);
}

/**
* Return as hash code for the given object; typically the value of <code>
* {@link Object#hashCode()}</code>. If the object is an array, this method will delegate to any
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.apache.streampark.console.core.service.ApplicationBackUpService;
import org.apache.streampark.console.core.service.ApplicationLogService;
import org.apache.streampark.console.core.service.ApplicationService;
import org.apache.streampark.console.core.service.ResourceService;
import org.apache.streampark.flink.packer.pipeline.PipelineStatus;

import org.apache.shiro.authz.annotation.RequiresPermissions;
Expand Down Expand Up @@ -80,6 +81,8 @@ public class ApplicationController {

@Autowired private AppBuildPipeService appBuildPipeService;

@Autowired private ResourceService resourceService;

@Operation(summary = "Get application")
@ApiAccess
@PostMapping("get")
Expand Down Expand Up @@ -397,7 +400,7 @@ public RestResponse checkjar(String jar) {
@PostMapping("upload")
@RequiresPermissions("app:create")
public RestResponse upload(MultipartFile file) throws Exception {
String uploadPath = applicationService.upload(file);
String uploadPath = resourceService.upload(file);
return RestResponse.success(uploadPath);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.multipart.MultipartFile;

import javax.validation.Valid;

Expand All @@ -58,6 +59,12 @@ public RestResponse addResource(@Valid Resource resource) {
return RestResponse.success();
}

@Operation(summary = "check resource")
@PostMapping("check")
public RestResponse checkResource(@Valid Resource resource) {
return this.resourceService.checkResource(resource);
}

@Operation(summary = "List resources")
@PostMapping("page")
public RestResponse page(RestRequest restRequest, Resource resource) {
Expand Down Expand Up @@ -87,4 +94,12 @@ public RestResponse listResource(@RequestParam Long teamId) {
List<Resource> resourceList = resourceService.findByTeamId(teamId);
return RestResponse.success(resourceList);
}

@Operation(summary = "Upload the resource jar")
@PostMapping("upload")
@RequiresPermissions("resource:add")
public RestResponse upload(MultipartFile file) throws Exception {
String uploadPath = resourceService.upload(file);
return RestResponse.success(uploadPath);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import org.apache.streampark.common.enums.StorageType;
import org.apache.streampark.common.fs.FsOperator;
import org.apache.streampark.console.base.util.JacksonUtils;
import org.apache.streampark.console.base.util.ObjectUtils;
import org.apache.streampark.console.core.bean.AppControl;
import org.apache.streampark.console.core.bean.Dependency;
import org.apache.streampark.console.core.enums.FlinkAppState;
Expand Down Expand Up @@ -252,14 +251,6 @@ public String getIngressTemplate() {
return ingressTemplate;
}

public void setIngressTemplate(String ingressTemplate) {
this.ingressTemplate = ingressTemplate;
}

public String getDefaultModeIngress() {
return defaultModeIngress;
}

public void setDefaultModeIngress(String defaultModeIngress) {
this.defaultModeIngress = defaultModeIngress;
}
Expand Down Expand Up @@ -357,11 +348,6 @@ public DevelopmentMode getDevelopmentMode() {
return DevelopmentMode.of(jobType);
}

@JsonIgnore
public void setDevelopmentMode(DevelopmentMode mode) {
this.jobType = mode.getValue();
}

@JsonIgnore
public FlinkAppState getFlinkAppStateEnum() {
return FlinkAppState.of(state);
Expand Down Expand Up @@ -509,75 +495,6 @@ public boolean isNeedRestartOnFailed() {
return false;
}

/**
* Parameter comparison, mainly to compare whether the parameters related to Flink runtime have
* changed
*/
public boolean eqJobParam(Application other) {
// 1) Resolve Order has it changed
// 2) flink Version has it changed
// 3) Execution Mode has it changed
// 4) Parallelism has it changed
// 5) Task Slots has it changed
// 6) Options has it changed
// 7) properties has it changed
// 8) Program Args has it changed
// 9) Flink Version has it changed

if (!ObjectUtils.safeEquals(this.getVersionId(), other.getVersionId())) {
return false;
}

if (!ObjectUtils.safeEquals(this.getResolveOrder(), other.getResolveOrder())
|| !ObjectUtils.safeEquals(this.getExecutionMode(), other.getExecutionMode())
|| !ObjectUtils.safeEquals(this.getK8sRestExposedType(), other.getK8sRestExposedType())) {
return false;
}

if (this.getOptions() != null) {
if (other.getOptions() != null) {
if (!this.getOptions().trim().equals(other.getOptions().trim())) {
Map<String, Object> optMap = this.getOptionMap();
Map<String, Object> otherMap = other.getOptionMap();
if (optMap.size() != otherMap.size()) {
return false;
}
for (Map.Entry<String, Object> entry : optMap.entrySet()) {
if (!entry.getValue().equals(otherMap.get(entry.getKey()))) {
return false;
}
}
}
} else {
return false;
}
} else if (other.getOptions() != null) {
return false;
}

if (this.getDynamicProperties() != null) {
if (other.getDynamicProperties() != null) {
if (!this.getDynamicProperties().trim().equals(other.getDynamicProperties().trim())) {
return false;
}
} else {
return false;
}
} else if (other.getDynamicProperties() != null) {
return false;
}

if (this.getArgs() != null) {
if (other.getArgs() != null) {
return this.getArgs().trim().equals(other.getArgs().trim());
} else {
return false;
}
} else {
return other.getArgs() == null;
}
}

@JsonIgnore
public StorageType getStorageType() {
return getStorageType(getExecutionMode());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
package org.apache.streampark.console.core.entity;

import org.apache.streampark.common.util.DeflaterUtils;
import org.apache.streampark.console.base.util.ObjectUtils;
import org.apache.streampark.console.core.bean.Dependency;
import org.apache.streampark.console.core.enums.ChangedType;

Expand All @@ -30,6 +29,7 @@

import java.util.Base64;
import java.util.Date;
import java.util.Objects;

@Data
@TableName("t_flink_sql")
Expand Down Expand Up @@ -91,8 +91,8 @@ public ChangedType checkChange(FlinkSql target) {
Dependency targetDependency = Dependency.toDependency(target.getDependency());
boolean depDifference = !thisDependency.eq(targetDependency);
// 3) determine if team resource has changed
boolean teamResDifference =
!ObjectUtils.safeEquals(this.teamResource, target.getTeamResource());
boolean teamResDifference = !Objects.equals(this.teamResource, target.getTeamResource());

if (sqlDifference && depDifference && teamResDifference) {
return ChangedType.ALL;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@

import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.service.IService;
import org.springframework.web.multipart.MultipartFile;

import java.io.IOException;
import java.io.Serializable;
Expand Down Expand Up @@ -75,8 +74,6 @@ public interface ApplicationService extends IService<Application> {

Map<String, Serializable> dashboard(Long teamId);

String upload(MultipartFile file) throws Exception;

/** set the latest to Effective, it will really become the current effective */
void toEffective(Application application);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,14 @@
package org.apache.streampark.console.core.service;

import org.apache.streampark.console.base.domain.RestRequest;
import org.apache.streampark.console.base.domain.RestResponse;
import org.apache.streampark.console.core.entity.Resource;

import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.service.IService;
import org.springframework.web.multipart.MultipartFile;

import java.io.IOException;
import java.util.List;

public interface ResourceService extends IService<Resource> {
Expand Down Expand Up @@ -87,4 +90,10 @@ public interface ResourceService extends IService<Resource> {
* @param targetUserId target user id
*/
void changeOwnership(Long userId, Long targetUserId);

String upload(MultipartFile file) throws IOException;

RestResponse checkResource(Resource resource);

List<String> getConnectorId(Resource resource) throws Exception;
}
Loading
Loading