From 7e32072c823a86d5ad8f1501d62a6f2df4c648a9 Mon Sep 17 00:00:00 2001 From: caicancai <2356672992@qq.com> Date: Fri, 1 Sep 2023 18:39:08 +0800 Subject: [PATCH] Code optimization --- .../impl/KubernetesNativeApplicationClient.scala | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeApplicationClient.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeApplicationClient.scala index 4aeb006d88..4300dacbb3 100644 --- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeApplicationClient.scala +++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeApplicationClient.scala @@ -64,10 +64,8 @@ object KubernetesNativeApplicationClient extends KubernetesNativeClientTrait { flinkConfig.safeSet(KubernetesConfigOptions.CONTAINER_IMAGE, buildResult.flinkImageTag) // retrieve k8s cluster and submit flink job on application mode - var clusterDescriptor: KubernetesClusterDescriptor = null var clusterClient: ClusterClient[String] = null - val spec: FlinkDeploymentDef = convertFlinkDeploymentDef(submitRequest, flinkConfig) try { // val (descriptor, clusterSpecification) = getK8sClusterDescriptorAndSpecification(flinkConfig) // clusterDescriptor = descriptor @@ -76,6 +74,7 @@ object KubernetesNativeApplicationClient extends KubernetesNativeClientTrait { // clusterClient = clusterDescriptor // .deployApplicationCluster(clusterSpecification, applicationConfig) // .getClusterClient + val spec: FlinkDeploymentDef = convertFlinkDeploymentDef(submitRequest, flinkConfig) FlinkK8sOperator.deployApplicationJob(submitRequest.id, spec).runIO val clusterId = clusterClient.getClusterId val result = SubmitResponse( @@ -91,7 +90,7 @@ object KubernetesNativeApplicationClient extends KubernetesNativeClientTrait { logError(s"submit flink job fail in ${submitRequest.executionMode} mode") throw e } finally { - Utils.close(clusterDescriptor, clusterClient) + Utils.close(clusterClient) } } @@ -121,8 +120,10 @@ object KubernetesNativeApplicationClient extends KubernetesNativeClientTrait { flinkVersion = Option(submitRequest.flinkVersion.majorVersion) .map(_.replace(".", "_")) .map("V" + _) - .flatMap(v => FlinkVersion.values().find(_.name() == v)) - .getOrElse(null), + .flatMap(v => FlinkVersion.values().find(_.name() == v)) match { + case Some(version) => version + case None => throw new IllegalArgumentException("Flink version not found") + }, jobManager = JobManagerDef( cpu = 1, memory = flinkConfig.get(JobManagerOptions.TOTAL_PROCESS_MEMORY).toString),