Skip to content

Commit

Permalink
Merge pull request #906 from poohsen/fix-backoff-logic
Browse files Browse the repository at this point in the history
Fix backoff logic in ResilientStream
  • Loading branch information
matejcerny authored Jan 18, 2024
2 parents 3683e83 + d2be00a commit b0091c4
Showing 1 changed file with 9 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import scala.util.control.NonFatal
*
* In case of failure, the entire stream will be restarted after the specified retry time with an exponential backoff.
*
* By default the program will be restarted in 5 seconds, then 10, then 15, etc.
* By default the program will be restarted in 5 * 1^2 seconds, then 5 * 2^2 seconds, then 5 * 3^2 seconds, etc.
*
* @see
* ResilientStreamSpec for more.
Expand All @@ -47,14 +47,18 @@ object ResilientStream {

private def loop[F[_]: Log: Temporal](
program: Stream[F, Unit],
retry: FiniteDuration,
retryDelay: FiniteDuration,
count: Int
): Stream[F, Unit] =
program.handleErrorWith {
case NonFatal(err) =>
Stream.eval(Log[F].error(err.getMessage) *> Log[F].info(s"Restarting in ${retry.toSeconds * count}...")) >>
loop[F](Stream.sleep(retry * count.toLong) >> program, retry, count + 1)
case _ => ???
val delay = retryDelay * Math.pow(count, 2).toInt
Stream.eval(
Log[F].error(err.getMessage) *> Log[F].info(s"Restarting in $delay...")
) >>
Stream.sleep(delay) >> loop[F](program, retryDelay, count + 1)
case fatal =>
Stream.eval(Log[F].error(s"Fatal error: ${fatal.getMessage}")) *> Stream.raiseError(fatal)
}

}

0 comments on commit b0091c4

Please sign in to comment.