Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Feat][Flink-K8s-V2] Refactor the lifecycle control of Flink K8s application-mode jobs #3037

Merged
merged 6 commits into from
Sep 10, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,37 +18,54 @@
package org.apache.streampark.common.conf

/** Flink kubernetes Configuration for v1 version */
@deprecated("see: org.apache.streampark.flink.kubernetes.v2.Config")

object K8sFlinkConfig {

lazy val isV2Enabled: Boolean = InternalConfigHolder.get(ENABLE_V2)

val ENABLE_V2: InternalOption = InternalOption(
key = "streampark.flink-k8s.enable-v2",
defaultValue = false,
classType = classOf[Boolean],
description =
"Whether to enable the v2 version(base on flink-kubernetes-operator) of flink kubernetes operation"
)

// ======= deprecated =======

@deprecated
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

StreamPark consists of two parts: a development framework and a stream processing platform. The development framework part provides some APIs to users. The parameter InternalOption was originally designed for internal use within the platform, It is a non-public API. Therefore, these parameters can be deleted directly without causing any other impact on users

val jobStatusTrackTaskTimeoutSec: InternalOption = InternalOption(
key = "streampark.flink-k8s.tracking.polling-task-timeout-sec.job-status",
defaultValue = 120L,
classType = classOf[java.lang.Long],
description = "run timeout seconds of single flink-k8s metrics tracking task"
)

@deprecated
val metricTrackTaskTimeoutSec: InternalOption = InternalOption(
key = "streampark.flink-k8s.tracking.polling-task-timeout-sec.cluster-metric",
defaultValue = 120L,
classType = classOf[java.lang.Long],
description = "run timeout seconds of single flink-k8s job status tracking task"
)

@deprecated
val jobStatueTrackTaskIntervalSec: InternalOption = InternalOption(
key = "streampark.flink-k8s.tracking.polling-interval-sec.job-status",
defaultValue = 5L,
classType = classOf[java.lang.Long],
description = "interval seconds between two single flink-k8s metrics tracking task"
)

@deprecated
val metricTrackTaskIntervalSec: InternalOption = InternalOption(
key = "streampark.flink-k8s.tracking.polling-interval-sec.cluster-metric",
defaultValue = 5L,
classType = classOf[java.lang.Long],
description = "interval seconds between two single flink-k8s metrics tracking task"
)

@deprecated
val silentStateJobKeepTrackingSec: InternalOption = InternalOption(
key = "streampark.flink-k8s.tracking.silent-state-keep-sec",
defaultValue = 60,
Expand All @@ -69,6 +86,7 @@ object K8sFlinkConfig {
)

/** kubernetes default namespace */
@deprecated
val DEFAULT_KUBERNETES_NAMESPACE = "default"

}
Original file line number Diff line number Diff line change
Expand Up @@ -17,26 +17,42 @@

package org.apache.streampark.common.zio

import zio.{IO, Runtime, Unsafe, ZIO}
import zio.{FiberFailure, IO, Runtime, Unsafe, ZIO}
import zio.stream.ZStream

import scala.util.Try

/** ZIO extension */
object ZIOExt {

/* Unsafe run zio effect. */
@throws[Exception]
@throws[FiberFailure]
@inline def unsafeRun[E, A](zio: IO[E, A]): A = Unsafe.unsafe {
implicit u =>
Runtime.default.unsafe
.run(zio.provideLayer(Runtime.removeDefaultLoggers >>> ZIOLogger.default))
.getOrThrowFiberFailure()
}

/** unsafe run IO to Either. */
@inline def unsafeRunToEither[E, A](zio: IO[E, A]): Either[Throwable, A] = Unsafe.unsafe {
implicit u =>
Runtime.default.unsafe
.run(zio.provideLayer(Runtime.removeDefaultLoggers >>> ZIOLogger.default))
.toEither
}

implicit class IOOps[E, A](io: ZIO[Any, E, A]) {

/** unsafe run IO */
@throws[Throwable]
@throws[FiberFailure]
def runIO: A = ZIOExt.unsafeRun(io)

/** unsafe run IO to Try. */
def runIOAsTry: Try[A] = unsafeRunToEither(io).toTry

/** unsafe run IO to Either. */
def runIOAsEither: Either[Throwable, A] = unsafeRunToEither(io)
}

implicit class UIOOps[A](uio: ZIO[Any, Nothing, A]) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.streampark.common.zio

import zio.{FiberFailure, IO, UIO}

/** Util for running ZIO effects in Java. */
object ZIOJavaUtil {

@throws[FiberFailure]
def runIO[E, A](zio: IO[E, A]): A = ZIOExt.unsafeRun(zio)

def runUIO[A](uio: UIO[A]): A = ZIOExt.unsafeRun(uio)

}
Original file line number Diff line number Diff line change
Expand Up @@ -294,6 +294,7 @@ public void cancel(Application appParam) throws Exception {

CancelRequest cancelRequest =
new CancelRequest(
application.getId(),
flinkEnv.getFlinkVersion(),
ExecutionMode.of(application.getExecutionMode()),
properties,
Expand Down Expand Up @@ -413,10 +414,13 @@ public void start(Application appParam, boolean auto) throws Exception {
extraParameter.put(ConfigConst.KEY_FLINK_SQL(null), flinkSql.getSql());
}

// TODO Need to display more K8s submission parameters in the front-end UI.
// See: org.apache.streampark.flink.client.bean.KubernetesSubmitParam
KubernetesSubmitParam kubernetesSubmitParam =
new KubernetesSubmitParam(
KubernetesSubmitParam.apply(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good job

application.getClusterId(),
application.getK8sNamespace(),
application.getFlinkImage(),
application.getK8sRestExposedTypeEnum());

Tuple2<String, String> userJarAndAppConf = getUserJarAndAppConf(flinkEnv, application);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,13 @@

package org.apache.streampark.console.core.service.application.impl;

import org.apache.streampark.common.conf.K8sFlinkConfig;
import org.apache.streampark.common.conf.Workspace;
import org.apache.streampark.common.enums.ExecutionMode;
import org.apache.streampark.common.enums.StorageType;
import org.apache.streampark.common.fs.HdfsOperator;
import org.apache.streampark.common.util.DeflaterUtils;
import org.apache.streampark.common.zio.ZIOJavaUtil;
import org.apache.streampark.console.base.domain.RestRequest;
import org.apache.streampark.console.base.exception.ApiAlertException;
import org.apache.streampark.console.base.mybatis.pager.MybatisPager;
Expand Down Expand Up @@ -54,6 +56,7 @@
import org.apache.streampark.console.core.service.application.ApplicationManageService;
import org.apache.streampark.console.core.task.FlinkHttpWatcher;
import org.apache.streampark.flink.kubernetes.FlinkK8sWatcher;
import org.apache.streampark.flink.kubernetes.v2.observer.FlinkK8sObserver;
import org.apache.streampark.flink.packer.pipeline.PipelineStatus;

import org.apache.commons.lang3.StringUtils;
Expand Down Expand Up @@ -176,9 +179,11 @@ public Boolean delete(Application appParam) {

// 8) remove app
removeApp(application);

if (isKubernetesApp(application)) {
k8SFlinkTrackMonitor.unWatching(toTrackId(application));
if (K8sFlinkConfig.isV2Enabled()) {
ZIOJavaUtil.runUIO(FlinkK8sObserver.untrackById(application.getId()));
}
} else {
FlinkHttpWatcher.unWatching(appParam.getId());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.streampark.console.core.service.impl;

import org.apache.streampark.common.conf.ConfigConst;
import org.apache.streampark.common.conf.K8sFlinkConfig;
import org.apache.streampark.common.conf.Workspace;
import org.apache.streampark.common.enums.ApplicationType;
import org.apache.streampark.common.enums.DevelopmentMode;
Expand Down Expand Up @@ -78,6 +79,7 @@
import org.apache.streampark.flink.packer.pipeline.PipelineStatus;
import org.apache.streampark.flink.packer.pipeline.PipelineType;
import org.apache.streampark.flink.packer.pipeline.impl.FlinkK8sApplicationBuildPipeline;
import org.apache.streampark.flink.packer.pipeline.impl.FlinkK8sApplicationBuildPipelineV2;
import org.apache.streampark.flink.packer.pipeline.impl.FlinkK8sSessionBuildPipeline;
import org.apache.streampark.flink.packer.pipeline.impl.FlinkRemoteBuildPipeline;
import org.apache.streampark.flink.packer.pipeline.impl.FlinkYarnApplicationBuildPipeline;
Expand Down Expand Up @@ -511,7 +513,11 @@ private BuildPipeline createPipelineInstance(@Nonnull Application app) {
dockerConfig.getPassword()),
app.getIngressTemplate());
log.info("Submit params to building pipeline : {}", k8sApplicationBuildRequest);
return FlinkK8sApplicationBuildPipeline.of(k8sApplicationBuildRequest);
if (K8sFlinkConfig.isV2Enabled()) {
return FlinkK8sApplicationBuildPipelineV2.of(k8sApplicationBuildRequest);
} else {
return FlinkK8sApplicationBuildPipeline.of(k8sApplicationBuildRequest);
}
default:
throw new UnsupportedOperationException(
"Unsupported Building Application for ExecutionMode: " + app.getExecutionModeEnum());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -493,6 +493,7 @@ private TriggerSavepointRequest renderTriggerSavepointRequest(
Map<String, Object> properties = this.tryGetRestProps(application, cluster);

return new TriggerSavepointRequest(
application.getId(),
flinkEnv.getFlinkVersion(),
application.getExecutionModeEnum(),
properties,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.streampark.console.core.task;

import org.apache.streampark.common.conf.K8sFlinkConfig;
import org.apache.streampark.common.enums.ExecutionMode;
import org.apache.streampark.console.core.entity.Application;
import org.apache.streampark.console.core.service.application.ApplicationManageService;
Expand Down Expand Up @@ -82,11 +83,15 @@ public FlinkK8sWatcher registerFlinkK8sWatcher() {
}

private void initFlinkK8sWatcher(@Nonnull FlinkK8sWatcher trackMonitor) {
// register change event listener
trackMonitor.registerListener(flinkK8sChangeEventListener);
// recovery tracking list
List<TrackId> k8sApp = getK8sWatchingApps();
k8sApp.forEach(trackMonitor::doWatching);
if (!K8sFlinkConfig.isV2Enabled()) {
// register change event listener
trackMonitor.registerListener(flinkK8sChangeEventListener);
// recovery tracking list
List<TrackId> k8sApp = getK8sWatchingApps();
k8sApp.forEach(trackMonitor::doWatching);
} else {
// TODO [flink-k8s-v2] Recovery tracking list and invoke FlinkK8sObserver.track()
}
}

/** get flink-k8s job tracking application from db. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import javax.annotation.Nullable
import java.util.{Map => JavaMap}

case class CancelRequest(
id: Long,
flinkVersion: FlinkVersion,
executionMode: ExecutionMode,
@Nullable properties: JavaMap[String, Any],
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.streampark.flink.client.bean

import org.apache.streampark.common.enums.FlinkK8sRestExposedType

import javax.annotation.Nullable

import java.util
import java.util.{Map => JMap}

/**
* TODO Need to display more K8s submission parameters in the front-end UI.
*
* It will eventually be converted to
* [[org.apache.streampark.flink.kubernetes.v2.model.FlinkDeploymentDef]]
*
* The logic of conversion is located at:
* [[org.apache.streampark.flink.client.impl.KubernetesApplicationClientV2#genFlinkDeployDef]]
*/
// todo split into Application mode and SessionJob mode
case class KubernetesSubmitParam(
clusterId: String,
kubernetesNamespace: String,
baseImage: Option[String] = None,
imagePullPolicy: Option[String] = None,
serviceAccount: Option[String] = None,
podTemplate: Option[String] = None,
jobManagerCpu: Option[Double] = None,
jobManagerMemory: Option[String] = None,
jobManagerEphemeralStorage: Option[String] = None,
jobManagerPodTemplate: Option[String] = None,
taskManagerCpu: Option[Double] = None,
taskManagerMemory: Option[String] = None,
taskManagerEphemeralStorage: Option[String] = None,
taskManagerPodTemplate: Option[String] = None,
logConfiguration: JMap[String, String] = new util.HashMap[String, String](),
flinkRestExposedType: Option[FlinkK8sRestExposedType] = None
)

object KubernetesSubmitParam {

/**
* Compatible with streampark old native k8s submission parameters.
*
* @param clusterId
* flink cluster id in k8s cluster.
* @param kubernetesNamespace
* k8s namespace.
* @param flinkRestExposedType
* flink rest-service exposed type on k8s cluster.
*/
def apply(
clusterId: String,
kubernetesNamespace: String,
baseImage: String,
@Nullable flinkRestExposedType: FlinkK8sRestExposedType): KubernetesSubmitParam =
KubernetesSubmitParam(
clusterId = clusterId,
kubernetesNamespace = kubernetesNamespace,
baseImage = Some(baseImage),
flinkRestExposedType = Option(flinkRestExposedType))
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,19 +36,6 @@ import java.util.{Map => JavaMap}
import scala.collection.convert.ImplicitConversions._
import scala.util.Try

/**
* @param clusterId
* flink cluster id in k8s cluster.
* @param kubernetesNamespace
* k8s namespace.
* @param flinkRestExposedType
* flink rest-service exposed type on k8s cluster.
*/
case class KubernetesSubmitParam(
clusterId: String,
kubernetesNamespace: String,
@Nullable flinkRestExposedType: FlinkK8sRestExposedType)

case class SubmitRequest(
flinkVersion: FlinkVersion,
executionMode: ExecutionMode,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import java.util.{Map => JavaMap}

/** Trigger savepoint request. */
case class TriggerSavepointRequest(
id: Long,
flinkVersion: FlinkVersion,
executionMode: ExecutionMode,
@Nullable properties: JavaMap[String, Any],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.streampark.flink.client

import org.apache.streampark.common.conf.K8sFlinkConfig
import org.apache.streampark.common.enums.ExecutionMode
import org.apache.streampark.common.enums.ExecutionMode._
import org.apache.streampark.flink.client.`trait`.FlinkClientTrait
Expand All @@ -32,7 +33,10 @@ object FlinkClientHandler {
YARN_SESSION -> YarnSessionClient,
YARN_PER_JOB -> YarnPerJobClient,
KUBERNETES_NATIVE_SESSION -> KubernetesNativeSessionClient,
KUBERNETES_NATIVE_APPLICATION -> KubernetesNativeApplicationClient
KUBERNETES_NATIVE_APPLICATION -> {
if (K8sFlinkConfig.isV2Enabled) KubernetesApplicationClientV2
else KubernetesNativeApplicationClient
}
)

def submit(request: SubmitRequest): SubmitResponse = {
Expand Down
Loading
Loading