[Improve] Adjust to fit the Spark page (#4042)
* feat: spark multiversion support

* feat: change custom code to spark jar

* feat: adjust to frontend
lenoxzhao authored Sep 7, 2024
1 parent bb604e4 commit a94ed3a
Showing 12 changed files with 44 additions and 59 deletions.
@@ -26,8 +26,8 @@ public enum SparkDevelopmentMode {
/** Unknown type replace null */
UNKNOWN("Unknown", -1),

- /** custom code */
- CUSTOM_CODE("Custom Code", 1),
+ /** Spark Jar */
+ SPARK_JAR("Spark Jar", 1),

/** Spark SQL */
SPARK_SQL("Spark SQL", 2),
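For context, a minimal sketch of the enum after this rename; the of() lookup and PYSPARK's display name and value are illustrative assumptions, not part of this diff. The numeric mode stays 1, so existing rows with jobType = 1 now resolve to SPARK_JAR instead of CUSTOM_CODE.

    // Hedged sketch, not the project's actual file.
    public enum SparkDevelopmentMode {
        UNKNOWN("Unknown", -1),
        SPARK_JAR("Spark Jar", 1),
        SPARK_SQL("Spark SQL", 2),
        PYSPARK("Python Spark", 3); // display name and value assumed

        private final String name;
        private final Integer mode;

        SparkDevelopmentMode(String name, Integer mode) {
            this.name = name;
            this.mode = mode;
        }

        public Integer getMode() {
            return mode;
        }

        /** Resolve a persisted jobType value; unrecognized values fall back to UNKNOWN. */
        public static SparkDevelopmentMode of(Integer value) {
            for (SparkDevelopmentMode m : values()) {
                if (m.getMode().equals(value)) {
                    return m;
                }
            }
            return UNKNOWN;
        }
    }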
@@ -38,7 +38,7 @@ class SparkVersion(val sparkHome: String) extends Serializable with Logger {
val (version, scalaVersion) = {
var sparkVersion: String = null
var scalaVersion: String = null
- val cmd = List(s"$sparkHome/bin/spark-submit --version")
+ val cmd = List(s"export SPARK_HOME=$sparkHome&&$sparkHome/bin/spark-submit --version")
val buffer = new mutable.StringBuilder

CommandUtils.execute(
@@ -91,7 +91,7 @@ class SparkVersion(val sparkHome: String) extends Serializable with Logger {

def checkVersion(throwException: Boolean = true): Boolean = {
version.split("\\.").map(_.trim.toInt) match {
- case Array(3, v, _) if v >= 1 && v <= 3 => true
+ case Array(v, _, _) if v == 2 || v == 3 => true
case _ =>
if (throwException) {
throw new UnsupportedOperationException(s"Unsupported spark version: $version")
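The guard is relaxed from Spark 3.1–3.3 only to any 2.x or 3.x release. A standalone Java sketch of the equivalent check, for illustration only (the real implementation is the Scala match above):

    public class SparkVersionCheck {

        /** True for Spark 2.x.y and 3.x.y; other versions fail the check. */
        static boolean checkVersion(String version, boolean throwException) {
            String[] parts = version.split("\\.");
            if (parts.length == 3) {
                int major = Integer.parseInt(parts[0].trim());
                if (major == 2 || major == 3) {
                    return true;
                }
            }
            if (throwException) {
                throw new UnsupportedOperationException("Unsupported spark version: " + version);
            }
            return false;
        }

        public static void main(String[] args) {
            System.out.println(checkVersion("3.4.1", false)); // true under the new rule, rejected by the old 3.1-3.3 range
            System.out.println(checkVersion("2.4.8", false)); // true: 2.x is now accepted
            System.out.println(checkVersion("1.6.3", false)); // false: only 2.x and 3.x pass
        }
    }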
@@ -86,6 +86,12 @@ insert into `t_menu` values (110118, 110100, 'app sql delete', null, null, 'sql:
insert into `t_menu` values (110301, 110300, 'cluster add', '/flink/add_cluster', 'flink/cluster/Add', 'cluster:create', '', '0', 0, null, now(), now());
insert into `t_menu` values (110302, 110300, 'cluster edit', '/flink/edit_cluster', 'flink/cluster/Edit', 'cluster:update', '', '0', 0, null, now(), now());

+ insert into `t_menu` values (120100, 120000, 'spark.application', '/spark/app', 'spark/app/index', null, null, '0', 1, 2, now(), now());
+ insert into `t_menu` values (120200, 120000, 'spark.sparkHome', '/spark/home', 'spark/home/index', null, null, '0', 1, 3, now(), now());
+ insert into `t_menu` values (120300, 120000, 'spark.createApplication', '/spark/app/create', 'spark/app/create', 'app:create', '', '0', 0, null, now(), now());
+ insert into `t_menu` values (120400, 120000, 'spark.updateApplication', '/spark/app/edit', 'spark/app/edit', 'app:update', '', '0', 0, null, now(), now());
+ insert into `t_menu` values (120500, 120000, 'spark.applicationDetail', '/spark/app/detail', 'spark/app/detail', 'app:detail', '', '0', 0, null, now(), now());

insert into `t_menu` values (130100, 130000, 'resource.project', '/resource/project', 'resource/project/View', null, 'github', '0', 1, 3, now(), now());
insert into `t_menu` values (130200, 130000, 'resource.variable', '/resource/variable', 'resource/variable/View', null, null, '0', 1, 4, now(), now());
insert into `t_menu` values (130300, 130000, 'resource.upload', '/resource/upload', 'resource/upload/View', null, null, '0', 1, 3, now(), now());
@@ -64,7 +64,7 @@ public class SparkApplication extends BaseEntity {

private Long teamId;

- /** 1) custom code 2) spark SQL */
+ /** 1) spark jar 2) spark SQL 3) pyspark*/
private Integer jobType;

/** 1) Apache Spark 2) StreamPark Spark */
@@ -386,40 +386,41 @@ public Map<String, String> getOptionMap() {
}

@JsonIgnore
- public boolean isSparkSqlJob() {
- return SparkDevelopmentMode.SPARK_SQL.getMode().equals(this.getJobType());
+ public boolean isSparkOnYarnJob() {
+ return SparkExecutionMode.YARN_CLUSTER.getMode() == (this.getExecutionMode())
+ || SparkExecutionMode.YARN_CLIENT.getMode() == (this.getExecutionMode());
}

@JsonIgnore
- public boolean isCustomCodeJob() {
- return SparkDevelopmentMode.CUSTOM_CODE.getMode().equals(this.getJobType());
+ public boolean isSparkSqlJob() {
+ return SparkDevelopmentMode.SPARK_SQL.getMode().equals(this.getJobType());
}

@JsonIgnore
- public boolean isCustomCodeOrSparkSqlJob() {
- return isSparkSqlJob() || isCustomCodeJob();
+ public boolean isSparkJarJob() {
+ return SparkDevelopmentMode.SPARK_JAR.getMode().equals(this.getJobType());
}

@JsonIgnore
- public boolean isCustomCodeOrPySparkJob() {
- return SparkDevelopmentMode.CUSTOM_CODE.getMode().equals(this.getJobType())
+ public boolean isSparkJarOrPySparkJob() {
+ return SparkDevelopmentMode.SPARK_JAR.getMode().equals(this.getJobType())
|| SparkDevelopmentMode.PYSPARK.getMode().equals(this.getJobType());
}

@JsonIgnore
public boolean isUploadJob() {
- return isCustomCodeOrPySparkJob()
+ return isSparkJarOrPySparkJob()
&& ResourceFromEnum.UPLOAD.getValue().equals(this.getResourceFrom());
}

@JsonIgnore
public boolean isCICDJob() {
- return isCustomCodeOrPySparkJob()
+ return isSparkJarOrPySparkJob()
&& ResourceFromEnum.CICD.getValue().equals(this.getResourceFrom());
}

public boolean isStreamParkJob() {
- return this.getAppType() == ApplicationType.STREAMPARK_FLINK.getType();
+ return this.getAppType() == ApplicationType.STREAMPARK_SPARK.getType();
}

@JsonIgnore
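A hedged usage sketch of the renamed predicates; the entity class and method names come from this diff, the calling code is illustrative:

    import org.apache.streampark.console.core.entity.SparkApplication;

    public class SubmitPreparationSketch {
        void prepare(SparkApplication app) {
            if (app.isSparkOnYarnJob()) {
                // yarn-cluster / yarn-client: YARN queue settings apply
            }
            if (app.isSparkSqlJob()) {
                // SQL jobs are launched through the bundled SQL client main class
            }
            if (app.isSparkJarOrPySparkJob() && app.isUploadJob()) {
                // user-uploaded jar / PySpark resources are staged into the workspace
            }
        }
    }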
@@ -452,7 +452,7 @@ private Tuple2<String, String> getUserJarAndAppConf(
sparkUserJar = resource.getFilePath();
break;

- case CUSTOM_CODE:
+ case SPARK_JAR:
if (application.isUploadJob()) {
appConf = applicationConfig == null
? null
@@ -18,6 +18,7 @@
package org.apache.streampark.console.core.service.application.impl;

import org.apache.streampark.common.conf.Workspace;
+ import org.apache.streampark.common.constants.Constants;
import org.apache.streampark.common.enums.SparkExecutionMode;
import org.apache.streampark.common.enums.StorageType;
import org.apache.streampark.common.fs.HdfsOperator;
@@ -267,8 +268,12 @@ public boolean create(SparkApplication appParam) {
ApiAlertException.throwIfFalse(
success,
String.format(ERROR_APP_QUEUE_HINT, appParam.getYarnQueue(), appParam.getTeamId()));
- appParam.resolveYarnQueue();
-
+ if (appParam.isSparkOnYarnJob()) {
+ appParam.resolveYarnQueue();
+ if (appParam.isSparkSqlJob()) {
+ appParam.setMainClass(Constants.STREAMPARK_SPARKSQL_CLIENT_CLASS);
+ }
+ }
if (appParam.isUploadJob()) {
String jarPath = String.format(
"%s/%d/%s", Workspace.local().APP_UPLOADS(), appParam.getTeamId(), appParam.getJar());
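create() now resolves the YARN queue only for YARN deployments and pre-sets the main class of Spark SQL jobs to the bundled SQL client. A hedged sketch of how that main class would surface on a spark-submit command line; the argument layout is illustrative, not StreamPark's actual submit builder:

    import java.util.Arrays;
    import java.util.List;

    public class SubmitCommandSketch {
        // Illustrative only: the main class chosen in create() ends up behind --class.
        static List<String> buildArgs(String sparkHome, String mainClass, String userJar) {
            return Arrays.asList(
                sparkHome + "/bin/spark-submit",
                "--master", "yarn",
                "--deploy-mode", "cluster",
                "--class", mainClass, // Constants.STREAMPARK_SPARKSQL_CLIENT_CLASS for SQL jobs
                userJar);
        }
    }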
@@ -20,7 +20,6 @@
import org.apache.streampark.common.conf.Workspace;
import org.apache.streampark.common.constants.Constants;
import org.apache.streampark.common.enums.ApplicationType;
- import org.apache.streampark.common.enums.SparkDevelopmentMode;
import org.apache.streampark.common.enums.SparkExecutionMode;
import org.apache.streampark.common.fs.FsOperator;
import org.apache.streampark.common.util.AssertUtils;
@@ -201,8 +200,8 @@ public void onStart(PipelineSnapshot snapshot) {
// 2) some preparatory work
String appUploads = app.getWorkspace().APP_UPLOADS();

- if (app.isCustomCodeOrPySparkJob()) {
- // customCode upload jar to appHome...
+ if (app.isSparkJarOrPySparkJob()) {
+ // spark jar and pyspark upload resource to appHome...
String appHome = app.getAppHome();
FsOperator fsOperator = app.getFsOperator();
fsOperator.delete(appHome);
@@ -286,7 +285,7 @@ public void onFinish(PipelineSnapshot snapshot, BuildResult result) {
// If the current task is not running, or the task has just been added, directly
// set
// the candidate version to the official version
- if (app.isCustomCodeOrSparkSqlJob()) {
+ if (app.isSparkOnYarnJob()) {
applicationManageService.toEffective(app);
} else {
if (app.isStreamParkJob()) {
@@ -378,8 +377,7 @@ private BuildPipeline createPipelineInstance(@Nonnull SparkApplication app) {
case YARN_CLIENT:
String yarnProvidedPath = app.getAppLib();
String localWorkspace = app.getLocalAppHome().concat("/lib");
- if (SparkDevelopmentMode.CUSTOM_CODE == app.getDevelopmentMode()
- && ApplicationType.APACHE_SPARK == app.getApplicationType()) {
+ if (ApplicationType.APACHE_SPARK == app.getApplicationType()) {
yarnProvidedPath = app.getAppHome();
localWorkspace = app.getLocalAppHome();
}
@@ -400,7 +398,7 @@ private BuildPipeline createPipelineInstance(@Nonnull SparkApplication app) {

private String retrieveSparkUserJar(SparkEnv sparkEnv, SparkApplication app) {
switch (app.getDevelopmentMode()) {
- case CUSTOM_CODE:
+ case SPARK_JAR:
switch (app.getApplicationType()) {
case STREAMPARK_SPARK:
return String.format(
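retrieveSparkUserJar now keys its first switch on SPARK_JAR instead of CUSTOM_CODE. A hedged sketch of the per-application-type jar resolution behind it; the exact path formats are assumptions, only the accessors appear elsewhere in this diff:

    import org.apache.streampark.common.enums.ApplicationType;
    import org.apache.streampark.console.core.entity.SparkApplication;

    public class UserJarResolutionSketch {
        // Illustrative only: where the user jar is expected to live for a SPARK_JAR job.
        static String resolveUserJar(SparkApplication app) {
            switch (app.getApplicationType()) {
                case STREAMPARK_SPARK:
                    // jar produced by the StreamPark build pipeline under the app's dist home
                    return String.format("%s/%s", app.getDistHome(), app.getJar());
                case APACHE_SPARK:
                    // plain Apache Spark job: the jar staged into the app home
                    return String.format("%s/%s", app.getAppHome(), app.getJar());
                default:
                    throw new IllegalArgumentException("Unsupported application type");
            }
        }
    }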
@@ -196,7 +196,7 @@ public Boolean removeById(Long id) throws InternalException {
@Override
public void backup(SparkApplication appParam, SparkSql sparkSqlParam) {
// basic configuration file backup
- String appHome = (appParam.isCustomCodeJob() && appParam.isCICDJob())
+ String appHome = (appParam.isCICDJob())
? appParam.getDistHome()
: appParam.getAppHome();
FsOperator fsOperator = appParam.getFsOperator();
@@ -86,7 +86,6 @@
<select id="selectPage" resultType="org.apache.streampark.console.core.entity.SparkApplication" parameterType="org.apache.streampark.console.core.entity.SparkApplication">
select
t.*,
- p.name as projectName,
u.username,
case
when trim(u.nick_name) = ''
@@ -104,37 +103,21 @@
<if test="app.jobType != null and app.jobType != ''">
and t.job_type = #{app.jobType}
</if>
- <if test="app.jobTypeArray != null and app.jobTypeArray.length>0">
- and t.job_type in
- <foreach item="item" index="index" collection="app.jobTypeArray" open="(" close=")" separator=",">
- #{item}
- </foreach>
- </if>
<if test="app.executionMode != null and app.executionMode != ''">
and t.execution_mode = #{app.executionMode}
</if>
<if test="app.appName != null and app.appName != ''">
and t.app_name like concat('%', '${app.appName}', '%')
</if>
- <if test="app.projectName != null and app.projectName != ''">
- and p.name like concat('%', '${app.projectName}', '%')
- </if>
<if test="app.appId != null and app.appId != ''">
and t.app_id = #{app.appId}
</if>
<if test="app.state != null and app.state != ''">
and t.state = #{app.state}
</if>

<if test="app.userId != null and app.userId != ''">
and t.user_id = #{app.userId}
</if>
- <if test="app.stateArray != null and app.stateArray.length>0">
- and t.state in
- <foreach item="item" index="index" collection="app.stateArray" open="(" close=")" separator=",">
- #{item}
- </foreach>
- </if>
<if test="app.tags != null and app.tags != ''">
and t.tags like concat('%', '${app.tags}', '%')
</if>
@@ -61,7 +61,7 @@ case class SubmitRequest(

lazy val appMain: String = this.developmentMode match {
case SparkDevelopmentMode.SPARK_SQL => Constants.STREAMPARK_SPARKSQL_CLIENT_CLASS
- case SparkDevelopmentMode.CUSTOM_CODE | SparkDevelopmentMode.PYSPARK => mainClass
+ case SparkDevelopmentMode.SPARK_JAR | SparkDevelopmentMode.PYSPARK => mainClass
case SparkDevelopmentMode.UNKNOWN => throw new IllegalArgumentException("Unknown deployment Mode")
}

@@ -75,17 +75,11 @@ object SparkShimsProxy extends Logger {
logInfo(s"Add verify sql lib,spark version: $sparkVersion")
VERIFY_SQL_CLASS_LOADER_CACHE.getOrElseUpdate(
s"${sparkVersion.fullVersion}", {
- val getSparkTable: File => Boolean = _.getName.startsWith("spark-table")
- // 1) spark/lib/spark-table*
- val libTableURL =
- getSparkHomeLib(sparkVersion.sparkHome, "lib", getSparkTable)
+ val libUrl = getSparkHomeLib(sparkVersion.sparkHome, "jars", f => !f.getName.startsWith("log4j") && !f.getName.startsWith("slf4j"))
+ val shimsUrls = ListBuffer[URL](libUrl: _*)

- // 2) After version 1.15 need add spark/opt/spark-table*
- val optTableURL =
- getSparkHomeLib(sparkVersion.sparkHome, "opt", getSparkTable)
- val shimsUrls = ListBuffer[URL](libTableURL ++ optTableURL: _*)
+ // TODO If there are compatibility issues with different versions

- // 3) add only streampark shims jar
addShimsUrls(
sparkVersion,
file => {
@@ -135,7 +129,7 @@
if (INCLUDE_PATTERN.matcher(jarName).matches()) {
addShimUrl(jar)
logInfo(s"Include jar lib: $jarName")
- } else if (jarName.matches(s"^streampark-.*_$scalaVersion.*$$")) {
+ } else if (jarName.matches(s"^streampark-(?!flink).*_$scalaVersion.*$$")) {
addShimUrl(jar)
logInfo(s"Include streampark lib: $jarName")
}
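The include pattern gains a negative lookahead so streampark-flink artifacts no longer land on the Spark SQL verify classpath, while other streampark shim jars still do. A standalone check of the new pattern; the Scala binary version and the jar names are made up for the demo:

    import java.util.Arrays;

    public class ShimJarPatternCheck {
        public static void main(String[] args) {
            String scalaVersion = "2.12"; // example value
            String pattern = "^streampark-(?!flink).*_" + scalaVersion + ".*$";
            // example jar names, invented for this check
            Arrays.asList(
                    "streampark-spark-shims_2.12-2.2.0.jar",
                    "streampark-flink-shims_2.12-2.2.0.jar",
                    "spark-sql_2.12-3.3.2.jar")
                .forEach(name -> System.out.println(name + " -> " + name.matches(pattern)));
            // Prints true only for the first name: streampark-* jars are kept,
            // streampark-flink-* and non-streampark jars are skipped by this branch.
        }
    }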
@@ -156,11 +156,9 @@ trait Spark extends Logger {
throw new IllegalArgumentException(
"[StreamPark] Usage: config file error,must be [properties|yaml|conf]")
}

- sparkConf.setAll(localConf)
+ localConf.foreach(arg => sparkConf.set(arg._1, arg._2))
}

- sparkConf.setAll(userArgs)
+ userArgs.foreach(arg => sparkConf.set(arg._1, arg._2))

val appMain = sparkConf.get(KEY_SPARK_MAIN_CLASS, "org.apache.streampark.spark.cli.SqlClient")
if (appMain == null) {
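The two setAll calls are replaced with per-entry set, presumably to stay source-compatible across Spark/Scala versions where setAll's expected collection type differs. A minimal Java sketch of the same entry-by-entry application:

    import java.util.Map;

    import org.apache.spark.SparkConf;

    public class ConfApplySketch {
        // Apply parsed config-file entries and user args one by one,
        // mirroring the foreach-based replacement for setAll in this hunk.
        static void apply(SparkConf conf, Map<String, String> localConf, Map<String, String> userArgs) {
            localConf.forEach(conf::set);
            userArgs.forEach(conf::set);
        }
    }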
