Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor the lifecycle control of flink application mode -dosubmit #2994

Merged
merged 22 commits into from
Sep 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
f3cf1fb
remove scala2.11
caicancai Aug 27, 2023
f34315d
[Feature] [Flink-K8s-V2] Refactor the lifecycle control of Flink appl…
caicancai Aug 29, 2023
7980d71
Merge branch 'apache:dev' into Refactor_the_lifecycle_control_of_Flin…
caicancai Aug 29, 2023
6fcfdf3
add v2 dependency
caicancai Aug 29, 2023
17ecd9b
Merge remote-tracking branch 'origin/Refactor_the_lifecycle_control_o…
caicancai Aug 29, 2023
5880491
mvn spotless:apply
caicancai Aug 29, 2023
e11491e
throw exception
caicancai Aug 29, 2023
32076c8
optimization
caicancai Aug 30, 2023
d2b3c7a
Merge branch 'apache:dev' into Refactor_the_lifecycle_control_of_Flin…
caicancai Aug 30, 2023
09f1e2d
Merge branch 'apache:dev' into Refactor_the_lifecycle_control_of_Flin…
caicancai Sep 1, 2023
7e32072
Code optimization
caicancai Sep 1, 2023
0ecf7c1
add comment
caicancai Sep 3, 2023
8854af1
add comment
caicancai Sep 3, 2023
dc88377
Merge branch 'apache:dev' into Refactor_the_lifecycle_control_of_Flin…
caicancai Sep 3, 2023
c3b20a4
add zio dependency
caicancai Sep 5, 2023
4e2caa9
resolve dependency conflicts
caicancai Sep 5, 2023
b69df1e
add KubernetesNativeApplicationClient_V2
caicancai Sep 5, 2023
2eb3510
Merge branch 'apache:dev' into Refactor_the_lifecycle_control_of_Flin…
caicancai Sep 5, 2023
a5f81bc
Parameter optimization and add license
caicancai Sep 6, 2023
34bb149
Parameter optimization and add license
caicancai Sep 6, 2023
10c52b4
add flinkconfig param
caicancai Sep 6, 2023
3ab44ae
Rename to KubernetesNativeApplicationV2
caicancai Sep 7, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,18 @@
<scope>provided</scope>
</dependency>

<dependency>
<groupId>org.apache.streampark</groupId>
<artifactId>streampark-flink-kubernetes-core_${scala.binary.version}</artifactId>
<version>${project.version}</version>
<exclusions>
<exclusion>
<artifactId>jackson-dataformat-yaml</artifactId>
<groupId>com.fasterxml.jackson.dataformat</groupId>
</exclusion>
</exclusions>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-yarn${scala.binary.flink.version}</artifactId>
Expand Down Expand Up @@ -94,6 +106,21 @@
<artifactId>json4s-jackson_${scala.binary.version}</artifactId>
</dependency>

<!-- ZIO -->
<dependency>
<groupId>dev.zio</groupId>
<artifactId>zio-logging_${scala.binary.version}</artifactId>
</dependency>

<dependency>
<groupId>dev.zio</groupId>
<artifactId>zio-streams_${scala.binary.version}</artifactId>
</dependency>

<dependency>
<groupId>dev.zio</groupId>
<artifactId>zio-concurrent_${scala.binary.version}</artifactId>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.streampark.flink.client.impl

import org.apache.streampark.common.enums.{ApplicationType, DevelopmentMode, ExecutionMode}
import org.apache.streampark.common.zio.ZIOExt.IOOps
import org.apache.streampark.flink.client.`trait`.KubernetesNativeClientTrait
import org.apache.streampark.flink.client.bean._
import org.apache.streampark.flink.kubernetes.v2.model.{FlinkDeploymentDef, JobDef, JobManagerDef, TaskManagerDef}
import org.apache.streampark.flink.kubernetes.v2.operator.FlinkK8sOperator
import org.apache.streampark.flink.packer.pipeline.DockerImageBuildResponse

import org.apache.commons.lang3.StringUtils
import org.apache.flink.configuration.{Configuration, DeploymentOptions, ExecutionOptions, JobManagerOptions, PipelineOptions, TaskManagerOptions}
import org.apache.flink.v1beta1.FlinkDeploymentSpec.FlinkVersion

import scala.collection.JavaConverters._
import scala.language.postfixOps

/**
 * Flink client (v2) for Kubernetes native application mode.
 *
 * Delegates the actual deployment lifecycle to [[FlinkK8sOperator]] (the
 * flink-kubernetes-operator based implementation) instead of submitting
 * through the Flink native K8s client directly.
 */
object KubernetesNativeApplicationClientV2 extends KubernetesNativeClientTrait {

  /**
   * Submits a Flink job in Kubernetes native application mode.
   *
   * @param submitRequest job submission parameters; `k8sSubmitParam.clusterId`
   *                      must be non-blank and a Docker image build result
   *                      must be present (checked via `checkBuildResult()`)
   * @param flinkConfig   effective Flink configuration for this job
   * @return the submit response carrying the flattened flink config and job id
   * @throws Exception if the deployment fails; the failure is logged and rethrown
   */
  @throws[Exception]
  override def doSubmit(
      submitRequest: SubmitRequest,
      flinkConfig: Configuration): SubmitResponse = {

    // require parameters
    require(
      StringUtils.isNotBlank(submitRequest.k8sSubmitParam.clusterId),
      s"[flink-submit] submit flink job failed, clusterId is null, mode=${flinkConfig.get(DeploymentOptions.TARGET)}"
    )

    // check the last building result
    submitRequest.checkBuildResult()

    try {
      val spec: FlinkDeploymentDef = convertFlinkDeploymentDef(submitRequest, flinkConfig)
      FlinkK8sOperator.deployApplicationJob(submitRequest.id, spec).runIO
      val result = SubmitResponse(null, flinkConfig.toMap, submitRequest.jobId, null)
      logInfo(
        s"[flink-submit] flink job has been submitted. ${flinkConfIdentifierInfo(flinkConfig)}")
      result
    } catch {
      // Log for diagnosis, then rethrow so the caller still sees the failure.
      case e: Exception =>
        logError(s"submit flink job fail in ${submitRequest.executionMode} mode")
        throw e
    }
  }

  /**
   * Cancels a running job, forcing the deployment target to
   * kubernetes-application before delegating to the shared cancel logic.
   */
  override def doCancel(
      cancelRequest: CancelRequest,
      flinkConfig: Configuration): CancelResponse = {
    flinkConfig.safeSet(
      DeploymentOptions.TARGET,
      ExecutionMode.KUBERNETES_NATIVE_APPLICATION.getName)
    super.doCancel(cancelRequest, flinkConfig)
  }

  /**
   * Triggers a savepoint, forcing the deployment target to
   * kubernetes-application before delegating to the shared savepoint logic.
   */
  override def doTriggerSavepoint(
      request: TriggerSavepointRequest,
      flinkConf: Configuration): SavepointResponse = {
    flinkConf.safeSet(DeploymentOptions.TARGET, ExecutionMode.KUBERNETES_NATIVE_APPLICATION.getName)
    super.doTriggerSavepoint(request, flinkConf)
  }

  /**
   * Converts the submit request into a FlinkDeploymentDef (the operator's
   * FlinkDeployment CR spec).
   *
   * @throws IllegalArgumentException if the flink major version cannot be
   *                                  mapped to an operator FlinkVersion enum
   */
  private[this] def convertFlinkDeploymentDef(
      submitRequest: SubmitRequest,
      flinkConfig: Configuration): FlinkDeploymentDef = {

    // Map e.g. "1.17" -> "V1_17" and resolve it against the operator enum.
    val flinkVersion = Option(submitRequest.flinkVersion.majorVersion)
      .map(v => "V" + v.replace(".", "_"))
      .flatMap(v => FlinkVersion.values().find(_.name() == v))
      .getOrElse(throw new IllegalArgumentException("Flink version not found"))

    val buildResult = submitRequest.buildResult.asInstanceOf[DockerImageBuildResponse]

    // java.util.Map#get returns null when the key is absent; guard it so we
    // never pass Array(null) as the program args.
    val programArgs = Option(flinkConfig.toMap.get("$internal.application.program-args"))
      .map(Array(_))
      .getOrElse(Array.empty[String])

    FlinkDeploymentDef(
      name = submitRequest.appName,
      namespace = submitRequest.k8sSubmitParam.kubernetesNamespace,
      image = buildResult.flinkImageTag,
      flinkVersion = flinkVersion,
      jobManager = JobManagerDef(
        cpu = 1,
        memory = flinkConfig.get(JobManagerOptions.TOTAL_PROCESS_MEMORY).toString),
      taskManager = TaskManagerDef(
        cpu = 1,
        memory = flinkConfig.get(TaskManagerOptions.TOTAL_PROCESS_MEMORY).toString),
      job = Option(
        JobDef(
          jarURI = buildResult.dockerInnerMainJarPath,
          parallelism = 1,
          args = programArgs,
          // Option(...) maps a null field to None instead of Some(null),
          // which Some(...) would silently produce.
          entryClass = Option(submitRequest.appMain),
          initialSavepointPath = Option(submitRequest.savePoint),
          allowNonRestoredState = Some(submitRequest.allowNonRestoredState)
        )),
      extJarPaths = Option(submitRequest.userJarFile)
        .map(file => Array(file.getAbsolutePath))
        .getOrElse(Array.empty[String]),
      flinkConfiguration = Option(submitRequest.extraParameter)
        .map(_.asScala.map { case (key, value) => key -> value.toString }.toMap)
        .getOrElse(Map.empty)
    )
  }
}
Loading