Skip to content

Commit

Permalink
Log PeerClosedStreamExceptions as warnings instead of errors
Browse files Browse the repository at this point in the history
  • Loading branch information
ladinu committed Sep 13, 2024
1 parent 3b93790 commit 3f261d9
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ import pekko.http.javadsl.model.HttpResponse
import pekko.japi.{ Function => jFunction }
import io.grpc.{ Status, StatusRuntimeException }

import org.apache.pekko.http.scaladsl.model.http2.PeerClosedStreamException

import scala.concurrent.ExecutionException
import pekko.event.Logging

Expand All @@ -40,6 +42,8 @@ object GrpcExceptionHandler {
default(system)
}

private def log(system: ActorSystem) = Logging(system, "org.apache.pekko.grpc.javadsl.GrpcExceptionHandler")

/** INTERNAL API */
@InternalApi
private def default(system: ActorSystem): jFunction[Throwable, Trailers] =
Expand All @@ -57,9 +61,11 @@ object GrpcExceptionHandler {
case e: NotImplementedError => Trailers(Status.UNIMPLEMENTED.withDescription(e.getMessage))
case e: UnsupportedOperationException => Trailers(Status.UNIMPLEMENTED.withDescription(e.getMessage))
case e: StatusRuntimeException => Trailers(e.getStatus, new GrpcMetadataImpl(e.getTrailers))
case e: PeerClosedStreamException =>
log(system).warning(e, "Peer closed the stream: [{}]", e.getMessage)
INTERNAL
case other =>
val log = Logging(system, "org.apache.pekko.grpc.javadsl.GrpcExceptionHandler")
log.error(other, "Unhandled error: [{}]", other.getMessage)
log(system).error(other, "Unhandled error: [{}]", other.getMessage)
INTERNAL
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import pekko.grpc.GrpcProtocol.GrpcProtocolWriter
import pekko.grpc.internal.{ GrpcMetadataImpl, GrpcResponseHelpers, MissingParameterException }
import pekko.http.scaladsl.model.HttpResponse
import io.grpc.{ Status, StatusRuntimeException }
import org.apache.pekko.http.scaladsl.model.http2.PeerClosedStreamException

import scala.concurrent.{ ExecutionException, Future }
import pekko.event.Logging
Expand All @@ -31,6 +32,8 @@ object GrpcExceptionHandler {
private val INTERNAL = Trailers(Status.INTERNAL)
private val INVALID_ARGUMENT = Trailers(Status.INVALID_ARGUMENT)

private def log(system: ActorSystem) = Logging(system, "org.apache.pekko.grpc.scaladsl.GrpcExceptionHandler")

def defaultMapper(system: ActorSystem): PartialFunction[Throwable, Trailers] = {
case e: ExecutionException =>
if (e.getCause == null) INTERNAL
Expand All @@ -42,9 +45,11 @@ object GrpcExceptionHandler {
case e: StatusRuntimeException =>
val meta = Option(e.getTrailers).getOrElse(new io.grpc.Metadata())
Trailers(e.getStatus, new GrpcMetadataImpl(meta))
case e: PeerClosedStreamException =>
log(system).warning(e, "Peer closed the stream: [{}]", e.getMessage)
INTERNAL
case other =>
val log = Logging(system, "org.apache.pekko.grpc.scaladsl.GrpcExceptionHandler")
log.error(other, "Unhandled error: [{}]", other.getMessage)
log(system).error(other, "Unhandled error: [{}]", other.getMessage)
INTERNAL
}

Expand Down

0 comments on commit 3f261d9

Please sign in to comment.