From 8ff30e3de6c233adc5601839aaa73c4d39bc78cd Mon Sep 17 00:00:00 2001
From: caicancai <77189278+caicancai@users.noreply.github.com>
Date: Thu, 7 Sep 2023 18:39:24 +0800
Subject: [PATCH] Refactor the lifecycle control of flink application mode
-dosubmit (#2994)
* remove scala2.11
* [Feature] [Flink-K8s-V2] Refactor the lifecycle control of Flink application-mode_dosubmit
* add v2 denpendcy
* mvn spotless:apply
* throw exception
* optimization
* Code optimization
* add comment
* add comment
* add zio denpency
* resolve dependency conflicts
* add KubernetesNativeApplicationClient_V2
* Parameter optimization and add license
* Parameter optimization and add license
* add flinkconfig param
* Rename to KubernetesNativeApplicationV2
---
.../streampark-flink-client-core/pom.xml | 27 ++++
.../KubernetesNativeApplicationClientV2.scala | 121 ++++++++++++++++++
2 files changed, 148 insertions(+)
create mode 100644 streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeApplicationClientV2.scala
diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/pom.xml b/streampark-flink/streampark-flink-client/streampark-flink-client-core/pom.xml
index 9e942fc1f8..506d428dfb 100644
--- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/pom.xml
+++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/pom.xml
@@ -51,6 +51,18 @@
provided
+
+ org.apache.streampark
+ streampark-flink-kubernetes-core_${scala.binary.version}
+ ${project.version}
+
+
+ jackson-dataformat-yaml
+ com.fasterxml.jackson.dataformat
+
+
+
+
org.apache.flink
flink-yarn${scala.binary.flink.version}
@@ -94,6 +106,21 @@
json4s-jackson_${scala.binary.version}
+
+
+ dev.zio
+ zio-logging_${scala.binary.version}
+
+
+
+ dev.zio
+ zio-streams_${scala.binary.version}
+
+
+
+ dev.zio
+ zio-concurrent_${scala.binary.version}
+
diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeApplicationClientV2.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeApplicationClientV2.scala
new file mode 100644
index 0000000000..f43e88f0b7
--- /dev/null
+++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeApplicationClientV2.scala
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.flink.client.impl
+
+import org.apache.streampark.common.enums.{ApplicationType, DevelopmentMode, ExecutionMode}
+import org.apache.streampark.common.zio.ZIOExt.IOOps
+import org.apache.streampark.flink.client.`trait`.KubernetesNativeClientTrait
+import org.apache.streampark.flink.client.bean._
+import org.apache.streampark.flink.kubernetes.v2.model.{FlinkDeploymentDef, JobDef, JobManagerDef, TaskManagerDef}
+import org.apache.streampark.flink.kubernetes.v2.operator.FlinkK8sOperator
+import org.apache.streampark.flink.packer.pipeline.DockerImageBuildResponse
+
+import org.apache.commons.lang3.StringUtils
+import org.apache.flink.configuration.{Configuration, DeploymentOptions, ExecutionOptions, JobManagerOptions, PipelineOptions, TaskManagerOptions}
+import org.apache.flink.v1beta1.FlinkDeploymentSpec.FlinkVersion
+
+import scala.collection.JavaConverters._
+import scala.language.postfixOps
+
+object KubernetesNativeApplicationClientV2 extends KubernetesNativeClientTrait {
+ @throws[Exception]
+ override def doSubmit(
+ submitRequest: SubmitRequest,
+ flinkConfig: Configuration): SubmitResponse = {
+
+ // require parameters
+ require(
+ StringUtils.isNotBlank(submitRequest.k8sSubmitParam.clusterId),
+ s"[flink-submit] submit flink job failed, clusterId is null, mode=${flinkConfig.get(DeploymentOptions.TARGET)}"
+ )
+
+ // check the last building result
+ submitRequest.checkBuildResult()
+
+ try {
+ val spec: FlinkDeploymentDef = convertFlinkDeploymentDef(submitRequest, flinkConfig)
+ FlinkK8sOperator.deployApplicationJob(submitRequest.id, spec).runIO
+ val result = SubmitResponse(null, flinkConfig.toMap, submitRequest.jobId, null)
+ logInfo(
+ s"[flink-submit] flink job has been submitted. ${flinkConfIdentifierInfo(flinkConfig)}")
+ result
+ } catch {
+ case e: Exception =>
+ logError(s"submit flink job fail in ${submitRequest.executionMode} mode")
+ throw e
+ } finally {}
+ }
+
+ override def doCancel(
+ cancelRequest: CancelRequest,
+ flinkConfig: Configuration): CancelResponse = {
+ flinkConfig.safeSet(
+ DeploymentOptions.TARGET,
+ ExecutionMode.KUBERNETES_NATIVE_APPLICATION.getName)
+ super.doCancel(cancelRequest, flinkConfig)
+ }
+
+ override def doTriggerSavepoint(
+ request: TriggerSavepointRequest,
+ flinkConf: Configuration): SavepointResponse = {
+ flinkConf.safeSet(DeploymentOptions.TARGET, ExecutionMode.KUBERNETES_NATIVE_APPLICATION.getName)
+ super.doTriggerSavepoint(request, flinkConf)
+ }
+
+ private[this] def convertFlinkDeploymentDef(
+ submitRequest: SubmitRequest,
+ flinkConfig: Configuration): FlinkDeploymentDef = {
+ val spec = FlinkDeploymentDef(
+ name = submitRequest.appName,
+ namespace = submitRequest.k8sSubmitParam.kubernetesNamespace,
+ image = submitRequest.buildResult.asInstanceOf[DockerImageBuildResponse].flinkImageTag,
+ flinkVersion = Option(submitRequest.flinkVersion.majorVersion)
+ .map(_.replace(".", "_"))
+ .map("V" + _)
+ .flatMap(v => FlinkVersion.values().find(_.name() == v)) match {
+ case Some(version) => version
+ case None => throw new IllegalArgumentException("Flink version not found")
+ },
+ jobManager = JobManagerDef(
+ cpu = 1,
+ memory = flinkConfig.get(JobManagerOptions.TOTAL_PROCESS_MEMORY).toString),
+ taskManager = TaskManagerDef(
+ cpu = 1,
+ memory = flinkConfig.get(TaskManagerOptions.TOTAL_PROCESS_MEMORY).toString),
+ job = Option(
+ JobDef(
+ jarURI =
+ submitRequest.buildResult.asInstanceOf[DockerImageBuildResponse].dockerInnerMainJarPath,
+ parallelism = 1,
+ args = Array(flinkConfig.toMap.get("$internal.application.program-args")),
+ entryClass = Some(submitRequest.appMain),
+ initialSavepointPath = Some(submitRequest.savePoint),
+ allowNonRestoredState = Some(submitRequest.allowNonRestoredState)
+ )),
+ extJarPaths = submitRequest.userJarFile match {
+ case null => Array.empty[String]
+ case file => Array(file.getAbsolutePath)
+ },
+ flinkConfiguration = submitRequest.extraParameter match {
+ case null => Map.empty
+ case e => e.asScala.map { case (key, value) => key -> value.toString }.toMap
+ }
+ )
+ spec
+ }
+}