Skip to content

Commit

Permalink
Replace sttp client to zio-http client due to Java 8 incompatibility #…
Browse files Browse the repository at this point in the history
  • Loading branch information
Al-assad committed Aug 1, 2023
1 parent 4c8c3df commit 9edac37
Show file tree
Hide file tree
Showing 6 changed files with 86 additions and 109 deletions.
2 changes: 0 additions & 2 deletions dist-material/release-docs/LICENSE
Original file line number Diff line number Diff line change
Expand Up @@ -574,8 +574,6 @@ The text of each license is the standard Apache 2.0 license. https://www.apache.
https://mvnrepository.com/artifact/dev.zio/zio-streams_2.12/2.0.15 Apache-2.0
https://mvnrepository.com/artifact/dev.zio/zio-concurrent_2.12/2.0.15 Apache-2.0
https://mvnrepository.com/artifact/dev.zio/zio-http_2.12/3.0.0-RC2 Apache-2.0
https://mvnrepository.com/artifact/com.softwaremill.sttp.client3/zio_2.12/3.8.16 Apache-2.0
https://mvnrepository.com/artifact/com.softwaremill.sttp.client3/zio-json_2.12/3.8.16 Apache-2.0
https://maven.apache.org/wrapper Apache-2.0
mvnw files from https://github.com/apache/maven-wrapper Apache 2.0
streampark-console/streampark-console-service/src/main/assembly/bin/setclasspath.sh from https://github.com/apache/tomcat
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
<properties>
<zio-http.version>3.0.0-RC2</zio-http.version>
<fabric8.version>6.8.0</fabric8.version>
<sttp-client.version>3.8.13</sttp-client.version>
<os-lib.version>0.9.1</os-lib.version>
<upickle.version>3.0.0</upickle.version>
<jackson-dataformat-yaml.version>2.14.2</jackson-dataformat-yaml.version>
Expand Down Expand Up @@ -66,26 +65,13 @@
<scope>provided</scope>
</dependency>

<!-- ZIO -->
<!-- ZIO HTTP -->
<dependency>
<groupId>dev.zio</groupId>
<artifactId>zio-http_${scala.binary.version}</artifactId>
<version>${zio-http.version}</version>
</dependency>

<!-- Http client -->
<dependency>
<groupId>com.softwaremill.sttp.client3</groupId>
<artifactId>zio_${scala.binary.version}</artifactId>
<version>${sttp-client.version}</version>
</dependency>

<dependency>
<groupId>com.softwaremill.sttp.client3</groupId>
<artifactId>zio-json_${scala.binary.version}</artifactId>
<version>${sttp-client.version}</version>
</dependency>

<!-- Tools -->
<dependency>
<groupId>com.lihaoyi</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,14 @@ package org.apache.streampark.flink.kubernetes.v2
import org.apache.streampark.flink.kubernetes.v2.FlinkRestRequest._
import org.apache.streampark.flink.kubernetes.v2.model.{FlinkPipeOprState, JobSavepointDef, JobSavepointStatus}

import sttp.client3._
import sttp.client3.httpclient.zio.HttpClientZioBackend
import sttp.client3.ziojson._
import zio.{IO, Task, ZIO}
import zio.json.{jsonField, DeriveJsonCodec, JsonCodec}
import zio.ZIO.attempt
import zio.http.{Body, Client, Method, Response}
import zio.json._

import java.nio.charset.Charset

import scala.language.implicitConversions
import scala.util.chaining.scalaUtilChainingOps

/** Flink rest-api request. */
Expand All @@ -37,129 +39,116 @@ case class FlinkRestRequest(restUrl: String) {
* Get all job overview info
* see: https://nightlies.apache.org/flink/flink-docs-master/docs/ops/rest_api/#jobs-overview
*/
def listJobOverviewInfo: IO[Throwable, Vector[JobOverviewInfo]] = usingSttp { backend =>
request
.get(uri"$restUrl/jobs/overview")
.response(asJson[JobOverviewRsp])
.send(backend)
.flattenBodyT
.map(_.jobs)
}
def listJobOverviewInfo: IO[Throwable, Vector[JobOverviewInfo]] =
for {
res <- get(s"$restUrl/jobs/overview")
rs <- res.body.asJson[JobOverviewRsp]
} yield rs.jobs

/**
* Get cluster overview
* see: https://nightlies.apache.org/flink/flink-docs-master/docs/ops/rest_api/#overview-1
*/
def getClusterOverview: IO[Throwable, ClusterOverviewInfo] = usingSttp { backend =>
request
.get(uri"$restUrl/overview")
.response(asJson[ClusterOverviewInfo])
.send(backend)
.flattenBodyT
}
def getClusterOverview: IO[Throwable, ClusterOverviewInfo] =
for {
res <- get(s"$restUrl/overview")
rs <- res.body.asJson[ClusterOverviewInfo]
} yield rs

/**
* Get job manager configuration.
* see: https://nightlies.apache.org/flink/flink-docs-master/docs/ops/rest_api/#jobmanager-config
*/
def getJobmanagerConfig: IO[Throwable, Map[String, String]] = usingSttp { backend =>
request
.get(uri"$restUrl/jobmanager/config")
.send(backend)
.flattenBody
.attemptBody(ujson.read(_).arr.map(item => item("key").str -> item("value").str).toMap)
}
def getJobmanagerConfig: IO[Throwable, Map[String, String]] =
for {
res <- get(s"$restUrl/jobmanager/config")
body <- res.body.asString
rs <- attempt {
ujson
.read(body)
.arr
.map(item => item("key").str -> item("value").str)
.toMap
}
} yield rs

/**
* Cancels job.
* see: https://nightlies.apache.org/flink/flink-docs-master/docs/ops/rest_api/#jobs-jobid-1
*/
def cancelJob(jobId: String): IO[Throwable, Unit] =
usingSttp { backend =>
request
.patch(uri"$restUrl/jobs/$jobId?mode=cancel")
.send(backend)
.unit
}
def cancelJob(jobId: String): IO[Throwable, Unit] = {
patch(s"$restUrl/jobs/$jobId?mode=cancel").unit
}

/**
* Stops job with savepoint.
* see: https://nightlies.apache.org/flink/flink-docs-master/docs/ops/rest_api/#jobs-jobid-stop
*/
def stopJobWithSavepoint(jobId: String, sptReq: StopJobSptReq): IO[Throwable, TriggerId] =
usingSttp { backend =>
request
.post(uri"$restUrl/jobs/$jobId/stop")
.body(sptReq)
.send(backend)
.flattenBody
.attemptBody(ujson.read(_)("request-id").str)
}
for {
res <- post(s"$restUrl/jobs/$jobId/stop", sptReq.toJson)
body <- res.body.asString
rs <- attempt(ujson.read(body)("request-id").str)
} yield rs

/**
* Triggers a savepoint of job.
* see: https://nightlies.apache.org/flink/flink-docs-master/docs/ops/rest_api/#jobs-jobid-savepoints
*/
def triggerSavepoint(jobId: String, sptReq: TriggerSptReq): IO[Throwable, TriggerId] =
usingSttp { backend =>
request
.post(uri"$restUrl/jobs/$jobId/savepoints")
.body(sptReq)
.send(backend)
.flattenBody
.attemptBody(ujson.read(_)("request-id").str)
}
for {
res <- post(s"$restUrl/jobs/$jobId/savepoints", sptReq.toJson)
body <- res.body.asString
rs <- attempt(ujson.read(body)("request-id").str)
} yield rs

/**
* Get status of savepoint operation.
* see: https://nightlies.apache.org/flink/flink-docs-master/docs/ops/rest_api/#jobs-jobid-savepoints-triggerid
*/
def getSavepointOperationStatus(jobId: String, triggerId: String): IO[Throwable, JobSavepointStatus] =
usingSttp { backend =>
request
.get(uri"$restUrl/jobs/$jobId/savepoints/$triggerId")
.send(backend)
.flattenBody
.attemptBody { body =>
val rspJson = ujson.read(body)
val status = rspJson("status")("id").str.pipe(FlinkPipeOprState.ofRaw)
val (location, failureCause) = rspJson("operation").objOpt match {
case None => None -> None
case Some(operation) =>
val loc = operation.get("location").flatMap(_.strOpt)
val failure = operation
.get("failure-cause")
.flatMap(_.objOpt.flatMap(map => map.get("stack-trace")))
.flatMap(_.strOpt)
loc -> failure
}
JobSavepointStatus(status, failureCause, location)
}
}
for {
res <- get(s"$restUrl/jobs/$jobId/savepoints/$triggerId")
body <- res.body.asString
rs <- attempt {
val rspJson = ujson.read(body)
val status = rspJson("status")("id").str.pipe(FlinkPipeOprState.ofRaw)
val (location, failureCause) = rspJson("operation").objOpt match {
case None => None -> None
case Some(operation) =>
val loc = operation.get("location").flatMap(_.strOpt)
val failure = operation
.get("failure-cause")
.flatMap(_.objOpt.flatMap(map => map.get("stack-trace")))
.flatMap(_.strOpt)
loc -> failure
}
JobSavepointStatus(status, failureCause, location)
}
} yield rs
}

object FlinkRestRequest {

val request = basicRequest
private def get(url: String): Task[Response] =
Client.request(url, method = Method.GET).provideLayer(Client.default)

// sttp client wrapper
def usingSttp[A](request: SttpBackend[Task, Any] => IO[Throwable, A]): IO[Throwable, A] =
ZIO.scoped {
HttpClientZioBackend.scoped().flatMap(backend => request(backend))
}
private def patch(url: String): Task[Response] =
Client.request(url, method = Method.PATCH).provideLayer(Client.default)

implicit class RequestIOExceptionExt[A](requestIO: Task[Response[Either[ResponseException[String, String], A]]]) {
@inline def flattenBodyT: IO[Throwable, A] = requestIO.flatMap(rsp => ZIO.fromEither(rsp.body))
}
private def post(url: String, body: String): Task[Response] =
Client
.request(url, method = Method.POST, content = Body.fromString(body, charset = Charset.forName("UTF-8")))
.provideLayer(Client.default)

implicit class RequestIOPlainExt(requestIO: Task[Response[Either[String, String]]]) {
@inline def flattenBody: IO[Throwable, String] =
requestIO.flatMap(rsp => ZIO.fromEither(rsp.body).mapError(new Exception(_)))
implicit class BodyExtension(body: Body) {
def asJson[A](implicit decoder: JsonDecoder[A]): IO[Throwable, A] = for {
data <- body.asString
rsp <- ZIO.fromEither(data.fromJson[A]).mapError(ParseJsonError)
} yield rsp
}

implicit class RequestIOTaskExt[A](requestIO: Task[String]) {
@inline def attemptBody(f: String => A): IO[Throwable, A] = requestIO.flatMap(body => ZIO.attempt(f(body)))
}
case class ParseJsonError(msg: String) extends Exception(msg)

// --- Flink rest api models ---

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,10 @@ object FlinkK8sObserver extends FlinkK8sObserver {

val trackedKeys = ConcurrentSet.empty[TrackKey].runUIO

// fixme duplicate subscribe
val evaluatedJobSnaps = Ref.make(Map.empty[AppId, JobSnapshot]).runUIO
val restSvcEndpointSnaps = ConcurrentMap.empty[(Namespace, Name), RestSvcEndpoint].runUIO
// fixme duplicate subscribe
val clusterMetricsSnaps = ConcurrentMap.empty[(Namespace, Name), ClusterMetrics].runUIO

val deployCRSnaps = ConcurrentMap.empty[(Namespace, Name), (DeployCRStatus, Option[JobStatus])].runUIO
Expand All @@ -116,9 +118,11 @@ object FlinkK8sObserver extends FlinkK8sObserver {
override def track(key: TrackKey): UIO[Unit] = {

def trackCluster(ns: String, name: String): UIO[Unit] = {
deployCrObserver.watch(ns, name) <*>
restSvcEndpointObserver.watch(ns, name) *>
clusterObserver.watch(ns, name)
for {
_ <- deployCrObserver.watch(ns, name)
_ <- restSvcEndpointObserver.watch(ns, name)
_ <- clusterObserver.watch(ns, name)
} yield ()
}

def trackSessionJob(ns: String, name: String, refDeployName: String): UIO[Unit] = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,9 @@ class UsingObserver extends AnyWordSpecLike with BeforeAndAfterAll {
for {
// track resource
_ <- FlinkK8sObserver.track(TrackKey.sessionJob(233, "fdev", "simple-sessionjob", "simple-session"))
_ <- FlinkK8sObserver.track(TrackKey.appJob(233, "fdev", "simple-appjob"))
_ <- FlinkK8sObserver.track(TrackKey.appJob(234, "fdev", "simple-appjob"))
// subscribe job status changes
watchStream = FlinkK8sObserver.evaluatedJobSnaps.flatSubscribeValues()
watchStream = FlinkK8sObserver.clusterMetricsSnaps.subscribe()
_ <- watchStream.debugPretty.runDrain
} yield ()
}
Expand All @@ -70,7 +70,7 @@ class UsingObserver extends AnyWordSpecLike with BeforeAndAfterAll {
for {
// track resource
_ <- FlinkK8sObserver.track(TrackKey.sessionJob(233, "fdev", "simple-sessionjob", "simple-session"))
_ <- FlinkK8sObserver.track(TrackKey.appJob(233, "fdev", "simple-appjob"))
_ <- FlinkK8sObserver.track(TrackKey.appJob(234, "fdev", "simple-appjob"))
// subscribe job status changes
watchStream = FlinkK8sObserver.clusterMetricsSnaps.flatSubscribeValues()
_ <- watchStream.debugPretty.runDrain
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ class UsingOperator extends AnyWordSpecLike with BeforeAndAfterAll {
name = "appjob-with-ingress",
namespace = "fdev",
image = "flink:1.16",
flinkVersion = FlinkVersion.v1_16,
flinkVersion = FlinkVersion.V1_16,
jobManager = JobManagerDef(cpu = 1, memory = "1024m"),
taskManager = TaskManagerDef(cpu = 1, memory = "1024m"),
job = JobDef(
Expand Down

0 comments on commit 9edac37

Please sign in to comment.