Skip to content

Commit

Permalink
Added flink k8s operator validate (#2966)
Browse files Browse the repository at this point in the history
* Added flink k8s operator validate

* Code optimization

* Modify flink k8s operator validate

* Modified CROperator
  • Loading branch information
ChengJie1053 authored Sep 5, 2023
1 parent 4eeeb6f commit 23b191f
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -53,15 +53,6 @@ case class DeployCRObserver(deployCRSnaps: ConcurrentMap[(Namespace, Name), (Dep
watchers.remove((namespace, name)).unit
}

// private def existCr(namespace: String, name: String): IO[Throwable, Boolean] =
// usingK8sClient { client =>
// client
// .resources(classOf[FlinkDeployment])
// .inNamespace(namespace)
// .withName(name)
// .get != null
// }

private def launchProc(namespace: String, name: String): K8sResourceWatcher[FlinkDeployment] =
watchK8sResourceForever(client =>
client
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ import org.apache.streampark.flink.kubernetes.v2.model.{FlinkDeploymentDef, Flin
import org.apache.streampark.flink.kubernetes.v2.observer.FlinkK8sObserver

import io.fabric8.kubernetes.api.model._
import io.fabric8.kubernetes.api.model.apiextensions.v1.CustomResourceDefinitionList
import io.fabric8.kubernetes.client.CustomResource
import org.apache.flink.v1beta1.{FlinkDeployment, FlinkSessionJob}
import zio.{IO, UIO, ZIO}
import zio.stream.ZStream
Expand Down Expand Up @@ -61,6 +63,7 @@ object CROperator extends CROperator {
lazy val mirrorSpace = s"${spec.namespace}_${spec.name}"
for {
// Generate FlinkDeployment CR
_ <- validateDeployFlinkDeploymentCRD
correctedJob <- mirrorJobJarToHttpFileServer(spec.job, mirrorSpace)
correctedExtJars <- mirrorExtJarsToHttpFileServer(spec.extJarPaths, mirrorSpace)
correctedPod <- correctPodSpec(
Expand Down Expand Up @@ -249,4 +252,16 @@ object CROperator extends CROperator {
.delete()
} *> ZIO.logInfo(s"Delete FlinkDeployment CR: namespace=$namespace, name=$name")

/** Check whether FlinkDeployment CRD is deployed in the k8s cluster. */
def validateDeployFlinkDeploymentCRD = {
usingK8sClient { client =>
val crds: CustomResourceDefinitionList = client.apiextensions.v1.customResourceDefinitions.list
val flinkDeploymentCRDName: String = CustomResource.getCRDName(classOf[FlinkDeployment])

val exists: Boolean = crds.getItems.asScala.exists(crd => crd.getMetadata.getName == flinkDeploymentCRDName)

if (!exists)
throw new RuntimeException("The FlinkDeployment CRD is not currently deployed in the k8s cluster")
}
}
}

0 comments on commit 23b191f

Please sign in to comment.