diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/conf/K8sFlinkConfig.scala b/streampark-common/src/main/scala/org/apache/streampark/common/conf/K8sFlinkConfig.scala index 7313bf2012..f5a20923a8 100644 --- a/streampark-common/src/main/scala/org/apache/streampark/common/conf/K8sFlinkConfig.scala +++ b/streampark-common/src/main/scala/org/apache/streampark/common/conf/K8sFlinkConfig.scala @@ -18,9 +18,22 @@ package org.apache.streampark.common.conf /** Flink kubernetes Configuration for v1 version */ -@deprecated("see: org.apache.streampark.flink.kubernetes.v2.Config") + object K8sFlinkConfig { + lazy val isV2Enabled: Boolean = InternalConfigHolder.get(ENABLE_V2) + + val ENABLE_V2: InternalOption = InternalOption( + key = "streampark.flink-k8s.enable-v2", + defaultValue = false, + classType = classOf[Boolean], + description = + "Whether to enable the v2 version (based on flink-kubernetes-operator) of Flink Kubernetes operations" + ) + + // ======= deprecated ======= + + @deprecated val jobStatusTrackTaskTimeoutSec: InternalOption = InternalOption( key = "streampark.flink-k8s.tracking.polling-task-timeout-sec.job-status", defaultValue = 120L, @@ -28,6 +41,7 @@ object K8sFlinkConfig { description = "run timeout seconds of single flink-k8s metrics tracking task" ) + @deprecated val metricTrackTaskTimeoutSec: InternalOption = InternalOption( key = "streampark.flink-k8s.tracking.polling-task-timeout-sec.cluster-metric", defaultValue = 120L, @@ -35,6 +49,7 @@ object K8sFlinkConfig { description = "run timeout seconds of single flink-k8s job status tracking task" ) + @deprecated val jobStatueTrackTaskIntervalSec: InternalOption = InternalOption( key = "streampark.flink-k8s.tracking.polling-interval-sec.job-status", defaultValue = 5L, @@ -42,6 +57,7 @@ object K8sFlinkConfig { description = "interval seconds between two single flink-k8s metrics tracking task" ) + @deprecated val metricTrackTaskIntervalSec: InternalOption = InternalOption( key = "streampark.flink-k8s.tracking.polling-interval-sec.cluster-metric", defaultValue = 5L, @@ -49,6 +65,7 @@ object K8sFlinkConfig { description = "interval seconds between two single flink-k8s metrics tracking task" ) + @deprecated val silentStateJobKeepTrackingSec: InternalOption = InternalOption( key = "streampark.flink-k8s.tracking.silent-state-keep-sec", defaultValue = 60, @@ -69,6 +86,7 @@ object K8sFlinkConfig { ) /** kubernetes default namespace */ + @deprecated val DEFAULT_KUBERNETES_NAMESPACE = "default" } diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/zio/ZIOExt.scala b/streampark-common/src/main/scala/org/apache/streampark/common/zio/ZIOExt.scala index 6fb546c65e..7baa195d39 100644 --- a/streampark-common/src/main/scala/org/apache/streampark/common/zio/ZIOExt.scala +++ b/streampark-common/src/main/scala/org/apache/streampark/common/zio/ZIOExt.scala @@ -17,14 +17,16 @@ package org.apache.streampark.common.zio -import zio.{IO, Runtime, Unsafe, ZIO} +import zio.{FiberFailure, IO, Runtime, Unsafe, ZIO} import zio.stream.ZStream +import scala.util.Try + /** ZIO extension */ object ZIOExt { /* Unsafe run zio effect. */ - @throws[Exception] + @throws[FiberFailure] @inline def unsafeRun[E, A](zio: IO[E, A]): A = Unsafe.unsafe { implicit u => Runtime.default.unsafe @@ -32,11 +34,25 @@ object ZIOExt { .getOrThrowFiberFailure() } + /** unsafe run IO to Either. 
*/ + @inline def unsafeRunToEither[E, A](zio: IO[E, A]): Either[Throwable, A] = Unsafe.unsafe { + implicit u => + Runtime.default.unsafe + .run(zio.provideLayer(Runtime.removeDefaultLoggers >>> ZIOLogger.default)) + .toEither + } + implicit class IOOps[E, A](io: ZIO[Any, E, A]) { /** unsafe run IO */ - @throws[Throwable] + @throws[FiberFailure] def runIO: A = ZIOExt.unsafeRun(io) + + /** unsafe run IO to Try. */ + def runIOAsTry: Try[A] = unsafeRunToEither(io).toTry + + /** unsafe run IO to Either. */ + def runIOAsEither: Either[Throwable, A] = unsafeRunToEither(io) } implicit class UIOOps[A](uio: ZIO[Any, Nothing, A]) { diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/zio/ZIOJavaUtil.scala b/streampark-common/src/main/scala/org/apache/streampark/common/zio/ZIOJavaUtil.scala new file mode 100644 index 0000000000..d7260b5d5f --- /dev/null +++ b/streampark-common/src/main/scala/org/apache/streampark/common/zio/ZIOJavaUtil.scala @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.streampark.common.zio + +import zio.{FiberFailure, IO, UIO} + +/** Util for running ZIO effects in Java. */ +object ZIOJavaUtil { + + @throws[FiberFailure] + def runIO[E, A](zio: IO[E, A]): A = ZIOExt.unsafeRun(zio) + + def runUIO[A](uio: UIO[A]): A = ZIOExt.unsafeRun(uio) + +} diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationActionServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationActionServiceImpl.java index ff69ed84b8..4c836469fa 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationActionServiceImpl.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationActionServiceImpl.java @@ -294,6 +294,7 @@ public void cancel(Application appParam) throws Exception { CancelRequest cancelRequest = new CancelRequest( + application.getId(), flinkEnv.getFlinkVersion(), ExecutionMode.of(application.getExecutionMode()), properties, @@ -413,10 +414,13 @@ public void start(Application appParam, boolean auto) throws Exception { extraParameter.put(ConfigConst.KEY_FLINK_SQL(null), flinkSql.getSql()); } + // TODO Need to display more K8s submission parameters in the front-end UI. 
+ // See: org.apache.streampark.flink.client.bean.KubernetesSubmitParam KubernetesSubmitParam kubernetesSubmitParam = - new KubernetesSubmitParam( + KubernetesSubmitParam.apply( application.getClusterId(), application.getK8sNamespace(), + application.getFlinkImage(), application.getK8sRestExposedTypeEnum()); Tuple2<String, String> userJarAndAppConf = getUserJarAndAppConf(flinkEnv, application); diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationManageServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationManageServiceImpl.java index eeabbaad8f..ea8fc61c15 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationManageServiceImpl.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationManageServiceImpl.java @@ -17,11 +17,13 @@ package org.apache.streampark.console.core.service.application.impl; +import org.apache.streampark.common.conf.K8sFlinkConfig; import org.apache.streampark.common.conf.Workspace; import org.apache.streampark.common.enums.ExecutionMode; import org.apache.streampark.common.enums.StorageType; import org.apache.streampark.common.fs.HdfsOperator; import org.apache.streampark.common.util.DeflaterUtils; +import org.apache.streampark.common.zio.ZIOJavaUtil; import org.apache.streampark.console.base.domain.RestRequest; import org.apache.streampark.console.base.exception.ApiAlertException; import org.apache.streampark.console.base.mybatis.pager.MybatisPager; @@ -54,6 +56,7 @@ import org.apache.streampark.console.core.service.application.ApplicationManageService; import org.apache.streampark.console.core.task.FlinkHttpWatcher; import org.apache.streampark.flink.kubernetes.FlinkK8sWatcher; +import org.apache.streampark.flink.kubernetes.v2.observer.FlinkK8sObserver; import org.apache.streampark.flink.packer.pipeline.PipelineStatus; import org.apache.commons.lang3.StringUtils; @@ -176,9 +179,11 @@ public Boolean delete(Application appParam) { // 8) remove app removeApp(application); - if (isKubernetesApp(application)) { k8SFlinkTrackMonitor.unWatching(toTrackId(application)); + if (K8sFlinkConfig.isV2Enabled()) { + ZIOJavaUtil.runUIO(FlinkK8sObserver.untrackById(application.getId())); + } } else { FlinkHttpWatcher.unWatching(appParam.getId()); } diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java index 7bbbb5afe8..7260fc72f9 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java @@ -18,6 +18,7 @@ package org.apache.streampark.console.core.service.impl; import org.apache.streampark.common.conf.ConfigConst; +import org.apache.streampark.common.conf.K8sFlinkConfig; import org.apache.streampark.common.conf.Workspace; import org.apache.streampark.common.enums.ApplicationType; import org.apache.streampark.common.enums.DevelopmentMode; @@ -78,6 +79,7 @@ import 
org.apache.streampark.flink.packer.pipeline.PipelineStatus; import org.apache.streampark.flink.packer.pipeline.PipelineType; import org.apache.streampark.flink.packer.pipeline.impl.FlinkK8sApplicationBuildPipeline; +import org.apache.streampark.flink.packer.pipeline.impl.FlinkK8sApplicationBuildPipelineV2; import org.apache.streampark.flink.packer.pipeline.impl.FlinkK8sSessionBuildPipeline; import org.apache.streampark.flink.packer.pipeline.impl.FlinkRemoteBuildPipeline; import org.apache.streampark.flink.packer.pipeline.impl.FlinkYarnApplicationBuildPipeline; @@ -511,7 +513,11 @@ private BuildPipeline createPipelineInstance(@Nonnull Application app) { dockerConfig.getPassword()), app.getIngressTemplate()); log.info("Submit params to building pipeline : {}", k8sApplicationBuildRequest); - return FlinkK8sApplicationBuildPipeline.of(k8sApplicationBuildRequest); + if (K8sFlinkConfig.isV2Enabled()) { + return FlinkK8sApplicationBuildPipelineV2.of(k8sApplicationBuildRequest); + } else { + return FlinkK8sApplicationBuildPipeline.of(k8sApplicationBuildRequest); + } default: throw new UnsupportedOperationException( "Unsupported Building Application for ExecutionMode: " + app.getExecutionModeEnum()); diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavePointServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavePointServiceImpl.java index 20f7c31f45..c85d0f1d4d 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavePointServiceImpl.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavePointServiceImpl.java @@ -493,6 +493,7 @@ private TriggerSavepointRequest renderTriggerSavepointRequest( Map<String, Object> properties = this.tryGetRestProps(application, cluster); return new TriggerSavepointRequest( + application.getId(), flinkEnv.getFlinkVersion(), application.getExecutionModeEnum(), properties, diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkK8sWatcherWrapper.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkK8sWatcherWrapper.java index c385764ef1..32f7a23917 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkK8sWatcherWrapper.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkK8sWatcherWrapper.java @@ -17,6 +17,7 @@ package org.apache.streampark.console.core.task; +import org.apache.streampark.common.conf.K8sFlinkConfig; import org.apache.streampark.common.enums.ExecutionMode; import org.apache.streampark.console.core.entity.Application; import org.apache.streampark.console.core.service.application.ApplicationManageService; @@ -82,11 +83,15 @@ public FlinkK8sWatcher registerFlinkK8sWatcher() { } private void initFlinkK8sWatcher(@Nonnull FlinkK8sWatcher trackMonitor) { - // register change event listener - trackMonitor.registerListener(flinkK8sChangeEventListener); - // recovery tracking list - List<Application> k8sApp = getK8sWatchingApps(); - k8sApp.forEach(trackMonitor::doWatching); + if (!K8sFlinkConfig.isV2Enabled()) { + // register change event listener + trackMonitor.registerListener(flinkK8sChangeEventListener); + // recovery tracking list + List<Application> k8sApp = getK8sWatchingApps(); 
+ k8sApp.forEach(trackMonitor::doWatching); + } else { + // TODO [flink-k8s-v2] Recover the tracking list and invoke FlinkK8sObserver.track() + } } /** get flink-k8s job tracking application from db. */ diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/CancelRequest.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/CancelRequest.scala index dd5481dc48..f06db3d9d2 100644 --- a/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/CancelRequest.scala +++ b/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/CancelRequest.scala @@ -25,6 +25,7 @@ import javax.annotation.Nullable import java.util.{Map => JavaMap} case class CancelRequest( + id: Long, flinkVersion: FlinkVersion, executionMode: ExecutionMode, @Nullable properties: JavaMap[String, Any], diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/KubernetesSubmitParam.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/KubernetesSubmitParam.scala new file mode 100644 index 0000000000..07832841c1 --- /dev/null +++ b/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/KubernetesSubmitParam.scala @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.streampark.flink.client.bean + +import org.apache.streampark.common.enums.FlinkK8sRestExposedType + +import javax.annotation.Nullable + +import java.util + +import java.util.{Map => JMap} + +/** + * TODO Need to display more K8s submission parameters in the front-end UI. 
* + * It will eventually be converted to + * [[org.apache.streampark.flink.kubernetes.v2.model.FlinkDeploymentDef]] + * + * The logic of conversion is located at: + * [[org.apache.streampark.flink.client.impl.KubernetesApplicationClientV2#genFlinkDeployDef]] + */ +// todo split into Application mode and SessionJob mode +case class KubernetesSubmitParam( + clusterId: String, + kubernetesNamespace: String, + baseImage: Option[String] = None, + imagePullPolicy: Option[String] = None, + serviceAccount: Option[String] = None, + podTemplate: Option[String] = None, + jobManagerCpu: Option[Double] = None, + jobManagerMemory: Option[String] = None, + jobManagerEphemeralStorage: Option[String] = None, + jobManagerPodTemplate: Option[String] = None, + taskManagerCpu: Option[Double] = None, + taskManagerMemory: Option[String] = None, + taskManagerEphemeralStorage: Option[String] = None, + taskManagerPodTemplate: Option[String] = None, + logConfiguration: JMap[String, String] = new util.HashMap[String, String](), + flinkRestExposedType: Option[FlinkK8sRestExposedType] = None +) + +object KubernetesSubmitParam { + + /** + * Compatible with StreamPark's legacy native K8s submission parameters. + * + * @param clusterId + * flink cluster id in k8s cluster. + * @param kubernetesNamespace + * k8s namespace. + * @param flinkRestExposedType + * flink rest-service exposed type on k8s cluster. + */ + def apply( + clusterId: String, + kubernetesNamespace: String, + baseImage: String, + @Nullable flinkRestExposedType: FlinkK8sRestExposedType): KubernetesSubmitParam = + KubernetesSubmitParam( + clusterId = clusterId, + kubernetesNamespace = kubernetesNamespace, + baseImage = Some(baseImage), + flinkRestExposedType = Option(flinkRestExposedType)) } diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/SubmitRequest.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/SubmitRequest.scala index d06c036c15..4989abaf85 100644 --- a/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/SubmitRequest.scala +++ b/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/SubmitRequest.scala @@ -36,19 +36,6 @@ import java.util.{Map => JavaMap} import scala.collection.convert.ImplicitConversions._ import scala.util.Try -/** - * @param clusterId - * flink cluster id in k8s cluster. - * @param kubernetesNamespace - * k8s namespace. - * @param flinkRestExposedType - * flink rest-service exposed type on k8s cluster. 
- */ -case class KubernetesSubmitParam( - clusterId: String, - kubernetesNamespace: String, - @Nullable flinkRestExposedType: FlinkK8sRestExposedType) - case class SubmitRequest( flinkVersion: FlinkVersion, executionMode: ExecutionMode, diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/TriggerSavepointRequest.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/TriggerSavepointRequest.scala index 035c5348d3..85c788da7f 100644 --- a/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/TriggerSavepointRequest.scala +++ b/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/TriggerSavepointRequest.scala @@ -26,6 +26,7 @@ import java.util.{Map => JavaMap} /** Trigger savepoint request. */ case class TriggerSavepointRequest( + id: Long, flinkVersion: FlinkVersion, executionMode: ExecutionMode, @Nullable properties: JavaMap[String, Any], diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/FlinkClientHandler.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/FlinkClientHandler.scala index e85ceb6ee3..810df34e39 100644 --- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/FlinkClientHandler.scala +++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/FlinkClientHandler.scala @@ -17,6 +17,7 @@ package org.apache.streampark.flink.client +import org.apache.streampark.common.conf.K8sFlinkConfig import org.apache.streampark.common.enums.ExecutionMode import org.apache.streampark.common.enums.ExecutionMode._ import org.apache.streampark.flink.client.`trait`.FlinkClientTrait @@ -32,7 +33,10 @@ object FlinkClientHandler { YARN_SESSION -> YarnSessionClient, YARN_PER_JOB -> YarnPerJobClient, KUBERNETES_NATIVE_SESSION -> KubernetesNativeSessionClient, - KUBERNETES_NATIVE_APPLICATION -> KubernetesNativeApplicationClient + KUBERNETES_NATIVE_APPLICATION -> { + if (K8sFlinkConfig.isV2Enabled) KubernetesApplicationClientV2 + else KubernetesNativeApplicationClient + } ) def submit(request: SubmitRequest): SubmitResponse = { diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesApplicationClientV2.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesApplicationClientV2.scala new file mode 100644 index 0000000000..665159c03d --- /dev/null +++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesApplicationClientV2.scala @@ -0,0 +1,238 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.streampark.flink.client.impl + +import org.apache.streampark.common.util.Logger +import org.apache.streampark.common.zio.ZIOExt.IOOps +import org.apache.streampark.flink.client.`trait`.KubernetesClientV2Trait +import org.apache.streampark.flink.client.bean._ +import org.apache.streampark.flink.kubernetes.v2.model.{FlinkDeploymentDef, JobManagerDef, TaskManagerDef} +import org.apache.streampark.flink.kubernetes.v2.operator.FlinkK8sOperator +import org.apache.streampark.flink.packer.pipeline.K8sAppModeBuildResponse + +import org.apache.flink.client.deployment.application.ApplicationConfiguration +import org.apache.flink.configuration._ +import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions +import org.apache.flink.runtime.jobgraph.SavepointConfigOptions +import org.apache.flink.v1beta1.FlinkDeploymentSpec.FlinkVersion + +import scala.collection.mutable +import scala.jdk.CollectionConverters.mapAsScalaMapConverter +import scala.util.{Failure, Success, Try} + +/** Flink K8s application mode task operation client via Flink K8s Operator */ +object KubernetesApplicationClientV2 extends KubernetesClientV2Trait with Logger { + + @throws[Exception] + override def doSubmit( + submitRequest: SubmitRequest, + flinkConfig: Configuration): SubmitResponse = { + + val richMsg: String => String = s"[flink-submit][appId=${submitRequest.id}] " + _ + + submitRequest.checkBuildResult() + val buildResult = submitRequest.buildResult.asInstanceOf[K8sAppModeBuildResponse] + + // Convert to FlinkDeployment CR definition + val flinkDeployDef = genFlinkDeployDef(submitRequest, flinkConfig, buildResult) match { + case Right(result) => result + case Left(errMsg) => + throw new IllegalArgumentException( + richMsg(s"Error occurred while parsing parameters: $errMsg")) + } + + // Submit FlinkDeployment CR to Kubernetes + FlinkK8sOperator.deployApplicationJob(submitRequest.id, flinkDeployDef).runIOAsTry match { + case Success(_) => + logInfo(richMsg("Flink job has been submitted successfully.")) + case Failure(err) => + logError( + richMsg(s"Failed to submit Flink job in ${submitRequest.executionMode.getName}_V2 mode!"), + err) + throw err + } + + SubmitResponse( + clusterId = submitRequest.k8sSubmitParam.clusterId, + flinkConfig = flinkConfig.toMap, + jobId = submitRequest.jobId, + jobManagerUrl = null + ) + } + + // Generate FlinkDeployment CR definition; it is a pure function. 
+ private def genFlinkDeployDef( + submitReq: SubmitRequest, + originFlinkConfig: Configuration, + buildResult: K8sAppModeBuildResponse): Either[FailureMessage, FlinkDeploymentDef] = { + + val flinkConfObj = originFlinkConfig.clone() + val flinkConfMap = originFlinkConfig.toMap.asScala.toMap + + val namespace = Option(submitReq.k8sSubmitParam.kubernetesNamespace) + .getOrElse("default") + + val name = Option(submitReq.k8sSubmitParam.clusterId) + .filter(!_.isBlank) + .getOrElse(return Left("cluster-id should not be empty")) + + val image = submitReq.k8sSubmitParam.baseImage + .orElse(Option(buildResult.flinkBaseImage)) + .filter(!_.isBlank) + .getOrElse(return Left("Flink base image should not be empty")) + + val imagePullPolicy = flinkConfObj + .getOption(KubernetesConfigOptions.CONTAINER_IMAGE_PULL_POLICY) + .map(_.toString) + .orElse(submitReq.k8sSubmitParam.imagePullPolicy) + + val serviceAccount = flinkConfObj + .getOption(KubernetesConfigOptions.KUBERNETES_SERVICE_ACCOUNT) + .orElse(submitReq.k8sSubmitParam.serviceAccount) + .getOrElse(FlinkDeploymentDef.DEFAULT_SERVICE_ACCOUNT) + + val flinkVersion = Option(submitReq.flinkVersion.majorVersion) + .map(majorVer => "V" + majorVer.replace(".", "_")) + .flatMap(v => FlinkVersion.values().find(_.name() == v)) + .getOrElse(return Left(s"Unsupported Flink version: ${submitReq.flinkVersion.majorVersion}")) + + val jobDef = genJobDef(flinkConfObj, jarUriHint = Some(buildResult.mainJarPath)) + .getOrElse(return Left("Invalid job definition")) + + val podTemplate = submitReq.k8sSubmitParam.podTemplate.map( + yaml => + unmarshalPodTemplate(yaml) + .getOrElse(return Left(s"Invalid pod template: \n$yaml"))) + + val jobManager = { + val cpu = flinkConfMap + .get(KUBERNETES_JM_CPU_AMOUNT_KEY) + .orElse(flinkConfMap.get(KUBERNETES_JM_CPU_KEY)) + .flatMap(value => Try(value.toDouble).toOption) + .orElse(submitReq.k8sSubmitParam.jobManagerCpu) + .getOrElse(KUBERNETES_JM_CPU_DEFAULT) + + val mem = flinkConfObj + .getOption(JobManagerOptions.TOTAL_PROCESS_MEMORY) + .map(_.toString) + .orElse(submitReq.k8sSubmitParam.jobManagerMemory) + .getOrElse(KUBERNETES_JM_MEMORY_DEFAULT) + + val podTemplate = submitReq.k8sSubmitParam.jobManagerPodTemplate.map( + yaml => + unmarshalPodTemplate(yaml) + .getOrElse(return Left(s"Invalid job manager pod template: \n$yaml"))) + JobManagerDef( + cpu = cpu, + memory = mem, + ephemeralStorage = submitReq.k8sSubmitParam.jobManagerEphemeralStorage, + podTemplate = podTemplate) + } + + val taskManager = { + val cpu = flinkConfMap + .get(KUBERNETES_TM_CPU_AMOUNT_KEY) + .orElse(flinkConfMap.get(KUBERNETES_TM_CPU_KEY)) + .flatMap(value => Try(value.toDouble).toOption) + .orElse(submitReq.k8sSubmitParam.taskManagerCpu) + .getOrElse(KUBERNETES_TM_CPU_DEFAULT) + + val mem = flinkConfObj + .getOption(TaskManagerOptions.TOTAL_PROCESS_MEMORY) + .map(_.toString) + .orElse(submitReq.k8sSubmitParam.taskManagerMemory) + .getOrElse(KUBERNETES_TM_MEMORY_DEFAULT) + + val podTemplate = submitReq.k8sSubmitParam.taskManagerPodTemplate.map( + yaml => + unmarshalPodTemplate(yaml) + .getOrElse(return Left(s"Invalid task manager pod template: \n$yaml"))) + TaskManagerDef( + cpu = cpu, + memory = mem, + ephemeralStorage = submitReq.k8sSubmitParam.taskManagerEphemeralStorage, + podTemplate = podTemplate) + } + + val logConfiguration = { + val items = submitReq.k8sSubmitParam.logConfiguration.asScala + if (items.isEmpty) { + // Get default log config from local target flink home + val logConfigs = Array( + "log4j.properties" -> 
s"${submitReq.flinkVersion.flinkHome}/conf/log4j-console.properties", + "logback.xml" -> s"${submitReq.flinkVersion.flinkHome}/conf/logback-console.xml" + ) + logConfigs + .map { case (name, path) => name -> os.Path(path) } + .filter { case (_, path) => Try(os.exists(path) && os.isFile(path)).getOrElse(false) } + .map { case (name, path) => name -> Try(os.read(path)).toOption.filter(!_.isBlank) } + .filter { case (_, content) => content.isDefined } + .map { case (name, content) => name -> content.get } + .foreach { case (name, content) => items += name -> content } + } + items.toMap + } + + val extraFlinkConfiguration = { + // Remove conflicting configuration items + val result: mutable.Map[String, String] = flinkConfObj + .remove(DeploymentOptions.TARGET) + .remove(KubernetesConfigOptions.CONTAINER_IMAGE_PULL_POLICY) + .remove(KubernetesConfigOptions.KUBERNETES_SERVICE_ACCOUNT) + .remove(JobManagerOptions.TOTAL_PROCESS_MEMORY) + .remove(TaskManagerOptions.TOTAL_PROCESS_MEMORY) + .remove(PipelineOptions.JARS) + .remove(CoreOptions.DEFAULT_PARALLELISM) + .remove(ApplicationConfiguration.APPLICATION_ARGS) + .remove(ApplicationConfiguration.APPLICATION_MAIN_CLASS) + .remove(SavepointConfigOptions.SAVEPOINT_PATH) + .remove(SavepointConfigOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE) + .toMap + .asScala + .removeKey(KUBERNETES_JM_CPU_AMOUNT_KEY) + .removeKey(KUBERNETES_JM_CPU_KEY) + .removeKey(KUBERNETES_TM_CPU_AMOUNT_KEY) + .removeKey(KUBERNETES_TM_CPU_KEY) + // Set kubernetes.rest-service.exposed.type configuration for compatibility with native-k8s + submitReq.k8sSubmitParam.flinkRestExposedType.foreach { + exposedType => result += KUBERNETES_REST_SERVICE_EXPORTED_TYPE_KEY -> exposedType.getName + } + result.toMap + } + + // TODO Migrate the construction logic of ingress to here and set it into FlinkDeploymentDef.ingress + // See: org.apache.streampark.flink.packer.pipeline.impl.FlinkK8sApplicationBuildPipeline Step-8 + Right( + FlinkDeploymentDef( + namespace = namespace, + name = name, + image = image, + imagePullPolicy = imagePullPolicy, + serviceAccount = serviceAccount, + flinkVersion = flinkVersion, + jobManager = jobManager, + taskManager = taskManager, + flinkConfiguration = extraFlinkConfiguration, + logConfiguration = logConfiguration, + podTemplate = podTemplate, + job = Some(jobDef), + extJarPaths = Array.empty + )) + } + +} diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeApplicationClientV2.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeApplicationClientV2.scala deleted file mode 100644 index f43e88f0b7..0000000000 --- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeApplicationClientV2.scala +++ /dev/null @@ -1,121 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.streampark.flink.client.impl - -import org.apache.streampark.common.enums.{ApplicationType, DevelopmentMode, ExecutionMode} -import org.apache.streampark.common.zio.ZIOExt.IOOps -import org.apache.streampark.flink.client.`trait`.KubernetesNativeClientTrait -import org.apache.streampark.flink.client.bean._ -import org.apache.streampark.flink.kubernetes.v2.model.{FlinkDeploymentDef, JobDef, JobManagerDef, TaskManagerDef} -import org.apache.streampark.flink.kubernetes.v2.operator.FlinkK8sOperator -import org.apache.streampark.flink.packer.pipeline.DockerImageBuildResponse - -import org.apache.commons.lang3.StringUtils -import org.apache.flink.configuration.{Configuration, DeploymentOptions, ExecutionOptions, JobManagerOptions, PipelineOptions, TaskManagerOptions} -import org.apache.flink.v1beta1.FlinkDeploymentSpec.FlinkVersion - -import scala.collection.JavaConverters._ -import scala.language.postfixOps - -object KubernetesNativeApplicationClientV2 extends KubernetesNativeClientTrait { - @throws[Exception] - override def doSubmit( - submitRequest: SubmitRequest, - flinkConfig: Configuration): SubmitResponse = { - - // require parameters - require( - StringUtils.isNotBlank(submitRequest.k8sSubmitParam.clusterId), - s"[flink-submit] submit flink job failed, clusterId is null, mode=${flinkConfig.get(DeploymentOptions.TARGET)}" - ) - - // check the last building result - submitRequest.checkBuildResult() - - try { - val spec: FlinkDeploymentDef = convertFlinkDeploymentDef(submitRequest, flinkConfig) - FlinkK8sOperator.deployApplicationJob(submitRequest.id, spec).runIO - val result = SubmitResponse(null, flinkConfig.toMap, submitRequest.jobId, null) - logInfo( - s"[flink-submit] flink job has been submitted. 
${flinkConfIdentifierInfo(flinkConfig)}") - result - } catch { - case e: Exception => - logError(s"submit flink job fail in ${submitRequest.executionMode} mode") - throw e - } finally {} - } - - override def doCancel( - cancelRequest: CancelRequest, - flinkConfig: Configuration): CancelResponse = { - flinkConfig.safeSet( - DeploymentOptions.TARGET, - ExecutionMode.KUBERNETES_NATIVE_APPLICATION.getName) - super.doCancel(cancelRequest, flinkConfig) - } - - override def doTriggerSavepoint( - request: TriggerSavepointRequest, - flinkConf: Configuration): SavepointResponse = { - flinkConf.safeSet(DeploymentOptions.TARGET, ExecutionMode.KUBERNETES_NATIVE_APPLICATION.getName) - super.doTriggerSavepoint(request, flinkConf) - } - - private[this] def convertFlinkDeploymentDef( - submitRequest: SubmitRequest, - flinkConfig: Configuration): FlinkDeploymentDef = { - val spec = FlinkDeploymentDef( - name = submitRequest.appName, - namespace = submitRequest.k8sSubmitParam.kubernetesNamespace, - image = submitRequest.buildResult.asInstanceOf[DockerImageBuildResponse].flinkImageTag, - flinkVersion = Option(submitRequest.flinkVersion.majorVersion) - .map(_.replace(".", "_")) - .map("V" + _) - .flatMap(v => FlinkVersion.values().find(_.name() == v)) match { - case Some(version) => version - case None => throw new IllegalArgumentException("Flink version not found") - }, - jobManager = JobManagerDef( - cpu = 1, - memory = flinkConfig.get(JobManagerOptions.TOTAL_PROCESS_MEMORY).toString), - taskManager = TaskManagerDef( - cpu = 1, - memory = flinkConfig.get(TaskManagerOptions.TOTAL_PROCESS_MEMORY).toString), - job = Option( - JobDef( - jarURI = - submitRequest.buildResult.asInstanceOf[DockerImageBuildResponse].dockerInnerMainJarPath, - parallelism = 1, - args = Array(flinkConfig.toMap.get("$internal.application.program-args")), - entryClass = Some(submitRequest.appMain), - initialSavepointPath = Some(submitRequest.savePoint), - allowNonRestoredState = Some(submitRequest.allowNonRestoredState) - )), - extJarPaths = submitRequest.userJarFile match { - case null => Array.empty[String] - case file => Array(file.getAbsolutePath) - }, - flinkConfiguration = submitRequest.extraParameter match { - case null => Map.empty - case e => e.asScala.map { case (key, value) => key -> value.toString }.toMap - } - ) - spec - } -} diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala index 59aa46cc3f..e6b16e3772 100644 --- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala +++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala @@ -543,6 +543,13 @@ trait FlinkClientTrait extends Logger { case x => x } } + def getOption[T](key: ConfigOption[T]): Option[T] = { + Option(flinkConfig.get(key)) + } + def remove[T](key: ConfigOption[T]): Configuration = { + flinkConfig.removeConfig(key) + flinkConfig + } } private[client] def cancelJob( diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/KubernetesClientV2Trait.scala 
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/KubernetesClientV2Trait.scala new file mode 100644 index 0000000000..75b9792469 --- /dev/null +++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/KubernetesClientV2Trait.scala @@ -0,0 +1,178 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.streampark.flink.client.`trait` + +import org.apache.streampark.common.zio.ZIOExt.IOOps +import org.apache.streampark.flink.client.`trait`.KubernetesClientV2.{StopJobFail, TriggerJobSavepointFail} +import org.apache.streampark.flink.client.bean._ +import org.apache.streampark.flink.kubernetes.v2.model.{JobDef, JobSavepointDef} +import org.apache.streampark.flink.kubernetes.v2.operator.FlinkK8sOperator +import org.apache.streampark.flink.kubernetes.v2.yamlMapper + +import io.fabric8.kubernetes.api.model.Pod +import org.apache.flink.client.deployment.application.ApplicationConfiguration +import org.apache.flink.configuration.{Configuration, CoreOptions, PipelineOptions} +import org.apache.flink.runtime.jobgraph.SavepointConfigOptions +import zio.ZIO + +import scala.collection.mutable +import scala.jdk.CollectionConverters.asScalaBufferConverter +import scala.util.{Failure, Success, Try} + +trait KubernetesClientV2Trait extends FlinkClientTrait { + + protected type FailureMessage = String + + protected val KUBERNETES_JM_CPU_KEY = "kubernetes.jobmanager.cpu" + protected val KUBERNETES_JM_CPU_AMOUNT_KEY = "kubernetes.jobmanager.cpu.amount" + protected val KUBERNETES_JM_CPU_DEFAULT = 1.0 + protected val KUBERNETES_JM_MEMORY_DEFAULT = "1600m" + + protected val KUBERNETES_TM_CPU_KEY = "kubernetes.taskmanager.cpu" + protected val KUBERNETES_TM_CPU_AMOUNT_KEY = "kubernetes.taskmanager.cpu.amount" + protected val KUBERNETES_TM_CPU_DEFAULT = -1.0 + protected val KUBERNETES_TM_MEMORY_DEFAULT = "1728m" + + protected val KUBERNETES_REST_SERVICE_EXPORTED_TYPE_KEY = "kubernetes.rest-service.exposed.type" + + override def setConfig(submitRequest: SubmitRequest, flinkConf: Configuration): Unit = {} + + implicit protected class FlinkConfMapOps(map: mutable.Map[String, String]) { + def removeKey(key: String): mutable.Map[String, String] = { map -= key; map } + } + + protected def unmarshalPodTemplate(yaml: String): Try[Pod] = { + Try(yamlMapper.readValue(yaml, classOf[Pod])) + } + + protected def genJobDef( + flinkConfObj: Configuration, + jarUriHint: Option[String]): Either[FailureMessage, JobDef] = { + + val jarUri = jarUriHint + .orElse(flinkConfObj.getOption(PipelineOptions.JARS).flatMap(_.asScala.headOption)) + .getOrElse(return Left("Flink job uri should not be empty")) + + val parallel = 
flinkConfObj + .getOption(CoreOptions.DEFAULT_PARALLELISM) + .getOrElse(CoreOptions.DEFAULT_PARALLELISM.defaultValue()) + + val args = flinkConfObj + .getOption(ApplicationConfiguration.APPLICATION_ARGS) + .map(_.asScala.toArray) + .getOrElse(Array.empty[String]) + + val entryClass = flinkConfObj + .getOption(ApplicationConfiguration.APPLICATION_MAIN_CLASS) + + val savePointPath = flinkConfObj + .getOption(SavepointConfigOptions.SAVEPOINT_PATH) + + val allowNonRestoredState = flinkConfObj + .getOption(SavepointConfigOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE) + .map(_.booleanValue()) + + Right( + JobDef( + jarURI = jarUri, + parallelism = parallel, + entryClass = entryClass, + args = args, + initialSavepointPath = savePointPath, + allowNonRestoredState = allowNonRestoredState + )) + } + + @throws[Exception] + override def doCancel(request: CancelRequest, flinkConf: Configuration): CancelResponse = { + val effect = + if (!request.withSavepoint) { + // cancel job + FlinkK8sOperator + .cancelJob(request.id) + .as(CancelResponse(null)) + } else { + // stop job with savepoint + val savepointDef = JobSavepointDef( + drain = Option(request.withDrain).getOrElse(false), + savepointPath = Option(request.savepointPath), + formatType = Option(request.nativeFormat) + .map(if (_) JobSavepointDef.NATIVE_FORMAT else JobSavepointDef.CANONICAL_FORMAT) + ) + FlinkK8sOperator + .stopJob(request.id, savepointDef) + .flatMap { + result => + if (result.isFailed) ZIO.fail(StopJobFail(result.failureCause.get)) + else ZIO.succeed(CancelResponse(result.location.orNull)) + } + } + + def richMsg: String => String = s"[flink-cancel][appId=${request.id}] " + _ + + effect.runIOAsTry match { + case Success(rsp) => + logInfo(richMsg("Cancelled Flink job successfully.")) + rsp + case Failure(err) => + logError( + richMsg(s"Failed to cancel Flink job in ${request.executionMode.getName}_V2 mode!"), + err) + throw err + } + } + + @throws[Exception] + override def doTriggerSavepoint( + request: TriggerSavepointRequest, + flinkConf: Configuration): SavepointResponse = { + + val savepointDef = JobSavepointDef( + savepointPath = Option(request.savepointPath), + formatType = Option(request.nativeFormat) + .map(if (_) JobSavepointDef.NATIVE_FORMAT else JobSavepointDef.CANONICAL_FORMAT) + ) + + def richMsg: String => String = s"[flink-trigger-savepoint][appId=${request.id}] " + _ + + FlinkK8sOperator + .triggerJobSavepoint(request.id, savepointDef) + .flatMap { + result => + if (result.isFailed) ZIO.fail(TriggerJobSavepointFail(result.failureCause.get)) + else ZIO.succeed(SavepointResponse(result.location.orNull)) + } + .runIOAsTry match { + case Success(rsp) => + logInfo(richMsg("Triggered savepoint successfully.")) + rsp + case Failure(err) => + logError( + richMsg(s"Failed to trigger savepoint in ${request.executionMode.getName}_V2 mode!"), + err) + throw err + } + } + +} + +object KubernetesClientV2 { + + case class StopJobFail(msg: String) extends Exception(msg) + case class TriggerJobSavepointFail(msg: String) extends Exception(msg) } diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/KubernetesNativeClientTrait.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/KubernetesNativeClientTrait.scala index 2dd756b8f4..8ca6c518fe 100644 --- 
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/KubernetesNativeClientTrait.scala +++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/KubernetesNativeClientTrait.scala @@ -44,7 +44,7 @@ trait KubernetesNativeClientTrait extends FlinkClientTrait { .safeSet(KubernetesConfigOptions.NAMESPACE, submitRequest.k8sSubmitParam.kubernetesNamespace) .safeSet( KubernetesConfigOptions.REST_SERVICE_EXPOSED_TYPE, - covertToServiceExposedType(submitRequest.k8sSubmitParam.flinkRestExposedType)) + covertToServiceExposedType(submitRequest.k8sSubmitParam.flinkRestExposedType.orNull)) if (submitRequest.buildResult != null) { if (submitRequest.executionMode == ExecutionMode.KUBERNETES_NATIVE_APPLICATION) { diff --git a/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-core/src/main/scala/org/apache/streampark/flink/kubernetes/v2/model/FlinkDeploymentDef.scala b/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-core/src/main/scala/org/apache/streampark/flink/kubernetes/v2/model/FlinkDeploymentDef.scala index b3c039bf24..1637eb8da3 100644 --- a/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-core/src/main/scala/org/apache/streampark/flink/kubernetes/v2/model/FlinkDeploymentDef.scala +++ b/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-core/src/main/scala/org/apache/streampark/flink/kubernetes/v2/model/FlinkDeploymentDef.scala @@ -18,7 +18,7 @@ package org.apache.streampark.flink.kubernetes.v2.model import org.apache.streampark.flink.kubernetes.v2.jacksonMapper -import org.apache.streampark.flink.kubernetes.v2.model.FlinkDeploymentDef.mapPodToPodTemplate +import org.apache.streampark.flink.kubernetes.v2.model.FlinkDeploymentDef.{mapPodToPodTemplate, DEFAULT_SERVICE_ACCOUNT} import io.fabric8.kubernetes.api.model.{ObjectMeta, Pod} import org.apache.flink.v1beta1.{flinkdeploymentspec, FlinkDeployment, FlinkDeploymentSpec} @@ -58,7 +58,7 @@ case class FlinkDeploymentDef( name: String, image: String, imagePullPolicy: Option[String] = None, - serviceAccount: String = "flink", + serviceAccount: String = DEFAULT_SERVICE_ACCOUNT, flinkVersion: FlinkVersion, jobManager: JobManagerDef, taskManager: TaskManagerDef, @@ -108,6 +108,9 @@ case class FlinkDeploymentDef( } object FlinkDeploymentDef { + + lazy val DEFAULT_SERVICE_ACCOUNT = "flink" + def mapPodToPodTemplate[A: ClassTag](pod: Pod, clz: Class[A]): Try[A] = Try { val json = jacksonMapper.writeValueAsString(pod) jacksonMapper.readValue(json, clz) diff --git a/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-core/src/main/scala/org/apache/streampark/flink/kubernetes/v2/observer/FlinkK8sObserver.scala b/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-core/src/main/scala/org/apache/streampark/flink/kubernetes/v2/observer/FlinkK8sObserver.scala index ee23bd49dc..7f84968397 100644 --- a/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-core/src/main/scala/org/apache/streampark/flink/kubernetes/v2/observer/FlinkK8sObserver.scala +++ b/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-core/src/main/scala/org/apache/streampark/flink/kubernetes/v2/observer/FlinkK8sObserver.scala @@ -23,13 +23,13 @@ import org.apache.streampark.flink.kubernetes.v2.model._ import 
org.apache.streampark.flink.kubernetes.v2.model.TrackKey._ import org.apache.flink.v1beta1.{FlinkDeployment, FlinkDeploymentSpec, FlinkSessionJob, FlinkSessionJobSpec} -import zio.{IO, Ref, Schedule, UIO} +import zio.{IO, Ref, Schedule, UIO, ZIO} import zio.ZIO.logInfo import zio.concurrent.{ConcurrentMap, ConcurrentSet} import zio.stream.ZStream /** Flink Kubernetes resource observer. */ -sealed trait FlinkK8sObserver { +sealed trait FlinkK8sObserverTrait { /** Start tracking resources. */ def track(key: TrackKey): UIO[Unit] @@ -37,6 +37,14 @@ sealed trait FlinkK8sObserver { /** Stop tracking resources. */ def untrack(key: TrackKey): UIO[Unit] + /** Stop tracking resources by TrackKey.id. */ + def untrackById(appId: Long): UIO[Unit] = { + trackedKeys.find(_.id == appId).flatMap { + case Some(key) => untrack(key) + case None => ZIO.unit + } + } + /** All tracked key in observer. */ def trackedKeys: ConcurrentSet[TrackKey] @@ -88,7 +96,7 @@ sealed trait FlinkK8sObserver { } -object FlinkK8sObserver extends FlinkK8sObserver { +object FlinkK8sObserver extends FlinkK8sObserverTrait { // The following is a visible external snapshot. val trackedKeys = ConcurrentSet.empty[TrackKey].runUIO diff --git a/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-core/src/main/scala/org/apache/streampark/flink/kubernetes/v2/operator/FlinkK8sOperator.scala b/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-core/src/main/scala/org/apache/streampark/flink/kubernetes/v2/operator/FlinkK8sOperator.scala index d07b60540d..ac5d5b4e46 100644 --- a/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-core/src/main/scala/org/apache/streampark/flink/kubernetes/v2/operator/FlinkK8sOperator.scala +++ b/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-core/src/main/scala/org/apache/streampark/flink/kubernetes/v2/operator/FlinkK8sOperator.scala @@ -33,7 +33,7 @@ import zio.stream.ZStream * When deploying or deleting flink resources, the FlinkK8sOperator will automatically * handle the related tracing. */ -sealed trait FlinkK8sOperator { +sealed trait FlinkK8sOperatorTrait { /** Directly operate Flink Kubernetes CR. 
*/ val k8sCrOpr: CROperator.type = CROperator @@ -95,7 +95,7 @@ sealed trait FlinkK8sOperator { } -object FlinkK8sOperator extends FlinkK8sOperator { +object FlinkK8sOperator extends FlinkK8sOperatorTrait { private val obr = FlinkK8sObserver private val flinkRest = FlinkRestRequest diff --git a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/DefaultFlinkK8sWatcher.scala b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/DefaultFlinkK8sWatcher.scala index e9a3b06e1e..21975d2488 100644 --- a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/DefaultFlinkK8sWatcher.scala +++ b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/DefaultFlinkK8sWatcher.scala @@ -17,6 +17,7 @@ package org.apache.streampark.flink.kubernetes +import org.apache.streampark.common.conf.K8sFlinkConfig import org.apache.streampark.flink.kubernetes.enums.FlinkJobState import org.apache.streampark.flink.kubernetes.enums.FlinkK8sExecuteMode.{APPLICATION, SESSION} import org.apache.streampark.flink.kubernetes.event.{BuildInEvent, FlinkJobStateEvent, FlinkJobStatusChangeEvent} @@ -64,13 +65,15 @@ class DefaultFlinkK8sWatcher(conf: FlinkTrackConfig = FlinkTrackConfig.defaultCo } def doWatching(trackId: TrackId): Unit = { - if (trackId.isLegal) { + if (!K8sFlinkConfig.isV2Enabled && trackId.isLegal) { watchController.trackIds.set(trackId) } } def unWatching(trackId: TrackId): Unit = { - watchController.canceling.set(trackId) + if (!K8sFlinkConfig.isV2Enabled) { + watchController.canceling.set(trackId) + } } override def isInWatching(trackId: TrackId): Boolean = watchController.isInWatching(trackId) diff --git a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/FlinkK8sWatcherLazyStartAop.scala b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/FlinkK8sWatcherLazyStartAop.scala index 1d8a1f554b..f544ae4ffb 100644 --- a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/FlinkK8sWatcherLazyStartAop.scala +++ b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/FlinkK8sWatcherLazyStartAop.scala @@ -17,6 +17,7 @@ package org.apache.streampark.flink.kubernetes +import org.apache.streampark.common.conf.K8sFlinkConfig import org.apache.streampark.flink.kubernetes.event.BuildInEvent import org.apache.streampark.flink.kubernetes.model.{ClusterKey, FlinkMetricCV, JobStatusCV, TrackId} @@ -27,37 +28,37 @@ import org.apache.streampark.flink.kubernetes.model.{ClusterKey, FlinkMetricCV, trait FlinkK8sWatcherLazyStartAop extends FlinkK8sWatcher { abstract override def doWatching(trackId: TrackId): Unit = { - start() + if (!K8sFlinkConfig.isV2Enabled) start() super.doWatching(trackId) } abstract override def unWatching(trackId: TrackId): Unit = { - start() + if (!K8sFlinkConfig.isV2Enabled) start() super.unWatching(trackId) } abstract override def isInWatching(trackId: TrackId): Boolean = { - start() + if (!K8sFlinkConfig.isV2Enabled) start() super.isInWatching(trackId) } abstract override def getAllWatchingIds: Set[TrackId] = { - start() + if (!K8sFlinkConfig.isV2Enabled) start() super.getAllWatchingIds } abstract override def getJobStatus(trackId: TrackId): Option[JobStatusCV] = { - start() + if (!K8sFlinkConfig.isV2Enabled) start() 
super.getJobStatus(trackId) } abstract override def getJobStatus(trackIds: Set[TrackId]): Map[CacheKey, JobStatusCV] = { - start() + if (!K8sFlinkConfig.isV2Enabled) start() super.getJobStatus(trackIds) } abstract override def getAllJobStatus: Map[CacheKey, JobStatusCV] = { - start() + if (!K8sFlinkConfig.isV2Enabled) start() super.getAllJobStatus } @@ -72,12 +73,12 @@ trait FlinkK8sWatcherLazyStartAop extends FlinkK8sWatcher { } abstract override def checkIsInRemoteCluster(trackId: TrackId): Boolean = { - start() + if (!K8sFlinkConfig.isV2Enabled) start() super.checkIsInRemoteCluster(trackId) } abstract override def postEvent(event: BuildInEvent, sync: Boolean): Unit = { - start() + if (!K8sFlinkConfig.isV2Enabled) start() super.postEvent(event, sync) } diff --git a/streampark-flink/streampark-flink-packer/pom.xml b/streampark-flink/streampark-flink-packer/pom.xml index a6e2a0716f..d5377465a7 100644 --- a/streampark-flink/streampark-flink-packer/pom.xml +++ b/streampark-flink/streampark-flink-packer/pom.xml @@ -48,6 +48,12 @@ ${project.version} + <dependency> + <groupId>org.apache.streampark</groupId> + <artifactId>streampark-flink-kubernetes-core_${scala.binary.version}</artifactId> + <version>${project.version}</version> + </dependency> + org.apache.maven.plugins diff --git a/streampark-flink/streampark-flink-packer/src/main/java/org/apache/streampark/flink/packer/pipeline/PipelineType.java b/streampark-flink/streampark-flink-packer/src/main/java/org/apache/streampark/flink/packer/pipeline/PipelineType.java index 50c235e2d9..0c0e8e4b1e 100644 --- a/streampark-flink/streampark-flink-packer/src/main/java/org/apache/streampark/flink/packer/pipeline/PipelineType.java +++ b/streampark-flink/streampark-flink-packer/src/main/java/org/apache/streampark/flink/packer/pipeline/PipelineType.java @@ -42,7 +42,7 @@ public enum PipelineType { /** flink native kubernetes application mode */ FLINK_NATIVE_K8S_APPLICATION( 2, - "flink native kubernetes session mode task building pipeline", + "flink native kubernetes application mode task building pipeline", ImmutableMap.<Integer, String>builder() .put(1, "Create building workspace") .put(2, "Export kubernetes pod template") @@ -70,7 +70,16 @@ public enum PipelineType { .put(2, "Resolve maven dependencies") .put(3, "upload jar to yarn.provided.lib.dirs") .build(), - SimpleBuildResponse.class); + SimpleBuildResponse.class), + + FLINK_K8S_APPLICATION_V2( + 5, + "flink kubernetes application mode task building pipeline v2", + ImmutableMap.<Integer, String>builder() + .put(1, "Create building workspace") + .put(2, "Build shaded flink app jar") + .build(), + K8sAppModeBuildResponse.class); private final Integer code; /** short description of pipeline type. 
*/ diff --git a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/BuildResponse.scala b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/BuildResponse.scala index e376d561fd..7efc2d95b4 100644 --- a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/BuildResponse.scala +++ b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/BuildResponse.scala @@ -66,3 +66,19 @@ case class DockerImageBuildResponse( s"dockerInnerMainJarPath: $dockerInnerMainJarPath, " + s"pass: $pass }" } + +@JsonIgnoreProperties(ignoreUnknown = true) +case class K8sAppModeBuildResponse( + workspacePath: String, + flinkBaseImage: String, + mainJarPath: String, + extraLibJarPaths: Set[String], + pass: Boolean = false +) extends FlinkBuildResult { + override def toString: String = + s"{ workspacePath: $workspacePath, " + + s"flinkBaseImage: $flinkBaseImage, " + + s"mainJarPath: $mainJarPath, " + + s"extraLibJarPaths: ${extraLibJarPaths.mkString(",")}, " + + s"pass: $pass }" +} diff --git a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/impl/FlinkK8sApplicationBuildPipelineV2.scala b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/impl/FlinkK8sApplicationBuildPipelineV2.scala new file mode 100644 index 0000000000..6f59f6f02b --- /dev/null +++ b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/impl/FlinkK8sApplicationBuildPipelineV2.scala @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.streampark.flink.packer.pipeline.impl + +import org.apache.streampark.common.enums.DevelopmentMode +import org.apache.streampark.common.fs.LfsOperator +import org.apache.streampark.flink.packer.maven.MavenTool +import org.apache.streampark.flink.packer.pipeline._ + +import scala.language.postfixOps + +/** + * Building pipeline V2 (based on flink-kubernetes-operator) for flink kubernetes-native application mode + */ +class FlinkK8sApplicationBuildPipelineV2(request: FlinkK8sApplicationBuildRequest) + extends BuildPipeline { + + override def pipeType: PipelineType = PipelineType.FLINK_K8S_APPLICATION_V2 + + @throws[Throwable] + override protected def buildProcess(): K8sAppModeBuildResponse = { + + // Step-1: init build workspace of flink job + // the sub workspace dir like: APP_WORKSPACE/k8s-clusterId@k8s-namespace/ + val buildWorkspace = + execStep(1) { + val buildWorkspace = s"${request.workspace}/${request.clusterId}@${request.k8sNamespace}" + LfsOperator.mkCleanDirs(buildWorkspace) + logInfo(s"recreate building workspace: $buildWorkspace") + buildWorkspace + }.getOrElse(throw getError.exception) + + // Step-2: build shaded flink job jar and handle extra jars + // the output shaded jar file name like: streampark-flinkjob_myjob-test.jar + val (shadedJar, extJarLibs) = + execStep(2) { + val shadedJarOutputPath = request.getShadedJarPath(buildWorkspace) + val extJarLibs = request.developmentMode match { + case DevelopmentMode.FLINK_SQL => request.dependencyInfo.extJarLibs + case DevelopmentMode.CUSTOM_CODE => Set[String]() + } + val shadedJar = + MavenTool.buildFatJar(request.mainClass, request.providedLibs, shadedJarOutputPath) + logInfo(s"output shaded flink job jar: ${shadedJar.getAbsolutePath}") + shadedJar -> extJarLibs + }.getOrElse(throw getError.exception) + + K8sAppModeBuildResponse( + workspacePath = buildWorkspace, + flinkBaseImage = request.flinkBaseImage, + mainJarPath = shadedJar.getAbsolutePath, + extraLibJarPaths = extJarLibs) + } + + override protected def offerBuildParam: FlinkK8sApplicationBuildRequest = request +} + +object FlinkK8sApplicationBuildPipelineV2 { + def of(request: FlinkK8sApplicationBuildRequest): FlinkK8sApplicationBuildPipelineV2 = + new FlinkK8sApplicationBuildPipelineV2(request) + +}
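
Reviewer note (not part of the patch): the new ZIOExt helpers introduced above (runIOAsEither / runIOAsTry) exist so that callers such as KubernetesApplicationClientV2 and KubernetesClientV2Trait can surface effect failures as values instead of a thrown zio.FiberFailure. Below is a minimal sketch of the difference, assuming the streampark-common module and ZIO 2.x are on the classpath; demoJob and the object name are hypothetical and only for illustration.

import org.apache.streampark.common.zio.ZIOExt.IOOps
import zio.{IO, ZIO}

object ZioExtUsageSketch extends App {
  // A hypothetical effect that fails with a domain error.
  val demoJob: IO[String, Int] = ZIO.fail("flink deployment not found")

  // demoJob.runIO              // v1 style: would throw zio.FiberFailure here
  demoJob.runIOAsEither match { // v2 helper: the failure comes back as a value
    case Left(err) => println(s"submit failed: ${err.getMessage}")
    case Right(v) => println(s"submit ok: $v")
  }
  println(demoJob.runIOAsTry.isFailure) // prints: true
}

From the Java side, the same effects are run through the new ZIOJavaUtil.runIO / runUIO entry points, which is how ApplicationManageServiceImpl invokes FlinkK8sObserver.untrackById in this patch.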