Skip to content

Commit

Permalink
Update RequestState to have configurable TTLs. Add request methods to…
Browse files Browse the repository at this point in the history
… control request execution. (#470)
  • Loading branch information
Caparow authored Nov 8, 2023
1 parent 2dc3684 commit 7fb5812
Show file tree
Hide file tree
Showing 4 changed files with 27 additions and 41 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -198,18 +198,9 @@ object WsRpcDispatcherFactory {
method: Option[IRTMethodId],
timeout: FiniteDuration,
): F[Throwable, Option[RawResponse]] = {
F.bracket {
method match {
case Some(irtMethod) => requestState.request(id, irtMethod)
case _ => requestState.requestEmpty(id)
}
}(_ => requestState.forget(id))(
_ =>
for {
_ <- fromNettyFuture(nettyWebSocket.sendTextFrame(printer.print(packet.asJson)))
res <- requestState.awaitResponse(id, timeout)
} yield res
)
requestState.requestAndAwait(id, method, timeout) {
fromNettyFuture(nettyWebSocket.sendTextFrame(printer.print(packet.asJson)))
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ trait WsClientSession[F[+_, +_], RequestCtx, ClientId] extends WsClientResponder
def updateId(maybeNewId: Option[ClientId]): F[Throwable, Unit]
def outQueue: Queue[F[Throwable, _], WebSocketFrame]

def request(method: IRTMethodId, data: Json): F[Throwable, RpcPacketId]
def requestAndAwaitResponse(method: IRTMethodId, data: Json, timeout: FiniteDuration): F[Throwable, Option[RawResponse]]

def finish(): F[Throwable, Unit]
Expand All @@ -46,23 +45,15 @@ object WsClientSession {

def id: WsClientId[ClientId] = WsClientId(sessionId, clientId.get())

override def request(method: IRTMethodId, data: Json): F[Throwable, RpcPacketId] = {
def requestAndAwaitResponse(method: IRTMethodId, data: Json, timeout: FiniteDuration): F[Throwable, Option[RawResponse]] = {
val id = RpcPacketId.random()
val request = RpcPacket.buzzerRequest(id, method, data)
for {
_ <- logger.debug(s"WS Session: enqueue $request with $id to request state & send queue.")
_ <- outQueue.offer(Text(printer.print(request.asJson)))
_ <- requestState.request(id, method)
} yield id
}

def requestAndAwaitResponse(method: IRTMethodId, data: Json, timeout: FiniteDuration): F[Throwable, Option[RawResponse]] = {
for {
id <- request(method, data)
response <- requestState.awaitResponse(id, timeout).guarantee {
logger.debug(s"WS Session: $method, ${id -> "id"}: cleaning request state.") *>
requestState.forget(id)
response <- requestState.requestAndAwait(id, Some(method), timeout) {
outQueue.offer(Text(printer.print(request.asJson)))
}
_ <- logger.debug(s"WS Session: $method, ${id -> "id"}: cleaning request state.")
} yield response
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import java.time.OffsetDateTime
import java.time.temporal.ChronoUnit
import java.util.concurrent.ConcurrentHashMap
import scala.collection.mutable
import scala.concurrent.duration.{DurationInt, FiniteDuration}
import scala.concurrent.duration.FiniteDuration
import scala.jdk.CollectionConverters.*

class WsRequestState[F[+_, +_]: IO2: Temporal2: Primitives2] extends WsClientResponder[F] {
Expand All @@ -21,21 +21,31 @@ class WsRequestState[F[+_, +_]: IO2: Temporal2: Primitives2] extends WsClientRes
private[this] val requests: ConcurrentHashMap[RpcPacketId, IRTMethodId] = new ConcurrentHashMap[RpcPacketId, IRTMethodId]()
private[this] val responses: ConcurrentHashMap[RpcPacketId, RequestHandler[F]] = new ConcurrentHashMap[RpcPacketId, RequestHandler[F]]()

def requestEmpty(id: RpcPacketId, ttl: FiniteDuration = 3.minute): F[Throwable, Promise2[F, Nothing, RawResponse]] = {
def requestAndAwait[A](id: RpcPacketId, methodId: Option[IRTMethodId], timeout: FiniteDuration)(request: => F[Throwable, A]): F[Throwable, Option[RawResponse]] = {
(for {
handler <- registerRequest(id, methodId, timeout)
// request should be performed after handler created
_ <- request
res <- handler.promise.await.timeout(timeout)
} yield res).guarantee(forget(id))
}

def registerRequest(id: RpcPacketId, methodId: Option[IRTMethodId], timeout: FiniteDuration): F[Nothing, RequestHandler[F]] = {
for {
now <- clock.nowOffset()
_ <- forgetExpired(now)
promise <- F.mkPromise[Nothing, RawResponse]
ttl = timeout * 3
handler = RequestHandler(id, promise, ttl, now)
_ <- F.sync(responses.put(id, handler))
} yield promise
_ <- F.traverse(methodId)(m => F.sync(requests.put(id, m)))
} yield handler
}

def request(id: RpcPacketId, methodId: IRTMethodId, ttl: FiniteDuration = 3.minute): F[Throwable, Promise2[F, Nothing, RawResponse]] = {
for {
_ <- F.sync(requests.put(id, methodId))
promise <- requestEmpty(id, ttl)
} yield promise
def awaitResponse(id: RpcPacketId, timeout: FiniteDuration): F[Throwable, Option[RawResponse]] = {
F.fromOption(new IRTMissingHandlerException(s"Can not await for async response: $id. Missing handler.", null)) {
Option(responses.get(id))
}.flatMap(_.promise.await.timeout(timeout))
}

def forget(id: RpcPacketId): F[Nothing, Unit] = F.sync {
Expand Down Expand Up @@ -67,12 +77,6 @@ class WsRequestState[F[+_, +_]: IO2: Temporal2: Primitives2] extends WsClientRes
} yield ()
}

def awaitResponse(id: RpcPacketId, timeout: FiniteDuration): F[Throwable, Option[RawResponse]] = {
F.fromOption(new IRTMissingHandlerException(s"Can not await for async response: $id. Missing handler.", null)) {
Option(responses.get(id))
}.flatMap(_.promise.await.timeout(timeout))
}

private[this] def forgetExpired(now: OffsetDateTime): F[Nothing, Unit] = {
for {
removed <- F.sync {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,8 @@ class Http4sTransportTest extends AnyWordSpec {
for {
id1 <- ZIO.succeed(RpcPacketId.random())
id2 <- ZIO.succeed(RpcPacketId.random())
_ <- rs.requestEmpty(id1, 0.minutes)
_ <- rs.requestEmpty(id2)
_ <- rs.registerRequest(id1, None, 0.minutes)
_ <- rs.registerRequest(id2, None, 5.minutes)
_ <- F.attempt(rs.awaitResponse(id1, 5.seconds)).map {
case Left(_: IRTMissingHandlerException) => ()
case other => fail(s"Expected IRTMissingHandlerException, but got $other.")
Expand Down

0 comments on commit 7fb5812

Please sign in to comment.