Commit

Move the evaluation process of the final status of flink k8s tasks to the flink-k8s-v2 module for better readability. #2881
Al-assad committed Aug 1, 2023
1 parent 9edac37 commit 1c72947
Showing 9 changed files with 160 additions and 94 deletions.
@@ -18,41 +18,15 @@
package org.apache.streampark.console.core.utils

import org.apache.streampark.console.core.enums.FlinkAppState
import org.apache.streampark.flink.kubernetes.v2.model.{EvalState, JobSnapshot, JobState}
import org.apache.streampark.flink.kubernetes.v2.model.JobState.JobState
import org.apache.streampark.flink.kubernetes.v2.model.EvalJobState.EvalJobState

import scala.util.Try

object FlinkAppStateConverter {

/** Merge CR status and job status inside [[JobSnapshot]] to [[FlinkAppState]]. */
def dryK8sJobSnapshotToFlinkAppState(snapshot: JobSnapshot): FlinkAppState =
(snapshot.crStatus, snapshot.jobStatus) match {
case (None, None) => FlinkAppState.LOST
case (None, Some(jobStatus)) => jobStateToAppState(jobStatus.state)
case (Some(crStatus), None) =>
crStatus.evalState match {
case EvalState.DEPLOYING | EvalState.READY => FlinkAppState.INITIALIZING
case EvalState.FAILED => FlinkAppState.FAILED
case EvalState.SUSPENDED => FlinkAppState.SUSPENDED
case EvalState.DELETED => FlinkAppState.TERMINATED
}
case (Some(crStatus), Some(jobStatus)) =>
if (jobStatus.updatedTs >= crStatus.updatedTs) jobStateToAppState(jobStatus.state)
else {
crStatus.evalState match {
case EvalState.FAILED => FlinkAppState.FAILED
case EvalState.SUSPENDED => FlinkAppState.SUSPENDED
case EvalState.DELETED => FlinkAppState.TERMINATED
case EvalState.READY => jobStateToAppState(jobStatus.state)
case EvalState.DEPLOYING =>
if (JobState.maybeDeploying.contains(jobStatus.state)) jobStateToAppState(jobStatus.state)
else FlinkAppState.INITIALIZING
}
}
}

private def jobStateToAppState(state: JobState) =
Try(FlinkAppState.of(state.toString)).getOrElse(FlinkAppState.OTHER)
/** Convert [[EvalJobState]] to [[FlinkAppState]]. */
def k8sEvalJobStateToFlinkAppState(jobState: EvalJobState): FlinkAppState = {
Try(FlinkAppState.valueOf(jobState.toString)).getOrElse(FlinkAppState.OTHER)
}

}
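
Since FlinkAppState is a console-side enum, the slimmed-down converter relies purely on name parity between the two enumerations. A standalone sketch of the same Try-based fallback pattern (toy enumerations standing in for the real StreamPark types):

import scala.util.Try

object ConvertDemo extends App {
  // Toy stand-ins for EvalJobState and FlinkAppState; only the names matter here.
  object EvalSide extends Enumeration { val RUNNING, LOST, RECONCILING = Value }
  object AppSide  extends Enumeration { val RUNNING, LOST, OTHER = Value }

  // Same shape as k8sEvalJobStateToFlinkAppState: resolve by name, fall back to OTHER.
  def convert(s: EvalSide.Value): AppSide.Value =
    Try(AppSide.withName(s.toString)).getOrElse(AppSide.OTHER)

  println(convert(EvalSide.RUNNING))     // RUNNING (name match)
  println(convert(EvalSide.RECONCILING)) // OTHER   (no counterpart on the app side)
}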
2 changes: 1 addition & 1 deletion streampark-flink/pom.xml
@@ -39,7 +39,7 @@
<module>streampark-flink-packer</module>
<module>streampark-flink-kubernetes</module>
<module>streampark-flink-kubernetes-v2</module>
<module>streampark-flink-sql-gateway</module>
<!-- <module>streampark-flink-sql-gateway</module>-->
</modules>

<dependencies>
@@ -20,9 +20,10 @@ package org.apache.streampark.flink.kubernetes.v2
import org.apache.streampark.flink.kubernetes.v2.FlinkRestRequest._
import org.apache.streampark.flink.kubernetes.v2.model.{FlinkPipeOprState, JobSavepointDef, JobSavepointStatus}

import zio.{IO, Task, ZIO}
import zio.{IO, ZIO}
import zio.ZIO.attempt
import zio.http.{Body, Client, Method, Response}
import zio.http.{Body, Client}
import zio.http.Method.{PATCH, POST}
import zio.json._

import java.nio.charset.Charset
@@ -41,7 +42,7 @@ case class FlinkRestRequest(restUrl: String) {
*/
def listJobOverviewInfo: IO[Throwable, Vector[JobOverviewInfo]] =
for {
res <- get(s"$restUrl/jobs/overview")
res <- Client.request(s"$restUrl/jobs/overview")
rs <- res.body.asJson[JobOverviewRsp]
} yield rs.jobs

@@ -51,7 +52,7 @@
*/
def getClusterOverview: IO[Throwable, ClusterOverviewInfo] =
for {
res <- get(s"$restUrl/overview")
res <- Client.request(s"$restUrl/overview")
rs <- res.body.asJson[ClusterOverviewInfo]
} yield rs

@@ -61,7 +62,7 @@
*/
def getJobmanagerConfig: IO[Throwable, Map[String, String]] =
for {
res <- get(s"$restUrl/jobmanager/config")
res <- Client.request(s"$restUrl/jobmanager/config")
body <- res.body.asString
rs <- attempt {
ujson
@@ -77,7 +78,7 @@
* see: https://nightlies.apache.org/flink/flink-docs-master/docs/ops/rest_api/#jobs-jobid-1
*/
def cancelJob(jobId: String): IO[Throwable, Unit] = {
patch(s"$restUrl/jobs/$jobId?mode=cancel").unit
Client.request(s"$restUrl/jobs/$jobId?mode=cancel", method = PATCH).unit
}

/**
@@ -86,7 +87,7 @@
*/
def stopJobWithSavepoint(jobId: String, sptReq: StopJobSptReq): IO[Throwable, TriggerId] =
for {
res <- post(s"$restUrl/jobs/$jobId/stop", sptReq.toJson)
res <- Client.request(s"$restUrl/jobs/$jobId/stop", method = POST, content = sptReq.toJson)
body <- res.body.asString
rs <- attempt(ujson.read(body)("request-id").str)
} yield rs
@@ -97,7 +98,7 @@
*/
def triggerSavepoint(jobId: String, sptReq: TriggerSptReq): IO[Throwable, TriggerId] =
for {
res <- post(s"$restUrl/jobs/$jobId/savepoints", sptReq.toJson)
res <- Client.request(s"$restUrl/jobs/$jobId/savepoints", method = POST, content = sptReq.toJson)
body <- res.body.asString
rs <- attempt(ujson.read(body)("request-id").str)
} yield rs
@@ -108,7 +109,7 @@
*/
def getSavepointOperationStatus(jobId: String, triggerId: String): IO[Throwable, JobSavepointStatus] =
for {
res <- get(s"$restUrl/jobs/$jobId/savepoints/$triggerId")
res <- Client.request(s"$restUrl/jobs/$jobId/savepoints/$triggerId")
body <- res.body.asString
rs <- attempt {
val rspJson = ujson.read(body)
@@ -130,16 +131,10 @@

object FlinkRestRequest {

private def get(url: String): Task[Response] =
Client.request(url, method = Method.GET).provideLayer(Client.default)
implicit def autoProvideClientLayer[A](zio: ZIO[Client, Throwable, A]): IO[Throwable, A] =
zio.provideLayer(Client.default)

private def patch(url: String): Task[Response] =
Client.request(url, method = Method.PATCH).provideLayer(Client.default)

private def post(url: String, body: String): Task[Response] =
Client
.request(url, method = Method.POST, content = Body.fromString(body, charset = Charset.forName("UTF-8")))
.provideLayer(Client.default)
implicit def liftStringBody(content: String): Body = Body.fromString(content, charset = Charset.forName("UTF-8"))

implicit class BodyExtension(body: Body) {
def asJson[A](implicit decoder: JsonDecoder[A]): IO[Throwable, A] = for {
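
The private get/patch/post wrappers are gone; instead, any ZIO[Client, Throwable, A] is implicitly provided with Client.default, and a raw JSON string lifts to a Body. A minimal sketch of the conversion mechanism under those assumptions (the endpoint URL is hypothetical):

object ClientLayerSketch {
  import zio.{IO, ZIO}
  import zio.http.Client

  // Same shape as autoProvideClientLayer: the Client environment requirement
  // is discharged implicitly at each call site.
  implicit def provideClient[A](zio: ZIO[Client, Throwable, A]): IO[Throwable, A] =
    zio.provideLayer(Client.default)

  // A Client-dependent request can now be typed as plain IO directly,
  // mirroring how cancelJob uses `.unit` above.
  val ping: IO[Throwable, Unit] =
    Client.request("http://localhost:8081/overview").unit
}

Note the tradeoff: each converted effect provides a fresh Client.default layer, so call sites stay terse at the cost of per-request client setup.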
@@ -17,21 +17,68 @@

package org.apache.streampark.flink.kubernetes.v2.model

import org.apache.streampark.flink.kubernetes.v2.model.EvalJobState.EvalJobState

/**
* Flink job status snapshot identified by StreamPark app-id.
*
* For the logical code to convert a JobSnapshot to a [[org.apache.streampark.console.core.enums.FlinkAppState]],
* see the [[org.apache.streampark.console.core.utils.FlinkAppStateConverter#dryK8sJobSnapshotToFlinkAppState]]
*
* @param appId Ref to [[org.apache.streampark.console.core.entity.Application.id]]
* @param appId Ref to [[org.apache.streampark.console.core.entity.Application.id]]
* @param clusterNs Flink cluster namespace on kubernetes.
* @param clusterId Flink cluster name on kubernetes.
* @param crStatus Flink K8s CR status.
* @param evalState Final evaluation job status
* @param crStatus Flink K8s CR status.
* @param jobStatus Flink job status received from REST API.
*/
case class JobSnapshot(
appId: Long,
clusterNs: String,
clusterId: String,
evalState: EvalJobState,
crStatus: Option[FlinkCRStatus],
jobStatus: Option[JobStatus])

object JobSnapshot {

def eval(
appId: Long,
clusterNs: String,
clusterId: String,
crStatus: Option[FlinkCRStatus],
jobStatus: Option[JobStatus]): JobSnapshot = JobSnapshot(
appId = appId,
clusterNs = clusterNs,
clusterId = clusterId,
evalState = evalFinalJobState(crStatus, jobStatus),
crStatus = crStatus,
jobStatus = jobStatus
)

private def evalFinalJobState(crStatus: Option[FlinkCRStatus], jobStatus: Option[JobStatus]): EvalJobState =
(crStatus, jobStatus) match {
case (None, None) => EvalJobState.LOST
case (None, Some(jobStatus)) => EvalJobState.of(jobStatus.state)
case (Some(crStatus), None) =>
crStatus.evalState match {
case EvalState.DEPLOYING | EvalState.READY => EvalJobState.INITIALIZING
case EvalState.FAILED => EvalJobState.FAILED
case EvalState.SUSPENDED => EvalJobState.SUSPENDED
case EvalState.DELETED => EvalJobState.TERMINATED
}
case (Some(crStatus), Some(jobStatus)) =>
if (jobStatus.updatedTs >= crStatus.updatedTs) EvalJobState.of(jobStatus.state)
else {
crStatus.evalState match {
case EvalState.FAILED => EvalJobState.FAILED
case EvalState.SUSPENDED => EvalJobState.SUSPENDED
case EvalState.DELETED => EvalJobState.TERMINATED
case EvalState.READY => EvalJobState.of(jobStatus.state)
case EvalState.DEPLOYING =>
if (JobState.maybeDeploying.contains(jobStatus.state)) EvalJobState.of(jobStatus.state)
else EvalJobState.INITIALIZING
}
}
}

}
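
A toy model of the precedence encoded in evalFinalJobState, keeping only the timestamp comparison and the terminal-CR override (the real FlinkCRStatus and JobStatus fields are elided):

object EvalPrecedenceDemo extends App {
  case class Stamped[A](value: A, updatedTs: Long)

  // Simplified: "FAILED" models a terminal CR eval state; anything else defers.
  def eval(cr: Option[Stamped[String]], job: Option[Stamped[String]]): String =
    (cr, job) match {
      case (None, None)       => "LOST"
      case (None, Some(j))    => j.value
      case (Some(c), None)    => if (c.value == "FAILED") "FAILED" else "INITIALIZING"
      case (Some(c), Some(j)) =>
        if (j.updatedTs >= c.updatedTs) j.value // REST report is fresher: it wins
        else if (c.value == "FAILED") "FAILED"  // stale REST, terminal CR overrides
        else j.value                            // READY-like CR defers to job state
    }

  println(eval(Some(Stamped("FAILED", 100L)), Some(Stamped("RUNNING", 80L))))  // FAILED
  println(eval(Some(Stamped("FAILED", 100L)), Some(Stamped("RUNNING", 120L)))) // RUNNING
}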
@@ -0,0 +1,55 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.streampark.flink.kubernetes.v2.model

import org.apache.streampark.flink.kubernetes.v2.model.JobState.JobState

/**
* Original Flink Job State.
* This enum is essentially equivalent to the meaning in the Flink REST API.
* see: [[org.apache.flink.kubernetes.operator.api.status.JobStatus]]
*/
object JobState extends Enumeration {

type JobState = Value
val INITIALIZING, CREATED, RUNNING, FAILING, FAILED, CANCELLING, CANCELED, FINISHED, RESTARTING, SUSPENDED,
RECONCILING = Value
val UNKNOWN = Value

def valueOf(raw: String): JobState = values.find(_.toString == raw).getOrElse(UNKNOWN)
val maybeDeploying = Set(INITIALIZING, CREATED, RESTARTING, RECONCILING)
}

/**
* Evaluated Job State.
* This state is the result of a combination of Flink CR and REST API evaluations,
* It can be converted directly to StreamPark [[org.apache.streampark.console.core.enums.FlinkAppState]]
*/
object EvalJobState extends Enumeration {

type EvalJobState = Value

val INITIALIZING, CREATED, RUNNING, FAILING, FAILED, CANCELLING, CANCELED, FINISHED, RESTARTING, SUSPENDED,
RECONCILING = Value

// copy from [[org.apache.streampark.console.core.enums.FlinkAppState]]
val LOST, TERMINATED, OTHER = Value

def of(state: JobState): EvalJobState = values.find(e => e.toString == state.toString).getOrElse(OTHER)

}
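
Both enums resolve by name with a safe fallback, so an unrecognized state string degrades gracefully instead of throwing. A REPL-style check of the expected behavior, per the definitions above:

JobState.valueOf("RUNNING")        // JobState.RUNNING
JobState.valueOf("NOT_A_STATE")    // JobState.UNKNOWN (no matching name)
EvalJobState.of(JobState.FINISHED) // EvalJobState.FINISHED
EvalJobState.of(JobState.UNKNOWN)  // EvalJobState.OTHER (UNKNOWN has no counterpart)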
@@ -45,23 +45,6 @@ case class JobStatus(
tasks: Option[TaskStats] = None,
updatedTs: Long)

/**
* Flink Job State.
* This enum is essentially equivalent to the meaning in the Flink REST API.
* see: [[org.apache.flink.kubernetes.operator.api.status.JobStatus]]
*/
object JobState extends Enumeration {

type JobState = Value

val INITIALIZING, CREATED, RUNNING, FAILING, FAILED, CANCELLING, CANCELED, FINISHED, RESTARTING, SUSPENDED,
RECONCILING = Value
val UNKNOWN = Value

def valueOf(raw: String): JobState = values.find(_.toString == raw).getOrElse(UNKNOWN)
val maybeDeploying = Set(INITIALIZING, CREATED, RESTARTING, RECONCILING)
}

object JobStatus {

// Convert from REST API object.
@@ -90,17 +90,16 @@ sealed trait FlinkK8sObserver {

object FlinkK8sObserver extends FlinkK8sObserver {

val trackedKeys = ConcurrentSet.empty[TrackKey].runUIO

// fixme duplicate subscribe
// The following is a visible external snapshot.
val trackedKeys = ConcurrentSet.empty[TrackKey].runUIO
val evaluatedJobSnaps = Ref.make(Map.empty[AppId, JobSnapshot]).runUIO
val restSvcEndpointSnaps = ConcurrentMap.empty[(Namespace, Name), RestSvcEndpoint].runUIO
// fixme duplicate subscribe
val clusterMetricsSnaps = ConcurrentMap.empty[(Namespace, Name), ClusterMetrics].runUIO

val deployCRSnaps = ConcurrentMap.empty[(Namespace, Name), (DeployCRStatus, Option[JobStatus])].runUIO
val sessionJobCRSnaps = ConcurrentMap.empty[(Namespace, Name), (SessionJobCRStatus, Option[JobStatus])].runUIO
private[observer] val clusterJobStatusSnaps = ConcurrentMap.empty[(Namespace, Name), Vector[JobStatus]].runUIO
// In general, there is no need to view these snapshots externally.
val deployCRSnaps = ConcurrentMap.empty[(Namespace, Name), (DeployCRStatus, Option[JobStatus])].runUIO
val sessionJobCRSnaps = ConcurrentMap.empty[(Namespace, Name), (SessionJobCRStatus, Option[JobStatus])].runUIO
val clusterJobStatusSnaps = ConcurrentMap.empty[(Namespace, Name), Vector[JobStatus]].runUIO

private val restSvcEndpointObserver = RestSvcEndpointObserver(restSvcEndpointSnaps)
private val deployCrObserver = DeployCRObserver(deployCRSnaps)
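
With evaluation folded into the observer, a console-side consumer only needs the evaluated map. A hypothetical read path (treating the AppId alias as Long is an assumption; Ref semantics as used above):

import zio.UIO

// Hypothetical call site: look up the evaluated snapshot for one application.
def snapshotOf(appId: Long): UIO[Option[JobSnapshot]] =
  FlinkK8sObserver.evaluatedJobSnaps.get.map(_.get(appId))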
@@ -198,16 +197,17 @@
/** Re-evaluate all job status snapshots from caches. */
private def evalJobSnapshot: UIO[Unit] = {

def mergeJobStatus(crStatus: Option[JobStatus], restStatus: Option[JobStatus]) = (crStatus, restStatus) match {
case (Some(e), None) => Some(e)
case (None, Some(e)) => Some(e)
case (None, None) => None
case (Some(cr), Some(rest)) =>
Some(
if (rest.updatedTs > cr.updatedTs) rest
else cr.copy(endTs = rest.endTs, tasks = rest.tasks)
)
}
def mergeJobStatus(crStatus: Option[JobStatus], restStatus: Option[JobStatus]) =
(crStatus, restStatus) match {
case (Some(e), None) => Some(e)
case (None, Some(e)) => Some(e)
case (None, None) => None
case (Some(cr), Some(rest)) =>
Some(
if (rest.updatedTs > cr.updatedTs) rest
else cr.copy(endTs = rest.endTs, tasks = rest.tasks)
)
}

ZStream
.fromIterableZIO(trackedKeys.toSet)
@@ -227,7 +227,7 @@ object FlinkK8sObserver extends FlinkK8sObserver {
jobStatusFromRest = restJobStatusVec.flatMap(_.headOption)
finalJobStatus = mergeJobStatus(jobStatusFromCr, jobStatusFromRest)

} yield JobSnapshot(id, ns, name, crStatus, finalJobStatus)
} yield JobSnapshot.eval(id, ns, name, crStatus, finalJobStatus)

case SessionJobKey(id, ns, name, clusterName) =>
for {
@@ -240,13 +240,13 @@
jobStatusFromRest = restJobStatusVec.flatMap(_.find(_.jobId == jobId))
finalJobStatus = mergeJobStatus(jobStatusFromCr, jobStatusFromRest)

} yield JobSnapshot(id, ns, clusterName, crStatus, finalJobStatus)
} yield JobSnapshot.eval(id, ns, clusterName, crStatus, finalJobStatus)

case UnmanagedSessionJobKey(id, clusterNs, clusterName, jid) =>
for {
restJobStatusVec <- clusterJobStatusSnaps.get((clusterNs, clusterName))
jobStatus = restJobStatusVec.flatMap(_.find(_.jobId == jid))
} yield JobSnapshot(id, clusterNs, clusterName, None, jobStatus)
} yield JobSnapshot.eval(id, clusterNs, clusterName, None, jobStatus)
}
// Collect result and Refresh evaluatedJobSnaps cache
.runCollect
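
A toy check of the merge rule in mergeJobStatus, with a simplified JobStatus keeping only the merged fields: a fresher REST report replaces the CR view outright, while a fresher CR keeps its own state but adopts the REST side's endTs and tasks:

object MergeDemo extends App {
  case class JS(state: String, updatedTs: Long, endTs: Option[Long], tasks: Option[Int])

  def merge(cr: Option[JS], rest: Option[JS]): Option[JS] = (cr, rest) match {
    case (Some(c), Some(r)) =>
      Some(
        if (r.updatedTs > c.updatedTs) r
        else c.copy(endTs = r.endTs, tasks = r.tasks)) // CR state wins; REST details kept
    case (c, r) => c.orElse(r)
  }

  // CR is fresher: FINISHED is kept, endTs/tasks still come from the REST side.
  println(merge(
    Some(JS("FINISHED", 100L, None, None)),
    Some(JS("RUNNING", 90L, Some(95L), Some(4)))))
  // => Some(JS(FINISHED,100,Some(95),Some(4)))
}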