Skip to content

Commit

Permalink
optimization
Browse files Browse the repository at this point in the history
  • Loading branch information
caicancai committed Aug 30, 2023
1 parent e11491e commit 32076c8
Showing 1 changed file with 18 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package org.apache.streampark.flink.client.impl

import org.apache.streampark.common.enums.ExecutionMode
import org.apache.streampark.common.util.Utils
import org.apache.streampark.common.zio.ZIOExt.IOOps
import org.apache.streampark.flink.client.`trait`.KubernetesNativeClientTrait
import org.apache.streampark.flink.client.bean._
import org.apache.streampark.flink.kubernetes.v2.model.{FlinkDeploymentDef, JobDef, JobManagerDef, TaskManagerDef}
Expand All @@ -27,7 +28,6 @@ import org.apache.streampark.flink.packer.pipeline.DockerImageBuildResponse

import com.google.common.collect.Lists
import org.apache.commons.lang3.StringUtils
import org.apache.flink.client.deployment.application.ApplicationConfiguration
import org.apache.flink.client.program.ClusterClient
import org.apache.flink.configuration.{Configuration, DeploymentOptions, JobManagerOptions, PipelineOptions, TaskManagerOptions}
import org.apache.flink.kubernetes.KubernetesClusterDescriptor
Expand Down Expand Up @@ -67,17 +67,16 @@ object KubernetesNativeApplicationClient extends KubernetesNativeClientTrait {
var clusterDescriptor: KubernetesClusterDescriptor = null
var clusterClient: ClusterClient[String] = null

val spec = convertFlinkDeploymentDef(submitRequest, flinkConfig)

val spec: FlinkDeploymentDef = convertFlinkDeploymentDef(submitRequest, flinkConfig)
try {
val (descriptor, clusterSpecification) = getK8sClusterDescriptorAndSpecification(flinkConfig)
clusterDescriptor = descriptor

val applicationConfig = ApplicationConfiguration.fromConfiguration(flinkConfig)
// val (descriptor, clusterSpecification) = getK8sClusterDescriptorAndSpecification(flinkConfig)
// clusterDescriptor = descriptor
//
// val applicationConfig = ApplicationConfiguration.fromConfiguration(flinkConfig)
// clusterClient = clusterDescriptor
// .deployApplicationCluster(clusterSpecification, applicationConfig)
// .getClusterClient
FlinkK8sOperator.deployApplicationJob(submitRequest.id, spec)
FlinkK8sOperator.deployApplicationJob(submitRequest.id, spec).runIO
val clusterId = clusterClient.getClusterId
val result = SubmitResponse(
clusterId,
Expand Down Expand Up @@ -118,8 +117,12 @@ object KubernetesNativeApplicationClient extends KubernetesNativeClientTrait {
val spec = FlinkDeploymentDef(
name = submitRequest.appName,
namespace = submitRequest.k8sSubmitParam.kubernetesNamespace,
image = "flink:" + submitRequest.flinkVersion.majorVersion,
flinkVersion = converFlinkVersion(submitRequest.flinkVersion.majorVersion).getOrElse(null),
image = KubernetesConfigOptions.CONTAINER_IMAGE.defaultValue(),
flinkVersion = Option(submitRequest.flinkVersion.majorVersion)
.map(_.replace(".", "_"))
.map("V" + _)
.flatMap(v => FlinkVersion.values().find(_.name() == v))
.getOrElse(null),
jobManager = JobManagerDef(
cpu = 1,
memory = flinkConfig.get(JobManagerOptions.TOTAL_PROCESS_MEMORY).toString),
Expand All @@ -130,21 +133,14 @@ object KubernetesNativeApplicationClient extends KubernetesNativeClientTrait {
JobDef(
jarURI =
submitRequest.buildResult.asInstanceOf[DockerImageBuildResponse].dockerInnerMainJarPath,
parallelism = submitRequest.properties.get("parallelism.default").asInstanceOf[Int]
parallelism = 1
)),
extJarPaths = Array(submitRequest.userJarFile.getAbsolutePath)
extJarPaths = submitRequest.userJarFile match {
case null => Array.empty[String]
case file => Array(file.getAbsolutePath)
}
)
spec
}

private[this] def converFlinkVersion(version: String): Option[FlinkVersion] = {
version match {
case "1.17" => Some(FlinkVersion.V1_17)
case "1.16" => Some(FlinkVersion.V1_16)
case "1.15" => Some(FlinkVersion.V1_15)
case "1.14" => Some(FlinkVersion.V1_14)
case "1.13" => Some(FlinkVersion.V1_13)
case _ => None
}
}
}

0 comments on commit 32076c8

Please sign in to comment.