Skip to content

Commit

Permalink
Code optimization
Browse files Browse the repository at this point in the history
  • Loading branch information
caicancai committed Sep 1, 2023
1 parent 09f1e2d commit 7e32072
Showing 1 changed file with 6 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -64,10 +64,8 @@ object KubernetesNativeApplicationClient extends KubernetesNativeClientTrait {
flinkConfig.safeSet(KubernetesConfigOptions.CONTAINER_IMAGE, buildResult.flinkImageTag)

// retrieve k8s cluster and submit flink job on application mode
var clusterDescriptor: KubernetesClusterDescriptor = null
var clusterClient: ClusterClient[String] = null

val spec: FlinkDeploymentDef = convertFlinkDeploymentDef(submitRequest, flinkConfig)
try {
// val (descriptor, clusterSpecification) = getK8sClusterDescriptorAndSpecification(flinkConfig)
// clusterDescriptor = descriptor
Expand All @@ -76,6 +74,7 @@ object KubernetesNativeApplicationClient extends KubernetesNativeClientTrait {
// clusterClient = clusterDescriptor
// .deployApplicationCluster(clusterSpecification, applicationConfig)
// .getClusterClient
val spec: FlinkDeploymentDef = convertFlinkDeploymentDef(submitRequest, flinkConfig)
FlinkK8sOperator.deployApplicationJob(submitRequest.id, spec).runIO
val clusterId = clusterClient.getClusterId
val result = SubmitResponse(
Expand All @@ -91,7 +90,7 @@ object KubernetesNativeApplicationClient extends KubernetesNativeClientTrait {
logError(s"submit flink job fail in ${submitRequest.executionMode} mode")
throw e
} finally {
Utils.close(clusterDescriptor, clusterClient)
Utils.close(clusterClient)
}
}

Expand Down Expand Up @@ -121,8 +120,10 @@ object KubernetesNativeApplicationClient extends KubernetesNativeClientTrait {
flinkVersion = Option(submitRequest.flinkVersion.majorVersion)
.map(_.replace(".", "_"))
.map("V" + _)
.flatMap(v => FlinkVersion.values().find(_.name() == v))
.getOrElse(null),
.flatMap(v => FlinkVersion.values().find(_.name() == v)) match {
case Some(version) => version
case None => throw new IllegalArgumentException("Flink version not found")
},
jobManager = JobManagerDef(
cpu = 1,
memory = flinkConfig.get(JobManagerOptions.TOTAL_PROCESS_MEMORY).toString),
Expand Down

0 comments on commit 7e32072

Please sign in to comment.