Skip to content

Commit

Permalink
Feature: refactor scala rpc runtime (#441)
Browse files Browse the repository at this point in the history
* wip

* logging update

* cleanup
  • Loading branch information
Caparow authored Aug 7, 2023
1 parent 54902a1 commit 36383a4
Show file tree
Hide file tree
Showing 25 changed files with 818 additions and 915 deletions.
71 changes: 71 additions & 0 deletions .scalafmt.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
version = "3.7.11"
runner.dialect = scala3Future
project.git = true
project.excludePaths = ["glob:**.sbt", "glob:**sbtgen.sc"]

maxColumn = 170

importSelectors = singleLine
spaces.inImportCurlyBraces = false

literals.hexDigits = "Upper"

align.tokens."+" = ["<-", "->", {code: "=", owner: ".*"}, {code: "=>", owner: "Case"}]
align.tokenCategory {
Equals = Assign
LeftArrow = Assign
RightArrow = Assign
}

align.arrowEnumeratorGenerator = false
align.ifWhileOpenParen = false
align.openParenCallSite = false
align.openParenDefnSite = false
align.stripMargin = true
align.inInterpolation = false

newlines.beforeCurlyLambdaParams = multiline
newlines.afterCurlyLambdaParams = never
newlines.implicitParamListModifierPrefer = after
newlines.avoidAfterYield = true
newlines.alwaysBeforeMultilineDef = false
newlines.inInterpolation = allow

indent.defnSite = 2
indent.callSite = 2
indent.extendSite = 2

assumeStandardLibraryStripMargin = true
docstrings = ScalaDoc
docstrings.wrap = false
docstrings.blankFirstLine = keep
docstrings.forceBlankLineBefore = false
lineEndings = unix

danglingParentheses.callSite = true
danglingParentheses.defnSite = true
danglingParentheses.ctrlSite = false
danglingParentheses.exclude = []
verticalAlignMultilineOperators = true

includeCurlyBraceInSelectChains = false
includeNoParensInSelectChains = false

verticalMultiline.atDefnSite = true
verticalMultiline.arityThreshold = 100
verticalMultiline.newlineAfterOpenParen = false

optIn.configStyleArguments = true
optIn.breaksInsideChains = true
optIn.breakChainOnFirstMethodDot = true
optIn.selfAnnotationNewline = true
optIn.annotationNewlines = true

rewrite.rules = [AsciiSortImports, RedundantBraces, RedundantParens]
rewrite.redundantBraces.methodBodies = false // remove braces only in interpolations
rewrite.redundantBraces.maxLines = -1 // remove braces only in interpolations
rewrite.redundantBraces.generalExpressions = false // remove braces only in interpolations
rewrite.redundantBraces.includeUnitMethods = false
rewrite.redundantBraces.stringInterpolation = true
rewrite.redundantBraces.parensForOneLineApply = true
rewrite.trailingCommas.style = multiple

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,96 +1,94 @@
package izumi.idealingua.runtime.rpc.http4s

import izumi.functional.bio.IO2
import izumi.functional.bio.Exit
import izumi.idealingua.runtime.rpc._
import izumi.logstage.api.IzLogger
import cats.effect.Async
import fs2.Stream
import io.circe
import io.circe.parser.parse
import org.http4s._
import org.http4s.blaze.client._

class ClientDispatcher[C <: Http4sContext]
(
val c: Http4sContextImpl[C]
, logger: IzLogger
, printer: circe.Printer
, baseUri: Uri
, codec: IRTClientMultiplexor[GetBiIO[C]#l]
) extends IRTDispatcher[GetBiIO[C]#l] {
import c._

def dispatch(request: IRTMuxRequest): BiIO[Throwable, IRTMuxResponse] = {
val handler = handleResponse(request, _: Response[MonoIO])
import izumi.functional.bio.{Exit, F, IO2}
import izumi.idealingua.runtime.rpc.*
import izumi.logstage.api.IzLogger
import org.http4s.*
import org.http4s.blaze.client.*

class ClientDispatcher[F[+_, +_]: IO2](
logger: IzLogger,
printer: circe.Printer,
baseUri: Uri,
codec: IRTClientMultiplexor[F],
executionContext: HttpExecutionContext,
)(implicit AT: Async[F[Throwable, _]]
) extends IRTDispatcher[F] {

def dispatch(request: IRTMuxRequest): F[Throwable, IRTMuxResponse] = {
val handler = handleResponse(request, _: Response[F[Throwable, _]])

logger.trace(s"${request.method -> "method"}: Goint to perform $request")

codec.encode(request)
.flatMap {
encoded =>
val outBytes: Array[Byte] = printer.print(encoded).getBytes
val req = buildRequest(baseUri, request, outBytes)
codec.encode(request).flatMap {
encoded =>
val outBytes: Array[Byte] = printer.print(encoded).getBytes
val req = buildRequest(baseUri, request, outBytes)

logger.debug(s"${request.method -> "method"}: Prepared request $encoded")
runRequest[IRTMuxResponse](handler, req)
}
logger.debug(s"${request.method -> "method"}: Prepared request $encoded")
runRequest[IRTMuxResponse](handler, req)
}
}

protected def runRequest[T](handler: Response[MonoIO] => MonoIO[T], req: Request[MonoIO]): BiIO[Throwable, T] = {
val clientBuilder = blazeClientBuilder(BlazeClientBuilder[MonoIO].withExecutionContext(c.clientExecutionContext))
protected def runRequest[T](handler: Response[F[Throwable, _]] => F[Throwable, T], req: Request[F[Throwable, _]]): F[Throwable, T] = {
val clientBuilder = blazeClientBuilder(BlazeClientBuilder[F[Throwable, _]].withExecutionContext(executionContext.clientExecutionContext))
clientBuilder.resource.use {
_.run(req).use(handler)
}
}

protected def blazeClientBuilder(defaultBuilder: BlazeClientBuilder[MonoIO]): BlazeClientBuilder[MonoIO] = defaultBuilder
protected def blazeClientBuilder(defaultBuilder: BlazeClientBuilder[F[Throwable, _]]): BlazeClientBuilder[F[Throwable, _]] = defaultBuilder

protected def handleResponse(input: IRTMuxRequest, resp: Response[MonoIO]): MonoIO[IRTMuxResponse] = {
protected def handleResponse(input: IRTMuxRequest, resp: Response[F[Throwable, _]]): F[Throwable, IRTMuxResponse] = {
logger.trace(s"${input.method -> "method"}: Received response, going to materialize, ${resp.status.code -> "code"} ${resp.status.reason -> "reason"}")

if (resp.status != Status.Ok) {
logger.info(s"${input.method -> "method"}: unexpected HTTP response, ${resp.status.code -> "code"} ${resp.status.reason -> "reason"}")
F.fail(IRTUnexpectedHttpStatus(resp.status))
} else {
resp
.as[MaterializedStream]
.flatMap {
body =>
logger.trace(s"${input.method -> "method"}: Received response: $body")
val decoded = for {
parsed <- F.fromEither(parse(body))
product <- codec.decode(parsed, input.method)
} yield {
logger.trace(s"${input.method -> "method"}: decoded response: $product")
product
}

decoded.sandbox.catchAll {
case Exit.Error(error, trace) =>
logger.info(s"${input.method -> "method"}: decoder returned failure on $body: $error $trace")
F.fail(new IRTUnparseableDataException(s"${input.method}: decoder returned failure on body=$body: error=$error trace=$trace", Option(error)))

case Exit.Termination(f, _, trace) =>
logger.info(s"${input.method -> "method"}: decoder failed on $body: $f $trace")
F.fail(new IRTUnparseableDataException(s"${input.method}: decoder failed on body=$body: f=$f trace=$trace", Option(f)))

case Exit.Interruption(error, _, trace) =>
logger.info(s"${input.method -> "method"}: decoder interrupted on $body: $error $trace")
F.fail(new IRTUnparseableDataException(s"${input.method}: decoder interrupted on body=$body: error=$error trace=$trace", Option(error)))
}
}
.as[String]
.flatMap {
body =>
logger.trace(s"${input.method -> "method"}: Received response: $body")
val decoded = for {
parsed <- F.fromEither(parse(body))
product <- codec.decode(parsed, input.method)
} yield {
logger.trace(s"${input.method -> "method"}: decoded response: $product")
product
}

decoded.sandbox.catchAll {
case Exit.Error(error, trace) =>
logger.info(s"${input.method -> "method"}: decoder returned failure on $body: $error $trace")
F.fail(new IRTUnparseableDataException(s"${input.method}: decoder returned failure on body=$body: error=$error trace=$trace", Option(error)))

case Exit.Termination(f, _, trace) =>
logger.info(s"${input.method -> "method"}: decoder failed on $body: $f $trace")
F.fail(new IRTUnparseableDataException(s"${input.method}: decoder failed on body=$body: f=$f trace=$trace", Option(f)))

case Exit.Interruption(error, _, trace) =>
logger.info(s"${input.method -> "method"}: decoder interrupted on $body: $error $trace")
F.fail(new IRTUnparseableDataException(s"${input.method}: decoder interrupted on body=$body: error=$error trace=$trace", Option(error)))
}
}
}
}

protected final def buildRequest(baseUri: Uri, input: IRTMuxRequest, body: Array[Byte]): Request[MonoIO] = {
val entityBody: EntityBody[MonoIO] = Stream.emits(body).covary[MonoIO]
protected final def buildRequest(baseUri: Uri, input: IRTMuxRequest, body: Array[Byte]): Request[F[Throwable, _]] = {
val entityBody: EntityBody[F[Throwable, _]] = Stream.emits(body).covary[F[Throwable, _]]
buildRequest(baseUri, input, entityBody)
}

protected final def buildRequest(baseUri: Uri, input: IRTMuxRequest, body: EntityBody[MonoIO]): Request[MonoIO] = {
protected final def buildRequest(baseUri: Uri, input: IRTMuxRequest, body: EntityBody[F[Throwable, _]]): Request[F[Throwable, _]] = {
val uri = baseUri / input.method.service.value / input.method.methodId.value

val base: Request[MonoIO] = if (input.body.value.productArity > 0) {
val base: Request[F[Throwable, _]] = if (input.body.value.productArity > 0) {
Request(org.http4s.Method.POST, uri, body = body)
} else {
Request(org.http4s.Method.GET, uri)
Expand All @@ -99,5 +97,5 @@ class ClientDispatcher[C <: Http4sContext]
transformRequest(base)
}

protected def transformRequest(request: Request[MonoIO]): Request[MonoIO] = request
protected def transformRequest(request: Request[F[Throwable, _]]): Request[F[Throwable, _]] = request
}

This file was deleted.

Loading

0 comments on commit 36383a4

Please sign in to comment.