From 1c72947bc41b973f7b312ffb016ca8087ef53eed Mon Sep 17 00:00:00 2001
From: Linying Assad
Date: Wed, 2 Aug 2023 00:59:11 +0800
Subject: [PATCH] Move the evaluation process of the final status of flink k8s
 tasks to the flink-k8s-v2 module for better readability. #2881

---
 .../core/utils/FlinkAppStateConverter.scala   | 36 ++----
 streampark-flink/pom.xml                      |  2 +-
 .../kubernetes/v2/FlinkRestRequest.scala      | 31 +++++------
 .../kubernetes/v2/model/JobSnapshot.scala     | 53 +++++++++++++++++-
 .../flink/kubernetes/v2/model/JobState.scala  | 55 +++++++++++++++++++
 .../flink/kubernetes/v2/model/JobStatus.scala | 17 ------
 .../v2/observer/FlinkK8sObserver.scala        | 40 +++++++------
 .../v2/observer/RawClusterObserver.scala      |  4 +-
 .../kubernetes/v2/example/UsingObserver.scala | 16 +++++-
 9 files changed, 160 insertions(+), 94 deletions(-)
 create mode 100644 streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-engine/src/main/scala/org/apache/streampark/flink/kubernetes/v2/model/JobState.scala

diff --git a/streampark-console/streampark-console-service/src/main/scala/org/apache/streampark/console/core/utils/FlinkAppStateConverter.scala b/streampark-console/streampark-console-service/src/main/scala/org/apache/streampark/console/core/utils/FlinkAppStateConverter.scala
index 57f6f42879..93c3cfc6f7 100644
--- a/streampark-console/streampark-console-service/src/main/scala/org/apache/streampark/console/core/utils/FlinkAppStateConverter.scala
+++ b/streampark-console/streampark-console-service/src/main/scala/org/apache/streampark/console/core/utils/FlinkAppStateConverter.scala
@@ -18,41 +18,15 @@
 package org.apache.streampark.console.core.utils
 
 import org.apache.streampark.console.core.enums.FlinkAppState
-import org.apache.streampark.flink.kubernetes.v2.model.{EvalState, JobSnapshot, JobState}
-import org.apache.streampark.flink.kubernetes.v2.model.JobState.JobState
+import org.apache.streampark.flink.kubernetes.v2.model.EvalJobState.EvalJobState
 
 import scala.util.Try
 
 object FlinkAppStateConverter {
 
-  /** Merge CR status and job status inside [[JobSnapshot]] to [[FlinkAppState]]. */
-  def dryK8sJobSnapshotToFlinkAppState(snapshot: JobSnapshot): FlinkAppState =
-    (snapshot.crStatus, snapshot.jobStatus) match {
-      case (None, None)            => FlinkAppState.LOST
-      case (None, Some(jobStatus)) => jobStateToAppState(jobStatus.state)
-      case (Some(crStatus), None)  =>
-        crStatus.evalState match {
-          case EvalState.DEPLOYING | EvalState.READY => FlinkAppState.INITIALIZING
-          case EvalState.FAILED                      => FlinkAppState.FAILED
-          case EvalState.SUSPENDED                   => FlinkAppState.SUSPENDED
-          case EvalState.DELETED                     => FlinkAppState.TERMINATED
-        }
-      case (Some(crStatus), Some(jobStatus)) =>
-        if (jobStatus.updatedTs >= crStatus.updatedTs) jobStateToAppState(jobStatus.state)
-        else {
-          crStatus.evalState match {
-            case EvalState.FAILED    => FlinkAppState.FAILED
-            case EvalState.SUSPENDED => FlinkAppState.SUSPENDED
-            case EvalState.DELETED   => FlinkAppState.TERMINATED
-            case EvalState.READY     => jobStateToAppState(jobStatus.state)
-            case EvalState.DEPLOYING =>
-              if (JobState.maybeDeploying.contains(jobStatus.state)) jobStateToAppState(jobStatus.state)
-              else FlinkAppState.INITIALIZING
-          }
-        }
-    }
-
-  private def jobStateToAppState(state: JobState) =
-    Try(FlinkAppState.of(state.toString)).getOrElse(FlinkAppState.OTHER)
+  /** Convert [[EvalJobState]] to [[FlinkAppState]]. */
+  def k8sEvalJobStateToFlinkAppState(jobState: EvalJobState): FlinkAppState = {
+    Try(FlinkAppState.valueOf(jobState.toString)).getOrElse(FlinkAppState.OTHER)
+  }
 }
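The new converter maps by enum name and degrades unknown names to OTHER instead of throwing. A usage sketch (hypothetical demo object; assumes both the console and kubernetes-v2 modules are on the classpath):

```scala
import org.apache.streampark.console.core.enums.FlinkAppState
import org.apache.streampark.console.core.utils.FlinkAppStateConverter
import org.apache.streampark.flink.kubernetes.v2.model.EvalJobState

object ConverterDemo extends App {
  // States whose names exist in FlinkAppState map one-to-one.
  println(FlinkAppStateConverter.k8sEvalJobStateToFlinkAppState(EvalJobState.RUNNING)) // RUNNING
  // LOST is one of the console-only states copied into EvalJobState.
  println(FlinkAppStateConverter.k8sEvalJobStateToFlinkAppState(EvalJobState.LOST))    // LOST
}
```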
diff --git a/streampark-flink/pom.xml b/streampark-flink/pom.xml
index 19184ab60e..0269efe24c 100644
--- a/streampark-flink/pom.xml
+++ b/streampark-flink/pom.xml
@@ -39,7 +39,7 @@
         <module>streampark-flink-packer</module>
         <module>streampark-flink-kubernetes</module>
         <module>streampark-flink-kubernetes-v2</module>
-        <module>streampark-flink-sql-gateway</module>
+

diff --git a/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-engine/src/main/scala/org/apache/streampark/flink/kubernetes/v2/FlinkRestRequest.scala b/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-engine/src/main/scala/org/apache/streampark/flink/kubernetes/v2/FlinkRestRequest.scala
index 7914e0ae02..c96e88d274 100644
--- a/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-engine/src/main/scala/org/apache/streampark/flink/kubernetes/v2/FlinkRestRequest.scala
+++ b/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-engine/src/main/scala/org/apache/streampark/flink/kubernetes/v2/FlinkRestRequest.scala
@@ -20,9 +20,10 @@ package org.apache.streampark.flink.kubernetes.v2
 import org.apache.streampark.flink.kubernetes.v2.FlinkRestRequest._
 import org.apache.streampark.flink.kubernetes.v2.model.{FlinkPipeOprState, JobSavepointDef, JobSavepointStatus}
 
-import zio.{IO, Task, ZIO}
+import zio.{IO, ZIO}
 import zio.ZIO.attempt
-import zio.http.{Body, Client, Method, Response}
+import zio.http.{Body, Client}
+import zio.http.Method.{PATCH, POST}
 import zio.json._
 
 import java.nio.charset.Charset
@@ -41,7 +42,7 @@ case class FlinkRestRequest(restUrl: String) {
    */
   def listJobOverviewInfo: IO[Throwable, Vector[JobOverviewInfo]] =
     for {
-      res <- get(s"$restUrl/jobs/overview")
+      res <- Client.request(s"$restUrl/jobs/overview")
       rs  <- res.body.asJson[JobOverviewRsp]
     } yield rs.jobs
@@ -51,7 +52,7 @@ case class FlinkRestRequest(restUrl: String) {
    */
   def getClusterOverview: IO[Throwable, ClusterOverviewInfo] =
     for {
-      res <- get(s"$restUrl/overview")
+      res <- Client.request(s"$restUrl/overview")
       rs  <- res.body.asJson[ClusterOverviewInfo]
     } yield rs
@@ -61,7 +62,7 @@ case class FlinkRestRequest(restUrl: String) {
    */
   def getJobmanagerConfig: IO[Throwable, Map[String, String]] =
     for {
-      res  <- get(s"$restUrl/jobmanager/config")
+      res  <- Client.request(s"$restUrl/jobmanager/config")
       body <- res.body.asString
       rs   <- attempt {
                 ujson
@@ -77,7 +78,7 @@ case class FlinkRestRequest(restUrl: String) {
    * see: https://nightlies.apache.org/flink/flink-docs-master/docs/ops/rest_api/#jobs-jobid-1
    */
   def cancelJob(jobId: String): IO[Throwable, Unit] = {
-    patch(s"$restUrl/jobs/$jobId?mode=cancel").unit
+    Client.request(s"$restUrl/jobs/$jobId?mode=cancel", method = PATCH).unit
   }
@@ -86,7 +87,7 @@ case class FlinkRestRequest(restUrl: String) {
    */
   def stopJobWithSavepoint(jobId: String, sptReq: StopJobSptReq): IO[Throwable, TriggerId] =
     for {
-      res  <- post(s"$restUrl/jobs/$jobId/stop", sptReq.toJson)
+      res  <- Client.request(s"$restUrl/jobs/$jobId/stop", method = POST, content = sptReq.toJson)
       body <- res.body.asString
       rs   <- attempt(ujson.read(body)("request-id").str)
     } yield rs
@@ -97,7 +98,7 @@ case class FlinkRestRequest(restUrl: String) {
    */
   def triggerSavepoint(jobId: String, sptReq: TriggerSptReq): IO[Throwable, TriggerId] =
     for {
-      res  <- post(s"$restUrl/jobs/$jobId/savepoints", sptReq.toJson)
+      res  <- Client.request(s"$restUrl/jobs/$jobId/savepoints", method = POST, content = sptReq.toJson)
       body <- res.body.asString
       rs   <- attempt(ujson.read(body)("request-id").str)
     } yield rs
@@ -108,7 +109,7 @@ case class FlinkRestRequest(restUrl: String) {
    */
   def getSavepointOperationStatus(jobId: String, triggerId: String): IO[Throwable, JobSavepointStatus] =
     for {
-      res  <- get(s"$restUrl/jobs/$jobId/savepoints/$triggerId")
+      res  <- Client.request(s"$restUrl/jobs/$jobId/savepoints/$triggerId")
       body <- res.body.asString
       rs   <- attempt {
                 val rspJson = ujson.read(body)
@@ -130,16 +131,10 @@ case class FlinkRestRequest(restUrl: String) {
 object FlinkRestRequest {
 
-  private def get(url: String): Task[Response] =
-    Client.request(url, method = Method.GET).provideLayer(Client.default)
+  implicit def autoProvideClientLayer[A](zio: ZIO[Client, Throwable, A]): IO[Throwable, A] =
+    zio.provideLayer(Client.default)
 
-  private def patch(url: String): Task[Response] =
-    Client.request(url, method = Method.PATCH).provideLayer(Client.default)
-
-  private def post(url: String, body: String): Task[Response] =
-    Client
-      .request(url, method = Method.POST, content = Body.fromString(body, charset = Charset.forName("UTF-8")))
-      .provideLayer(Client.default)
+  implicit def liftStringBody(content: String): Body = Body.fromString(content, charset = Charset.forName("UTF-8"))
 
   implicit class BodyExtension(body: Body) {
     def asJson[A](implicit decoder: JsonDecoder[A]): IO[Throwable, A] = for {
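The two implicits above replace the ad-hoc get/patch/post helpers: one auto-provides the default Client layer, the other lifts plain strings into request bodies. A minimal, self-contained sketch of the same pattern (stand-in types and hypothetical names, not the zio-http API):

```scala
import scala.language.implicitConversions

object ImplicitLayerSketch extends App {
  // Stand-ins for ZIO[Client, Throwable, A] and IO[Throwable, A].
  final case class NeedsClient[A](run: String => A) // effect that still needs a client (here: a base URL)
  final case class Ready[A](run: () => A)           // effect whose environment is already provided

  // Analogue of autoProvideClientLayer: supply the default client automatically,
  // so call sites can use environment-requiring effects as ready-to-run ones.
  implicit def autoProvide[A](z: NeedsClient[A]): Ready[A] =
    Ready(() => z.run("http://default-client"))

  def request(path: String): NeedsClient[String] = NeedsClient(base => s"GET $base$path")

  val r: Ready[String] = request("/jobs/overview") // implicit conversion applies here
  println(r.run())                                 // GET http://default-client/jobs/overview
}
```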
diff --git a/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-engine/src/main/scala/org/apache/streampark/flink/kubernetes/v2/model/JobSnapshot.scala b/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-engine/src/main/scala/org/apache/streampark/flink/kubernetes/v2/model/JobSnapshot.scala
index 87aa5a067e..0578141267 100644
--- a/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-engine/src/main/scala/org/apache/streampark/flink/kubernetes/v2/model/JobSnapshot.scala
+++ b/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-engine/src/main/scala/org/apache/streampark/flink/kubernetes/v2/model/JobSnapshot.scala
@@ -17,21 +17,68 @@
 
 package org.apache.streampark.flink.kubernetes.v2.model
 
+import org.apache.streampark.flink.kubernetes.v2.model.EvalJobState.EvalJobState
+
 /**
  * Flink job status snapshot identified by StreamPark app-id.
  *
- * For the logical code to convert a JobSnapshot to a [[org.apache.streampark.console.core.enums.FlinkAppState]],
- * see the [[org.apache.streampark.console.core.utils.FlinkAppStateConverter#dryK8sJobSnapshotToFlinkAppState]]
- *
- * @param appId     Ref to [[org.apache.streampark.console.core.entity.Application.id]]
+ * @param appId     Ref to [[org.apache.streampark.console.core.entity.Application.id]]
  * @param clusterNs Flink cluster namespace on kubernetes.
  * @param clusterId Flink cluster name on kubernetes.
- * @param crStatus  Flink K8s CR status.
+ * @param evalState Final evaluated job state.
+ * @param crStatus  Flink K8s CR status.
  * @param jobStatus Flink job status received from REST API.
  */
 case class JobSnapshot(
   appId: Long,
   clusterNs: String,
   clusterId: String,
+  evalState: EvalJobState,
   crStatus: Option[FlinkCRStatus],
   jobStatus: Option[JobStatus])
+
+object JobSnapshot {
+
+  def eval(
+      appId: Long,
+      clusterNs: String,
+      clusterId: String,
+      crStatus: Option[FlinkCRStatus],
+      jobStatus: Option[JobStatus]): JobSnapshot = JobSnapshot(
+    appId = appId,
+    clusterNs = clusterNs,
+    clusterId = clusterId,
+    evalState = evalFinalJobState(crStatus, jobStatus),
+    crStatus = crStatus,
+    jobStatus = jobStatus
+  )
+
+  private def evalFinalJobState(crStatus: Option[FlinkCRStatus], jobStatus: Option[JobStatus]): EvalJobState =
+    (crStatus, jobStatus) match {
+      case (None, None)            => EvalJobState.LOST
+      case (None, Some(jobStatus)) => EvalJobState.of(jobStatus.state)
+      case (Some(crStatus), None)  =>
+        crStatus.evalState match {
+          case EvalState.DEPLOYING | EvalState.READY => EvalJobState.INITIALIZING
+          case EvalState.FAILED                      => EvalJobState.FAILED
+          case EvalState.SUSPENDED                   => EvalJobState.SUSPENDED
+          case EvalState.DELETED                     => EvalJobState.TERMINATED
+        }
+      case (Some(crStatus), Some(jobStatus)) =>
+        if (jobStatus.updatedTs >= crStatus.updatedTs) EvalJobState.of(jobStatus.state)
+        else {
+          crStatus.evalState match {
+            case EvalState.FAILED    => EvalJobState.FAILED
+            case EvalState.SUSPENDED => EvalJobState.SUSPENDED
+            case EvalState.DELETED   => EvalJobState.TERMINATED
+            case EvalState.READY     => EvalJobState.of(jobStatus.state)
+            case EvalState.DEPLOYING =>
+              if (JobState.maybeDeploying.contains(jobStatus.state)) EvalJobState.of(jobStatus.state)
+              else EvalJobState.INITIALIZING
+          }
+        }
+    }
+
+}
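To make the precedence in evalFinalJobState easier to follow, here is the same decision logic restated over minimal stand-in types (hypothetical field set; the real FlinkCRStatus and JobStatus carry more data), with one worked case:

```scala
object EvalPrecedenceSketch extends App {
  sealed trait CrEval
  case object DEPLOYING extends CrEval; case object READY     extends CrEval
  case object FAILED    extends CrEval; case object SUSPENDED extends CrEval
  case object DELETED   extends CrEval

  final case class Cr(eval: CrEval, updatedTs: Long)   // Flink CR status (simplified)
  final case class Job(state: String, updatedTs: Long) // REST API job status (simplified)

  val maybeDeploying = Set("INITIALIZING", "CREATED", "RESTARTING", "RECONCILING")

  def eval(cr: Option[Cr], job: Option[Job]): String = (cr, job) match {
    case (None, None)    => "LOST"                // neither source answered
    case (None, Some(j)) => j.state               // REST API is the only source
    case (Some(c), None) =>                       // CR is the only source
      c.eval match {
        case DEPLOYING | READY => "INITIALIZING"
        case FAILED            => "FAILED"
        case SUSPENDED         => "SUSPENDED"
        case DELETED           => "TERMINATED"
      }
    case (Some(c), Some(j)) =>
      if (j.updatedTs >= c.updatedTs) j.state     // the fresher source wins ...
      else c.eval match {                         // ... unless the CR reports a terminal state
        case FAILED    => "FAILED"
        case SUSPENDED => "SUSPENDED"
        case DELETED   => "TERMINATED"
        case READY     => j.state
        case DEPLOYING => if (maybeDeploying(j.state)) j.state else "INITIALIZING"
      }
  }

  // A CR still marked DEPLOYING is outvoted by a fresher RUNNING report from the REST API.
  println(eval(Some(Cr(DEPLOYING, updatedTs = 100L)), Some(Job("RUNNING", updatedTs = 200L)))) // RUNNING
}
```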
diff --git a/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-engine/src/main/scala/org/apache/streampark/flink/kubernetes/v2/model/JobState.scala b/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-engine/src/main/scala/org/apache/streampark/flink/kubernetes/v2/model/JobState.scala
new file mode 100644
index 0000000000..9463cf0933
--- /dev/null
+++ b/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-engine/src/main/scala/org/apache/streampark/flink/kubernetes/v2/model/JobState.scala
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.flink.kubernetes.v2.model
+
+import org.apache.streampark.flink.kubernetes.v2.model.JobState.JobState
+
+/**
+ * Original Flink job state.
+ * This enum carries the same meaning as the job status in the Flink REST API.
+ * see: [[org.apache.flink.kubernetes.operator.api.status.JobStatus]]
+ */
+object JobState extends Enumeration {
+
+  type JobState = Value
+
+  val INITIALIZING, CREATED, RUNNING, FAILING, FAILED, CANCELLING, CANCELED, FINISHED, RESTARTING, SUSPENDED,
+  RECONCILING = Value
+  val UNKNOWN = Value
+
+  def valueOf(raw: String): JobState = values.find(_.toString == raw).getOrElse(UNKNOWN)
+
+  val maybeDeploying = Set(INITIALIZING, CREATED, RESTARTING, RECONCILING)
+}
+
+/**
+ * Evaluated job state.
+ * This state is the result of combining the Flink CR status and the REST API status.
+ * It can be converted directly to the StreamPark [[org.apache.streampark.console.core.enums.FlinkAppState]].
+ */
+object EvalJobState extends Enumeration {
+
+  type EvalJobState = Value
+
+  val INITIALIZING, CREATED, RUNNING, FAILING, FAILED, CANCELLING, CANCELED, FINISHED, RESTARTING, SUSPENDED,
+  RECONCILING = Value
+
+  // Copied from [[org.apache.streampark.console.core.enums.FlinkAppState]].
+  val LOST, TERMINATED, OTHER = Value
+
+  def of(state: JobState): EvalJobState = values.find(e => e.toString == state.toString).getOrElse(OTHER)
+
+}
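Both enumerations fail soft rather than throwing on unknown names. A quick usage sketch (hypothetical demo object; assumes the module is on the classpath):

```scala
import org.apache.streampark.flink.kubernetes.v2.model.{EvalJobState, JobState}

object JobStateDemo extends App {
  println(JobState.valueOf("RUNNING"))        // RUNNING
  println(JobState.valueOf("NOT-A-STATE"))    // UNKNOWN: graceful fallback instead of an exception
  println(EvalJobState.of(JobState.FINISHED)) // FINISHED: mapped by name, OTHER when unmatched
}
```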
diff --git a/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-engine/src/main/scala/org/apache/streampark/flink/kubernetes/v2/model/JobStatus.scala b/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-engine/src/main/scala/org/apache/streampark/flink/kubernetes/v2/model/JobStatus.scala
index b8c8a5d90c..54e5b04e37 100644
--- a/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-engine/src/main/scala/org/apache/streampark/flink/kubernetes/v2/model/JobStatus.scala
+++ b/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-engine/src/main/scala/org/apache/streampark/flink/kubernetes/v2/model/JobStatus.scala
@@ -45,23 +45,6 @@ case class JobStatus(
   tasks: Option[TaskStats] = None,
   updatedTs: Long)
 
-/**
- * Flink Job State.
- * This enum is essentially equivalent to the meaning in the Flink REST API.
- * see: [[org.apache.flink.kubernetes.operator.api.status.JobStatus]]
- */
-object JobState extends Enumeration {
-
-  type JobState = Value
-
-  val INITIALIZING, CREATED, RUNNING, FAILING, FAILED, CANCELLING, CANCELED, FINISHED, RESTARTING, SUSPENDED,
-  RECONCILING = Value
-  val UNKNOWN = Value
-
-  def valueOf(raw: String): JobState = values.find(_.toString == raw).getOrElse(UNKNOWN)
-  val maybeDeploying = Set(INITIALIZING, CREATED, RESTARTING, RECONCILING)
-}
-
 object JobStatus {
 
   // Convert from REST API object.
diff --git a/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-engine/src/main/scala/org/apache/streampark/flink/kubernetes/v2/observer/FlinkK8sObserver.scala b/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-engine/src/main/scala/org/apache/streampark/flink/kubernetes/v2/observer/FlinkK8sObserver.scala
index f310b0eb93..a355be3d38 100644
--- a/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-engine/src/main/scala/org/apache/streampark/flink/kubernetes/v2/observer/FlinkK8sObserver.scala
+++ b/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-engine/src/main/scala/org/apache/streampark/flink/kubernetes/v2/observer/FlinkK8sObserver.scala
@@ -90,17 +90,16 @@ sealed trait FlinkK8sObserver {
 
 object FlinkK8sObserver extends FlinkK8sObserver {
 
-  val trackedKeys = ConcurrentSet.empty[TrackKey].runUIO
-
-  // fixme duplicate subscribe
+  // The following snapshots are visible to external callers.
+  val trackedKeys          = ConcurrentSet.empty[TrackKey].runUIO
   val evaluatedJobSnaps    = Ref.make(Map.empty[AppId, JobSnapshot]).runUIO
   val restSvcEndpointSnaps = ConcurrentMap.empty[(Namespace, Name), RestSvcEndpoint].runUIO
-  // fixme duplicate subscribe
   val clusterMetricsSnaps  = ConcurrentMap.empty[(Namespace, Name), ClusterMetrics].runUIO
 
-  val deployCRSnaps     = ConcurrentMap.empty[(Namespace, Name), (DeployCRStatus, Option[JobStatus])].runUIO
-  val sessionJobCRSnaps = ConcurrentMap.empty[(Namespace, Name), (SessionJobCRStatus, Option[JobStatus])].runUIO
-  private[observer] val clusterJobStatusSnaps = ConcurrentMap.empty[(Namespace, Name), Vector[JobStatus]].runUIO
+  // In general, these snapshots do not need to be accessed externally.
+  val deployCRSnaps         = ConcurrentMap.empty[(Namespace, Name), (DeployCRStatus, Option[JobStatus])].runUIO
+  val sessionJobCRSnaps     = ConcurrentMap.empty[(Namespace, Name), (SessionJobCRStatus, Option[JobStatus])].runUIO
+  val clusterJobStatusSnaps = ConcurrentMap.empty[(Namespace, Name), Vector[JobStatus]].runUIO
 
   private val restSvcEndpointObserver = RestSvcEndpointObserver(restSvcEndpointSnaps)
   private val deployCrObserver        = DeployCRObserver(deployCRSnaps)
@@ -198,16 +197,17 @@ object FlinkK8sObserver extends FlinkK8sObserver {
 
   /** Re-evaluate all job status snapshots from caches. */
   private def evalJobSnapshot: UIO[Unit] = {
 
-    def mergeJobStatus(crStatus: Option[JobStatus], restStatus: Option[JobStatus]) = (crStatus, restStatus) match {
-      case (Some(e), None)        => Some(e)
-      case (None, Some(e))        => Some(e)
-      case (None, None)           => None
-      case (Some(cr), Some(rest)) =>
-        Some(
-          if (rest.updatedTs > cr.updatedTs) rest
-          else cr.copy(endTs = rest.endTs, tasks = rest.tasks)
-        )
-    }
+    def mergeJobStatus(crStatus: Option[JobStatus], restStatus: Option[JobStatus]) =
+      (crStatus, restStatus) match {
+        case (Some(e), None)        => Some(e)
+        case (None, Some(e))        => Some(e)
+        case (None, None)           => None
+        case (Some(cr), Some(rest)) =>
+          Some(
+            if (rest.updatedTs > cr.updatedTs) rest
+            else cr.copy(endTs = rest.endTs, tasks = rest.tasks)
+          )
+      }
 
     ZStream
       .fromIterableZIO(trackedKeys.toSet)
@@ -227,7 +227,7 @@ object FlinkK8sObserver extends FlinkK8sObserver {
           jobStatusFromRest = restJobStatusVec.flatMap(_.headOption)
           finalJobStatus    = mergeJobStatus(jobStatusFromCr, jobStatusFromRest)
-        } yield JobSnapshot(id, ns, name, crStatus, finalJobStatus)
+        } yield JobSnapshot.eval(id, ns, name, crStatus, finalJobStatus)
 
         case SessionJobKey(id, ns, name, clusterName) =>
           for {
             jobStatusFromRest = restJobStatusVec.flatMap(_.find(_.jobId == jobId))
             finalJobStatus    = mergeJobStatus(jobStatusFromCr, jobStatusFromRest)
-          } yield JobSnapshot(id, ns, clusterName, crStatus, finalJobStatus)
+          } yield JobSnapshot.eval(id, ns, clusterName, crStatus, finalJobStatus)
 
         case UnmanagedSessionJobKey(id, clusterNs, clusterName, jid) =>
           for {
             restJobStatusVec <- clusterJobStatusSnaps.get((clusterNs, clusterName))
             jobStatus         = restJobStatusVec.flatMap(_.find(_.jobId == jid))
-          } yield JobSnapshot(id, clusterNs, clusterName, None, jobStatus)
+          } yield JobSnapshot.eval(id, clusterNs, clusterName, None, jobStatus)
       }
       // Collect result and Refresh evaluatedJobSnaps cache
       .runCollect
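Taken together with the converter change above, console-side code can now go from an observed snapshot to a FlinkAppState in one hop. A hedged sketch (assumes the flatSubscribe extension used by the UsingObserver tests below, which emits (appId, JobSnapshot) pairs):

```scala
import org.apache.streampark.console.core.utils.FlinkAppStateConverter
import org.apache.streampark.flink.kubernetes.v2.observer.FlinkK8sObserver

object AppStateBridgeSketch {
  // Stream of (appId, FlinkAppState) pairs derived from evaluated snapshots.
  val appStates =
    FlinkK8sObserver.evaluatedJobSnaps
      .flatSubscribe()
      .map { case (appId, snapshot) =>
        appId -> FlinkAppStateConverter.k8sEvalJobStateToFlinkAppState(snapshot.evalState)
      }
}
```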
diff --git a/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-engine/src/main/scala/org/apache/streampark/flink/kubernetes/v2/observer/RawClusterObserver.scala b/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-engine/src/main/scala/org/apache/streampark/flink/kubernetes/v2/observer/RawClusterObserver.scala
index 5ebd7ee9f5..d4a7e12b78 100644
--- a/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-engine/src/main/scala/org/apache/streampark/flink/kubernetes/v2/observer/RawClusterObserver.scala
+++ b/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-engine/src/main/scala/org/apache/streampark/flink/kubernetes/v2/observer/RawClusterObserver.scala
@@ -60,9 +60,9 @@ case class RawClusterObserver(
           case Some(endpoint) => FlinkRestRequest(endpoint.chooseRest).listJobOverviewInfo
       })
       .retry(Schedule.spaced(restRetryInterval))
-      .repeat(Schedule.spaced(restPollingInterval))
       .map(jobOverviews => jobOverviews.map(info => JobStatus.fromRest(info)))
       .tap(jobStatuses => clusterJobStatusSnaps.put((namespace, name), jobStatuses))
+      .repeat(Schedule.spaced(restPollingInterval))
       .runDrain
       .forkDaemon
@@ -95,7 +95,7 @@ case class RawClusterObserver(
             FlinkRestRequest(endpoint.chooseRest).getJobmanagerConfig
       })
       .retry(Schedule.spaced(restRetryInterval))
-      .repeat(Schedule.spaced(restPollingInterval))
       .map { case (clusterOv, jmConfigs) =>
         val totalJmMemory = FlinkMemorySizeParser
           .parse(jmConfigs.getOrElse("jobmanager.memory.process.size", "0b"))
@@ -121,6 +120,7 @@ case class RawClusterObserver(
         )
       }
       .tap(metrics => clusterMetricsSnaps.put((namespace, name), metrics))
+      .repeat(Schedule.spaced(restPollingInterval))
       .runDrain
       .forkDaemon
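The observers use a retry-then-poll loop driven by ZIO schedules; in isolation, the shape looks like this (a minimal sketch assuming ZIO 2.x):

```scala
import zio._
import zio.stream.ZStream

object PollLoopSketch extends ZIOAppDefault {
  // Fetch once, retrying failures every second; then poll on a fixed 5s cadence,
  // updating a cache (here: printing) on every successful fetch.
  val poll =
    ZStream
      .fromZIO(ZIO.attempt(java.time.Instant.now()))
      .retry(Schedule.spaced(1.second))
      .tap(t => Console.printLine(s"fetched at $t"))
      .repeat(Schedule.spaced(5.seconds))

  def run = poll.runDrain
}
```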
diff --git a/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-engine/src/test/scala/org/apache/streampark/flink/kubernetes/v2/example/UsingObserver.scala b/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-engine/src/test/scala/org/apache/streampark/flink/kubernetes/v2/example/UsingObserver.scala
index 20374cd4cf..c65f8c1fdc 100644
--- a/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-engine/src/test/scala/org/apache/streampark/flink/kubernetes/v2/example/UsingObserver.scala
+++ b/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-engine/src/test/scala/org/apache/streampark/flink/kubernetes/v2/example/UsingObserver.scala
@@ -61,18 +61,30 @@ class UsingObserver extends AnyWordSpecLike with BeforeAndAfterAll {
         _ <- FlinkK8sObserver.track(TrackKey.sessionJob(233, "fdev", "simple-sessionjob", "simple-session"))
         _ <- FlinkK8sObserver.track(TrackKey.appJob(234, "fdev", "simple-appjob"))
         // subscribe job status changes
-        watchStream = FlinkK8sObserver.clusterMetricsSnaps.subscribe()
+        watchStream = FlinkK8sObserver.evaluatedJobSnaps.flatSubscribe()
         _ <- watchStream.debugPretty.runDrain
       } yield ()
     }
 
+    "Subscribe to Flink job state changes only." in unsafeRun {
+      for {
+        _ <- FlinkK8sObserver.track(TrackKey.appJob(234, "fdev", "simple-appjob"))
+        _ <- FlinkK8sObserver.evaluatedJobSnaps
+               .flatSubscribe()
+               .map { case (appId, status) => (appId, status.evalState) }
+               .diffPrev
+               .debug
+               .runDrain
+      } yield ()
+    }
+
     "Subscribe Flink cluster metrics changes." in unsafeRun {
       for {
         // track resource
         _ <- FlinkK8sObserver.track(TrackKey.sessionJob(233, "fdev", "simple-sessionjob", "simple-session"))
         _ <- FlinkK8sObserver.track(TrackKey.appJob(234, "fdev", "simple-appjob"))
         // subscribe job status changes
-        watchStream = FlinkK8sObserver.clusterMetricsSnaps.flatSubscribeValues()
+        watchStream = FlinkK8sObserver.clusterMetricsSnaps.flatSubscribe()
         _ <- watchStream.debugPretty.runDrain
       } yield ()
     }
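These tests execute ZIO effects from ScalaTest through an unsafeRun helper. A typical ZIO 2 shape for it (a sketch, not necessarily the project's exact helper):

```scala
import zio._

object ZIORunner {
  // Blocks the calling thread until the effect completes, rethrowing failures.
  def unsafeRun[E, A](effect: ZIO[Any, E, A]): A =
    Unsafe.unsafe { implicit u =>
      Runtime.default.unsafe.run(effect).getOrThrowFiberFailure()
    }
}
```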