Skip to content

Commit

Permalink
[Feat][Flink-K8s-V2] Refactor the lifecycle control of Flink K8s appl…
Browse files Browse the repository at this point in the history
…ication-mode jobs (#3037)

* [Feat] Refactor Flink resource build pipeline for flink-k8s-v2 module. #2882

* [Feat] Adaptation of the submission client for flink-k8s-v2. #2882

* [Feat][flink-k8s-v2] Migrate ENABLE_V2 to streampark-common module. #2882

* [Feat][flink-k8s-v2] Disable Flink job tracking watcher at flink-k8s-v1. #2882

* [Feat][flink-k8s-v2] Adaptation of Flink job canceling and triggering savepoint operations. #2882

* [Feat][flink-k8s-v2] Untrack flink when it is deleted. #2882
  • Loading branch information
Al-assad authored Sep 10, 2023
1 parent 65d968f commit 11c7fa8
Show file tree
Hide file tree
Showing 27 changed files with 749 additions and 168 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,37 +18,54 @@
package org.apache.streampark.common.conf

/** Flink kubernetes Configuration for v1 version */
@deprecated("see: org.apache.streampark.flink.kubernetes.v2.Config")

object K8sFlinkConfig {

/**
 * Whether the v2 implementation is enabled, i.e. the value stored for [[ENABLE_V2]] as returned
 * by `InternalConfigHolder.get`. Declared `lazy`, so the lookup is performed on first access
 * rather than when this object is initialized.
 */
lazy val isV2Enabled: Boolean = InternalConfigHolder.get(ENABLE_V2)

/**
 * Option `streampark.flink-k8s.enable-v2` (Boolean, default `false`): switch for the v2
 * implementation of the Flink-on-Kubernetes operations.
 */
val ENABLE_V2: InternalOption = InternalOption(
key = "streampark.flink-k8s.enable-v2",
defaultValue = false,
classType = classOf[Boolean],
description =
"Whether to enable the v2 version(base on flink-kubernetes-operator) of flink kubernetes operation"
)

// ======= deprecated =======

/**
 * Option `streampark.flink-k8s.tracking.polling-task-timeout-sec.job-status` (Long, default
 * `120`): timeout, in seconds, of a single flink-k8s job status tracking task.
 */
@deprecated
val jobStatusTrackTaskTimeoutSec: InternalOption = InternalOption(
  key = "streampark.flink-k8s.tracking.polling-task-timeout-sec.job-status",
  defaultValue = 120L,
  classType = classOf[java.lang.Long],
  // The key addresses the job-status tracker, so the description must name that task
  // (it previously described the metrics tracker).
  description = "run timeout seconds of single flink-k8s job status tracking task"
)

/**
 * Option `streampark.flink-k8s.tracking.polling-task-timeout-sec.cluster-metric` (Long, default
 * `120`): timeout, in seconds, of a single flink-k8s metrics tracking task.
 */
@deprecated
val metricTrackTaskTimeoutSec: InternalOption = InternalOption(
  key = "streampark.flink-k8s.tracking.polling-task-timeout-sec.cluster-metric",
  defaultValue = 120L,
  classType = classOf[java.lang.Long],
  // The key addresses the cluster-metric tracker, so the description must name that task
  // (it previously described the job status tracker).
  description = "run timeout seconds of single flink-k8s metrics tracking task"
)

/**
 * Option `streampark.flink-k8s.tracking.polling-interval-sec.job-status` (Long, default `5`):
 * interval, in seconds, between two consecutive flink-k8s job status tracking tasks.
 *
 * The identifier keeps its historical spelling ("Statue") because it is part of the public API.
 */
@deprecated
val jobStatueTrackTaskIntervalSec: InternalOption = InternalOption(
  key = "streampark.flink-k8s.tracking.polling-interval-sec.job-status",
  defaultValue = 5L,
  classType = classOf[java.lang.Long],
  // The key addresses the job-status tracker; the description used to be a copy of the
  // cluster-metric one.
  description = "interval seconds between two single flink-k8s job status tracking task"
)

/**
 * Option `streampark.flink-k8s.tracking.polling-interval-sec.cluster-metric` (Long, default
 * `5`): interval, in seconds, between two flink-k8s metrics tracking tasks.
 */
@deprecated
val metricTrackTaskIntervalSec: InternalOption = InternalOption(
key = "streampark.flink-k8s.tracking.polling-interval-sec.cluster-metric",
defaultValue = 5L,
classType = classOf[java.lang.Long],
description = "interval seconds between two single flink-k8s metrics tracking task"
)

@deprecated
val silentStateJobKeepTrackingSec: InternalOption = InternalOption(
key = "streampark.flink-k8s.tracking.silent-state-keep-sec",
defaultValue = 60,
Expand All @@ -69,6 +86,7 @@ object K8sFlinkConfig {
)

/** Name of the default Kubernetes namespace: the literal `"default"`. */
@deprecated
val DEFAULT_KUBERNETES_NAMESPACE = "default"

}
Original file line number Diff line number Diff line change
Expand Up @@ -17,26 +17,42 @@

package org.apache.streampark.common.zio

import zio.{IO, Runtime, Unsafe, ZIO}
import zio.{FiberFailure, IO, Runtime, Unsafe, ZIO}
import zio.stream.ZStream

import scala.util.Try

/** ZIO extension */
object ZIOExt {

/**
 * Runs the given effect synchronously on `Runtime.default` and returns its success value.
 *
 * Before running, the effect is provided with the layer
 * `Runtime.removeDefaultLoggers >>> ZIOLogger.default`. The resulting exit value is unwrapped
 * with `getOrThrowFiberFailure()`, so an unsuccessful run is reported by throwing instead of
 * returning a value.
 *
 * @param zio
 *   the effect to execute.
 * @return
 *   the value the effect succeeded with.
 */
@throws[Exception]
@throws[FiberFailure]
@inline def unsafeRun[E, A](zio: IO[E, A]): A = Unsafe.unsafe {
implicit u =>
Runtime.default.unsafe
.run(zio.provideLayer(Runtime.removeDefaultLoggers >>> ZIOLogger.default))
.getOrThrowFiberFailure()
}

/**
 * Executes the effect synchronously on `Runtime.default` and reports the outcome as an
 * `Either` instead of throwing: the exit value of the run is converted with `toEither`.
 *
 * The effect is provided with `Runtime.removeDefaultLoggers >>> ZIOLogger.default` before it
 * is run.
 *
 * @param zio
 *   the effect to execute.
 * @return
 *   the `Either` produced by converting the run's exit value.
 */
@inline def unsafeRunToEither[E, A](zio: IO[E, A]): Either[Throwable, A] = {
  // Building the effect description is pure, so it can be prepared outside the unsafe scope.
  val loggedEffect = zio.provideLayer(Runtime.removeDefaultLoggers >>> ZIOLogger.default)
  Unsafe.unsafe {
    implicit unsafeToken =>
      val exit = Runtime.default.unsafe.run(loggedEffect)
      exit.toEither
  }
}

/**
 * Extension methods for running an environment-free effect (`ZIO[Any, E, A]`) synchronously.
 * All of them delegate to the `unsafeRun` / `unsafeRunToEither` helpers of [[ZIOExt]].
 */
implicit class IOOps[E, A](io: ZIO[Any, E, A]) {

/** Runs the effect via `ZIOExt.unsafeRun` and returns its success value; throws otherwise. */
@throws[Throwable]
@throws[FiberFailure]
def runIO: A = ZIOExt.unsafeRun(io)

/** Runs the effect via `unsafeRunToEither` and converts the resulting `Either` to a `Try`. */
def runIOAsTry: Try[A] = unsafeRunToEither(io).toTry

/** Runs the effect via `unsafeRunToEither` and returns the resulting `Either` unchanged. */
def runIOAsEither: Either[Throwable, A] = unsafeRunToEither(io)
}

implicit class UIOOps[A](uio: ZIO[Any, Nothing, A]) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.streampark.common.zio

import zio.{FiberFailure, IO, UIO}

/**
 * Plain static-style entry points that let Java code execute ZIO effects.
 *
 * Both methods forward to [[ZIOExt.unsafeRun]].
 */
object ZIOJavaUtil {

  /**
   * Runs an effect declared as `UIO` and returns its value.
   *
   * @param uio
   *   the effect to execute.
   */
  def runUIO[A](uio: UIO[A]): A = {
    ZIOExt.unsafeRun[Nothing, A](uio)
  }

  /**
   * Runs an effect declared as `IO` and returns its success value.
   *
   * @param zio
   *   the effect to execute.
   */
  @throws[FiberFailure]
  def runIO[E, A](zio: IO[E, A]): A = {
    ZIOExt.unsafeRun[E, A](zio)
  }

}
Original file line number Diff line number Diff line change
Expand Up @@ -294,6 +294,7 @@ public void cancel(Application appParam) throws Exception {

CancelRequest cancelRequest =
new CancelRequest(
application.getId(),
flinkEnv.getFlinkVersion(),
ExecutionMode.of(application.getExecutionMode()),
properties,
Expand Down Expand Up @@ -413,10 +414,13 @@ public void start(Application appParam, boolean auto) throws Exception {
extraParameter.put(ConfigConst.KEY_FLINK_SQL(null), flinkSql.getSql());
}

// TODO Need to display more K8s submission parameters in the front-end UI.
// See: org.apache.streampark.flink.client.bean.KubernetesSubmitParam
KubernetesSubmitParam kubernetesSubmitParam =
new KubernetesSubmitParam(
KubernetesSubmitParam.apply(
application.getClusterId(),
application.getK8sNamespace(),
application.getFlinkImage(),
application.getK8sRestExposedTypeEnum());

Tuple2<String, String> userJarAndAppConf = getUserJarAndAppConf(flinkEnv, application);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,13 @@

package org.apache.streampark.console.core.service.application.impl;

import org.apache.streampark.common.conf.K8sFlinkConfig;
import org.apache.streampark.common.conf.Workspace;
import org.apache.streampark.common.enums.ExecutionMode;
import org.apache.streampark.common.enums.StorageType;
import org.apache.streampark.common.fs.HdfsOperator;
import org.apache.streampark.common.util.DeflaterUtils;
import org.apache.streampark.common.zio.ZIOJavaUtil;
import org.apache.streampark.console.base.domain.RestRequest;
import org.apache.streampark.console.base.exception.ApiAlertException;
import org.apache.streampark.console.base.mybatis.pager.MybatisPager;
Expand Down Expand Up @@ -54,6 +56,7 @@
import org.apache.streampark.console.core.service.application.ApplicationManageService;
import org.apache.streampark.console.core.task.FlinkHttpWatcher;
import org.apache.streampark.flink.kubernetes.FlinkK8sWatcher;
import org.apache.streampark.flink.kubernetes.v2.observer.FlinkK8sObserver;
import org.apache.streampark.flink.packer.pipeline.PipelineStatus;

import org.apache.commons.lang3.StringUtils;
Expand Down Expand Up @@ -176,9 +179,11 @@ public Boolean delete(Application appParam) {

// 8) remove app
removeApp(application);

if (isKubernetesApp(application)) {
k8SFlinkTrackMonitor.unWatching(toTrackId(application));
if (K8sFlinkConfig.isV2Enabled()) {
ZIOJavaUtil.runUIO(FlinkK8sObserver.untrackById(application.getId()));
}
} else {
FlinkHttpWatcher.unWatching(appParam.getId());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.streampark.console.core.service.impl;

import org.apache.streampark.common.conf.ConfigConst;
import org.apache.streampark.common.conf.K8sFlinkConfig;
import org.apache.streampark.common.conf.Workspace;
import org.apache.streampark.common.enums.ApplicationType;
import org.apache.streampark.common.enums.DevelopmentMode;
Expand Down Expand Up @@ -78,6 +79,7 @@
import org.apache.streampark.flink.packer.pipeline.PipelineStatus;
import org.apache.streampark.flink.packer.pipeline.PipelineType;
import org.apache.streampark.flink.packer.pipeline.impl.FlinkK8sApplicationBuildPipeline;
import org.apache.streampark.flink.packer.pipeline.impl.FlinkK8sApplicationBuildPipelineV2;
import org.apache.streampark.flink.packer.pipeline.impl.FlinkK8sSessionBuildPipeline;
import org.apache.streampark.flink.packer.pipeline.impl.FlinkRemoteBuildPipeline;
import org.apache.streampark.flink.packer.pipeline.impl.FlinkYarnApplicationBuildPipeline;
Expand Down Expand Up @@ -511,7 +513,11 @@ private BuildPipeline createPipelineInstance(@Nonnull Application app) {
dockerConfig.getPassword()),
app.getIngressTemplate());
log.info("Submit params to building pipeline : {}", k8sApplicationBuildRequest);
return FlinkK8sApplicationBuildPipeline.of(k8sApplicationBuildRequest);
if (K8sFlinkConfig.isV2Enabled()) {
return FlinkK8sApplicationBuildPipelineV2.of(k8sApplicationBuildRequest);
} else {
return FlinkK8sApplicationBuildPipeline.of(k8sApplicationBuildRequest);
}
default:
throw new UnsupportedOperationException(
"Unsupported Building Application for ExecutionMode: " + app.getExecutionModeEnum());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -493,6 +493,7 @@ private TriggerSavepointRequest renderTriggerSavepointRequest(
Map<String, Object> properties = this.tryGetRestProps(application, cluster);

return new TriggerSavepointRequest(
application.getId(),
flinkEnv.getFlinkVersion(),
application.getExecutionModeEnum(),
properties,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.streampark.console.core.task;

import org.apache.streampark.common.conf.K8sFlinkConfig;
import org.apache.streampark.common.enums.ExecutionMode;
import org.apache.streampark.console.core.entity.Application;
import org.apache.streampark.console.core.service.application.ApplicationManageService;
Expand Down Expand Up @@ -82,11 +83,15 @@ public FlinkK8sWatcher registerFlinkK8sWatcher() {
}

private void initFlinkK8sWatcher(@Nonnull FlinkK8sWatcher trackMonitor) {
// register change event listener
trackMonitor.registerListener(flinkK8sChangeEventListener);
// recovery tracking list
List<TrackId> k8sApp = getK8sWatchingApps();
k8sApp.forEach(trackMonitor::doWatching);
if (!K8sFlinkConfig.isV2Enabled()) {
// register change event listener
trackMonitor.registerListener(flinkK8sChangeEventListener);
// recovery tracking list
List<TrackId> k8sApp = getK8sWatchingApps();
k8sApp.forEach(trackMonitor::doWatching);
} else {
// TODO [flink-k8s-v2] Recovery tracking list and invoke FlinkK8sObserver.track()
}
}

/** get flink-k8s job tracking application from db. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import javax.annotation.Nullable
import java.util.{Map => JavaMap}

case class CancelRequest(
id: Long,
flinkVersion: FlinkVersion,
executionMode: ExecutionMode,
@Nullable properties: JavaMap[String, Any],
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.streampark.flink.client.bean

import org.apache.streampark.common.enums.FlinkK8sRestExposedType

import javax.annotation.Nullable

import java.util
import java.util.{Map => JMap}

/**
 * Kubernetes-specific parameters of a Flink job submission.
 *
 * TODO Need to display more K8s submission parameters in the front-end UI.
 *
 * It will eventually be converted to
 * [[org.apache.streampark.flink.kubernetes.v2.model.FlinkDeploymentDef]]
 *
 * The logic of conversion is located at:
 * [[org.apache.streampark.flink.client.impl.KubernetesApplicationClientV2#genFlinkDeployDef]]
 *
 * Only `clusterId` and `kubernetesNamespace` are mandatory. Every `Option` field defaults to
 * `None` and `logConfiguration` defaults to a new, empty `java.util.HashMap`.
 *
 * @param clusterId
 *   flink cluster id in k8s cluster.
 * @param kubernetesNamespace
 *   k8s namespace.
 * @param baseImage
 *   optional image name; presumably the Flink container image of the deployment (TODO confirm
 *   against `genFlinkDeployDef`).
 * @param logConfiguration
 *   string-to-string map of log settings; NOTE(review): expected keys are not visible here.
 * @param flinkRestExposedType
 *   flink rest-service exposed type on k8s cluster.
 */
// NOTE(review): the remaining jobManager*/taskManager*/podTemplate/serviceAccount/imagePullPolicy
// fields are not interpreted in this file; their meaning is defined by the conversion code.
// todo split into Application mode and SessionJob mode
case class KubernetesSubmitParam(
clusterId: String,
kubernetesNamespace: String,
baseImage: Option[String] = None,
imagePullPolicy: Option[String] = None,
serviceAccount: Option[String] = None,
podTemplate: Option[String] = None,
jobManagerCpu: Option[Double] = None,
jobManagerMemory: Option[String] = None,
jobManagerEphemeralStorage: Option[String] = None,
jobManagerPodTemplate: Option[String] = None,
taskManagerCpu: Option[Double] = None,
taskManagerMemory: Option[String] = None,
taskManagerEphemeralStorage: Option[String] = None,
taskManagerPodTemplate: Option[String] = None,
logConfiguration: JMap[String, String] = new util.HashMap[String, String](),
flinkRestExposedType: Option[FlinkK8sRestExposedType] = None
)

object KubernetesSubmitParam {

  /**
   * Compatible with streampark old native k8s submission parameters.
   *
   * This overload takes plain (possibly `null`) references and wraps the nullable ones with
   * `Option(...)`, so a `null` argument becomes `None` instead of `Some(null)`.
   *
   * @param clusterId
   *   flink cluster id in k8s cluster.
   * @param kubernetesNamespace
   *   k8s namespace.
   * @param baseImage
   *   image name to store in `baseImage`; may be `null`, which is stored as `None`.
   * @param flinkRestExposedType
   *   flink rest-service exposed type on k8s cluster; may be `null`, which is stored as `None`.
   * @return
   *   a [[KubernetesSubmitParam]] with all other fields left at their defaults.
   */
  def apply(
      clusterId: String,
      kubernetesNamespace: String,
      @Nullable baseImage: String,
      @Nullable flinkRestExposedType: FlinkK8sRestExposedType): KubernetesSubmitParam =
    KubernetesSubmitParam(
      clusterId = clusterId,
      kubernetesNamespace = kubernetesNamespace,
      // Option(...) rather than Some(...): a null image must not end up as Some(null).
      baseImage = Option(baseImage),
      flinkRestExposedType = Option(flinkRestExposedType)
    )
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,19 +36,6 @@ import java.util.{Map => JavaMap}
import scala.collection.convert.ImplicitConversions._
import scala.util.Try

/**
* @param clusterId
* flink cluster id in k8s cluster.
* @param kubernetesNamespace
* k8s namespace.
* @param flinkRestExposedType
* flink rest-service exposed type on k8s cluster.
*/
case class KubernetesSubmitParam(
clusterId: String,
kubernetesNamespace: String,
@Nullable flinkRestExposedType: FlinkK8sRestExposedType)

case class SubmitRequest(
flinkVersion: FlinkVersion,
executionMode: ExecutionMode,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import java.util.{Map => JavaMap}

/** Trigger savepoint request. */
case class TriggerSavepointRequest(
id: Long,
flinkVersion: FlinkVersion,
executionMode: ExecutionMode,
@Nullable properties: JavaMap[String, Any],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.streampark.flink.client

import org.apache.streampark.common.conf.K8sFlinkConfig
import org.apache.streampark.common.enums.ExecutionMode
import org.apache.streampark.common.enums.ExecutionMode._
import org.apache.streampark.flink.client.`trait`.FlinkClientTrait
Expand All @@ -32,7 +33,10 @@ object FlinkClientHandler {
YARN_SESSION -> YarnSessionClient,
YARN_PER_JOB -> YarnPerJobClient,
KUBERNETES_NATIVE_SESSION -> KubernetesNativeSessionClient,
KUBERNETES_NATIVE_APPLICATION -> KubernetesNativeApplicationClient
KUBERNETES_NATIVE_APPLICATION -> {
if (K8sFlinkConfig.isV2Enabled) KubernetesApplicationClientV2
else KubernetesNativeApplicationClient
}
)

def submit(request: SubmitRequest): SubmitResponse = {
Expand Down
Loading

0 comments on commit 11c7fa8

Please sign in to comment.