Skip to content

Commit

Permalink
[Improve] spark-job build pipeline improvements
Browse files Browse the repository at this point in the history
  • Loading branch information
wolfboys committed Sep 29, 2024
1 parent 8905c78 commit 794482b
Show file tree
Hide file tree
Showing 29 changed files with 204 additions and 175 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,9 @@ public enum FlinkJobType {
*/
@Nonnull
public static FlinkJobType of(@Nullable Integer value) {
for (FlinkJobType flinkDevelopmentMode : values()) {
if (flinkDevelopmentMode.mode.equals(value)) {
return flinkDevelopmentMode;
for (FlinkJobType flinkJobType : values()) {
if (flinkJobType.mode.equals(value)) {
return flinkJobType;
}
}
return FlinkJobType.UNKNOWN;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,9 @@ public enum SparkJobType {
*/
@Nonnull
public static SparkJobType valueOf(@Nullable Integer value) {
for (SparkJobType sparkDevelopmentMode : values()) {
if (sparkDevelopmentMode.mode.equals(value)) {
return sparkDevelopmentMode;
for (SparkJobType sparkJobType : values()) {
if (sparkJobType.mode.equals(value)) {
return sparkJobType;
}
}
return SparkJobType.UNKNOWN;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@
@Validated
@RestController
@RequestMapping("flink/pipe")
public class FlinkApplicationBuildPipelineController {
public class FlinkBuildPipelineController {

@Autowired
private AppBuildPipeService appBuildPipeService;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
@Validated
@RestController
@RequestMapping("spark/pipe")
public class SparkApplicationBuildPipelineController {
public class SparkBuildPipelineController {

@Autowired
private SparkAppBuildPipeService appBuildPipeService;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -276,8 +276,8 @@ public void setState(Integer state) {
}

public void setYarnQueueByHotParams() {
if (!(FlinkDeployMode.YARN_APPLICATION == this.getFlinkDeployMode()
|| FlinkDeployMode.YARN_PER_JOB == this.getFlinkDeployMode())) {
if (!(FlinkDeployMode.YARN_APPLICATION == this.getDeployModeEnum()
|| FlinkDeployMode.YARN_PER_JOB == this.getDeployModeEnum())) {
return;
}

Expand Down Expand Up @@ -341,7 +341,7 @@ public ReleaseStateEnum getReleaseState() {
}

@JsonIgnore
public FlinkJobType getDevelopmentMode() {
public FlinkJobType getJobTypeEnum() {
return FlinkJobType.of(jobType);
}

Expand All @@ -356,7 +356,7 @@ public FlinkK8sRestExposedType getK8sRestExposedTypeEnum() {
}

@JsonIgnore
public FlinkDeployMode getFlinkDeployMode() {
public FlinkDeployMode getDeployModeEnum() {
return FlinkDeployMode.of(deployMode);
}

Expand Down Expand Up @@ -400,7 +400,7 @@ public String getRemoteAppHome() {
/** Automatically identify remoteAppHome or localAppHome based on app FlinkDeployMode */
@JsonIgnore
public String getAppHome() {
switch (this.getFlinkDeployMode()) {
switch (this.getDeployModeEnum()) {
case KUBERNETES_NATIVE_APPLICATION:
case KUBERNETES_NATIVE_SESSION:
case YARN_PER_JOB:
Expand All @@ -412,7 +412,7 @@ public String getAppHome() {
return getRemoteAppHome();
default:
throw new UnsupportedOperationException(
"unsupported deployMode ".concat(getFlinkDeployMode().getName()));
"unsupported deployMode ".concat(getDeployModeEnum().getName()));
}
}

Expand Down Expand Up @@ -558,7 +558,7 @@ public void updateHotParams(FlinkApplication appParam) {
if (appParam != this) {
this.hotParams = null;
}
FlinkDeployMode deployModeEnum = appParam.getFlinkDeployMode();
FlinkDeployMode deployModeEnum = appParam.getDeployModeEnum();
Map<String, String> hotParams = new HashMap<>(0);
if (needFillYarnQueueLabel(deployModeEnum)) {
hotParams.putAll(YarnQueueLabelExpression.getQueueLabelMap(appParam.getYarnQueue()));
Expand Down Expand Up @@ -594,7 +594,7 @@ public int hashCode() {
}

public boolean isKubernetesModeJob() {
return FlinkDeployMode.isKubernetesMode(this.getFlinkDeployMode());
return FlinkDeployMode.isKubernetesMode(this.getDeployModeEnum());
}

public static class SFunc {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ public FlinkApplicationBackUp(FlinkApplication application) {
}

private void renderPath(FlinkApplication application) {
switch (application.getFlinkDeployMode()) {
switch (application.getDeployModeEnum()) {
case KUBERNETES_NATIVE_APPLICATION:
case KUBERNETES_NATIVE_SESSION:
case YARN_PER_JOB:
Expand All @@ -80,7 +80,7 @@ private void renderPath(FlinkApplication application) {
break;
default:
throw new UnsupportedOperationException(
"unsupported deployMode ".concat(application.getFlinkDeployMode().getName()));
"unsupported deployMode ".concat(application.getDeployModeEnum().getName()));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -233,8 +233,8 @@ public void setState(Integer state) {
}

public void resolveYarnQueue() {
if (!(SparkDeployMode.YARN_CLIENT == this.getSparkDeployMode()
|| SparkDeployMode.YARN_CLUSTER == this.getSparkDeployMode())) {
if (!(SparkDeployMode.YARN_CLIENT == this.getDeployModeEnum()
|| SparkDeployMode.YARN_CLUSTER == this.getDeployModeEnum())) {
return;
}
if (StringUtils.isBlank(this.yarnQueue)) {
Expand Down Expand Up @@ -312,7 +312,7 @@ public ReleaseStateEnum getReleaseState() {
}

@JsonIgnore
public SparkJobType getDevelopmentMode() {
public SparkJobType getJobTypeEnum() {
return SparkJobType.valueOf(jobType);
}

Expand All @@ -322,7 +322,7 @@ public SparkAppStateEnum getStateEnum() {
}

@JsonIgnore
public SparkDeployMode getSparkDeployMode() {
public SparkDeployMode getDeployModeEnum() {
return SparkDeployMode.of(deployMode);
}

Expand Down Expand Up @@ -351,7 +351,7 @@ public String getRemoteAppHome() {
/** Automatically identify remoteAppHome or localAppHome based on app SparkDeployMode */
@JsonIgnore
public String getAppHome() {
switch (this.getSparkDeployMode()) {
switch (this.getDeployModeEnum()) {
case REMOTE:
case LOCAL:
return getLocalAppHome();
Expand All @@ -360,7 +360,7 @@ public String getAppHome() {
return getRemoteAppHome();
default:
throw new UnsupportedOperationException(
"unsupported deployMode ".concat(getSparkDeployMode().getName()));
"unsupported deployMode ".concat(getDeployModeEnum().getName()));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ public SparkApplicationBackUp(SparkApplication application) {
}

private void renderPath(SparkApplication application) {
switch (application.getSparkDeployMode()) {
switch (application.getDeployModeEnum()) {
case LOCAL:
this.path = String.format(
"%s/%d/%d",
Expand All @@ -78,7 +78,7 @@ private void renderPath(SparkApplication application) {
break;
default:
throw new UnsupportedOperationException(
"unsupported deployMode ".concat(application.getSparkDeployMode().getName()));
"unsupported deployMode ".concat(application.getDeployModeEnum().getName()));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -455,13 +455,13 @@ private BuildPipeline createPipelineInstance(@Nonnull FlinkApplication app) {
}
}

FlinkDeployMode deployModeEnum = app.getFlinkDeployMode();
FlinkDeployMode deployModeEnum = app.getDeployModeEnum();
String mainClass = Constants.STREAMPARK_FLINKSQL_CLIENT_CLASS;
switch (deployModeEnum) {
case YARN_APPLICATION:
String yarnProvidedPath = app.getAppLib();
String localWorkspace = app.getLocalAppHome().concat("/lib");
if (FlinkJobType.CUSTOM_CODE == app.getDevelopmentMode()
if (FlinkJobType.CUSTOM_CODE == app.getJobTypeEnum()
&& ApplicationType.APACHE_FLINK == app.getApplicationType()) {
yarnProvidedPath = app.getAppHome();
localWorkspace = app.getLocalAppHome();
Expand Down Expand Up @@ -490,7 +490,7 @@ private BuildPipeline createPipelineInstance(@Nonnull FlinkApplication app) {
return FlinkK8sApplicationBuildPipeline.of(k8sApplicationBuildRequest);
default:
throw new UnsupportedOperationException(
"Unsupported Building Application for DeployMode: " + app.getFlinkDeployMode());
"Unsupported Building Application for DeployMode: " + app.getDeployModeEnum());
}
}

Expand All @@ -505,7 +505,7 @@ private FlinkYarnApplicationBuildRequest buildFlinkYarnApplicationBuildRequest(
mainClass,
localWorkspace,
yarnProvidedPath,
app.getDevelopmentMode(),
app.getJobTypeEnum(),
getMergedDependencyInfo(app));
}

Expand All @@ -521,8 +521,8 @@ private FlinkK8sApplicationBuildRequest buildFlinkK8sApplicationBuildRequest(
app.getLocalAppHome(),
mainClass,
flinkUserJar,
app.getFlinkDeployMode(),
app.getDevelopmentMode(),
app.getDeployModeEnum(),
app.getJobTypeEnum(),
flinkEnv.getFlinkVersion(),
getMergedDependencyInfo(app),
app.getJobName(),
Expand All @@ -549,8 +549,8 @@ private FlinkK8sSessionBuildRequest buildFlinkK8sSessionBuildRequest(
app.getLocalAppHome(),
mainClass,
flinkUserJar,
app.getFlinkDeployMode(),
app.getDevelopmentMode(),
app.getDeployModeEnum(),
app.getJobTypeEnum(),
flinkEnv.getFlinkVersion(),
getMergedDependencyInfo(app),
app.getClusterId(),
Expand All @@ -569,15 +569,15 @@ private FlinkRemotePerJobBuildRequest buildFlinkRemotePerJobBuildRequest(
mainClass,
flinkUserJar,
app.isCustomCodeJob(),
app.getFlinkDeployMode(),
app.getDevelopmentMode(),
app.getDeployModeEnum(),
app.getJobTypeEnum(),
flinkEnv.getFlinkVersion(),
getMergedDependencyInfo(app));
}

/** copy from {@link FlinkApplicationActionService#start(FlinkApplication, boolean)} */
private String retrieveFlinkUserJar(FlinkEnv flinkEnv, FlinkApplication app) {
switch (app.getDevelopmentMode()) {
switch (app.getJobTypeEnum()) {
case CUSTOM_CODE:
switch (app.getApplicationType()) {
case STREAMPARK_FLINK:
Expand All @@ -594,14 +594,14 @@ private String retrieveFlinkUserJar(FlinkEnv flinkEnv, FlinkApplication app) {
return String.format("%s/%s", app.getAppHome(), app.getJar());
case FLINK_SQL:
String sqlDistJar = ServiceHelper.getFlinkSqlClientJar(flinkEnv);
if (app.getFlinkDeployMode() == FlinkDeployMode.YARN_APPLICATION) {
if (app.getDeployModeEnum() == FlinkDeployMode.YARN_APPLICATION) {
String clientPath = Workspace.remote().APP_CLIENT();
return String.format("%s/%s", clientPath, sqlDistJar);
}
return Workspace.local().APP_CLIENT().concat("/").concat(sqlDistJar);
default:
throw new UnsupportedOperationException(
"[StreamPark] unsupported JobType: " + app.getDevelopmentMode());
"[StreamPark] unsupported JobType: " + app.getJobTypeEnum());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -305,7 +305,7 @@ public void cancel(FlinkApplication appParam) throws Exception {

Map<String, Object> properties = new HashMap<>();

if (FlinkDeployMode.isRemoteMode(application.getFlinkDeployMode())) {
if (FlinkDeployMode.isRemoteMode(application.getDeployModeEnum())) {
FlinkCluster cluster = flinkClusterService.getById(application.getFlinkClusterId());
ApiAlertException.throwIfNull(
cluster,
Expand Down Expand Up @@ -410,12 +410,12 @@ public void start(FlinkApplication appParam, boolean auto) throws Exception {
ApiAlertException.throwIfTrue(
!application.isCanBeStart(), "[StreamPark] The application cannot be started repeatedly.");

if (FlinkDeployMode.isRemoteMode(application.getFlinkDeployMode())
|| FlinkDeployMode.isSessionMode(application.getFlinkDeployMode())) {
if (FlinkDeployMode.isRemoteMode(application.getDeployModeEnum())
|| FlinkDeployMode.isSessionMode(application.getDeployModeEnum())) {
checkBeforeStart(application);
}

if (FlinkDeployMode.isYarnMode(application.getFlinkDeployMode())) {
if (FlinkDeployMode.isYarnMode(application.getDeployModeEnum())) {
ApiAlertException.throwIfTrue(
!applicationInfoService.getYarnAppReport(application.getJobName()).isEmpty(),
"[StreamPark] The same task name is already running in the yarn queue");
Expand Down Expand Up @@ -457,7 +457,7 @@ public void start(FlinkApplication appParam, boolean auto) throws Exception {
String appConf = userJarAndAppConf.t2;

BuildResult buildResult = buildPipeline.getBuildResult();
if (FlinkDeployMode.YARN_APPLICATION == application.getFlinkDeployMode()) {
if (FlinkDeployMode.YARN_APPLICATION == application.getDeployModeEnum()) {
buildResult = new ShadedBuildResponse(null, flinkUserJar, true);
}

Expand Down Expand Up @@ -648,7 +648,7 @@ private void starting(FlinkApplication application) {
}

private Tuple2<String, String> getUserJarAndAppConf(FlinkEnv flinkEnv, FlinkApplication application) {
FlinkDeployMode deployModeEnum = application.getFlinkDeployMode();
FlinkDeployMode deployModeEnum = application.getDeployModeEnum();
FlinkApplicationConfig applicationConfig = configService.getEffective(application.getId());

ApiAlertException.throwIfNull(
Expand All @@ -657,7 +657,7 @@ private Tuple2<String, String> getUserJarAndAppConf(FlinkEnv flinkEnv, FlinkAppl
String flinkUserJar = null;
String appConf = null;

switch (application.getDevelopmentMode()) {
switch (application.getJobTypeEnum()) {
case FLINK_SQL:
FlinkSql flinkSql = flinkSqlService.getEffective(application.getId(), false);
AssertUtils.notNull(flinkSql);
Expand Down Expand Up @@ -751,7 +751,7 @@ private Tuple2<String, String> getUserJarAndAppConf(FlinkEnv flinkEnv, FlinkAppl

private Map<String, Object> getProperties(FlinkApplication application, String runtimeProperties) {
Map<String, Object> properties = new HashMap<>(application.getOptionMap());
if (FlinkDeployMode.isRemoteMode(application.getFlinkDeployMode())) {
if (FlinkDeployMode.isRemoteMode(application.getDeployModeEnum())) {
FlinkCluster cluster = flinkClusterService.getById(application.getFlinkClusterId());
ApiAlertException.throwIfNull(
cluster,
Expand All @@ -762,8 +762,8 @@ private Map<String, Object> getProperties(FlinkApplication application, String r
URI activeAddress = cluster.getRemoteURI();
properties.put(RestOptions.ADDRESS.key(), activeAddress.getHost());
properties.put(RestOptions.PORT.key(), activeAddress.getPort());
} else if (FlinkDeployMode.isYarnMode(application.getFlinkDeployMode())) {
if (FlinkDeployMode.YARN_SESSION == application.getFlinkDeployMode()) {
} else if (FlinkDeployMode.isYarnMode(application.getDeployModeEnum())) {
if (FlinkDeployMode.YARN_SESSION == application.getDeployModeEnum()) {
FlinkCluster cluster = flinkClusterService.getById(application.getFlinkClusterId());
ApiAlertException.throwIfNull(
cluster,
Expand All @@ -780,7 +780,7 @@ private Map<String, Object> getProperties(FlinkApplication application, String r
Optional.ofNullable(yarnLabelExpr)
.ifPresent(yLabel -> properties.put(ConfigKeys.KEY_YARN_APP_NODE_LABEL(), yLabel));
}
} else if (FlinkDeployMode.isKubernetesMode(application.getFlinkDeployMode())) {
} else if (FlinkDeployMode.isKubernetesMode(application.getDeployModeEnum())) {
properties.put(ConfigKeys.KEY_K8S_IMAGE_PULL_POLICY(), "Always");
}

Expand Down Expand Up @@ -818,7 +818,7 @@ private void doAbort(Long id) {
FlinkAppHttpWatcher.unWatching(application.getId());
}
// kill application
if (FlinkDeployMode.isYarnMode(application.getFlinkDeployMode())) {
if (FlinkDeployMode.isYarnMode(application.getDeployModeEnum())) {
try {
List<ApplicationReport> applications = applicationInfoService
.getYarnAppReport(application.getJobName());
Expand Down Expand Up @@ -866,7 +866,7 @@ private Tuple3<String, String, FlinkK8sRestExposedType> getNamespaceClusterId(
String clusterId = null;
String k8sNamespace = null;
FlinkK8sRestExposedType exposedType = null;
switch (application.getFlinkDeployMode()) {
switch (application.getDeployModeEnum()) {
case YARN_APPLICATION:
case YARN_PER_JOB:
case YARN_SESSION:
Expand Down
Loading

0 comments on commit 794482b

Please sign in to comment.