Skip to content

Commit

Permalink
Merge branch 'ADAPT1-1599-schema-registry-exponential-backoff' into '…
Browse files Browse the repository at this point in the history
…master'

ADAPT1-1599 "Schema registry exponential backoff"

See merge request pluralsight/Technology/adapt/data-platform/hydra!12
  • Loading branch information
aadit-chugh committed Jul 5, 2024
2 parents 1a2c134 + fb43b2f commit 890522f
Showing 1 changed file with 12 additions and 5 deletions.
17 changes: 12 additions & 5 deletions avro/src/main/scala/hydra/avro/registry/SchemaRegistry.scala
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ object SchemaRegistry {
schemaRegistryClientRetries: Int,
schemaRegistryClientRetriesDelay: FiniteDuration): SchemaRegistry[F] =
new SchemaRegistry[F] {
val retryPolicy = limitRetries(schemaRegistryClientRetries) |+| constantDelay[F](schemaRegistryClientRetriesDelay)
val retryPolicy = limitRetries(schemaRegistryClientRetries) |+| exponentialBackoff[F](schemaRegistryClientRetriesDelay)

private implicit class SchemaOps(sch: Schema) {
def fields: List[Schema.Field] = fieldsEval("topLevel", box = false).value
Expand Down Expand Up @@ -296,17 +296,24 @@ object SchemaRegistry {
): F[SchemaVersion] =
Sync[F].delay {
schemaRegistryClient.getVersion(subject, schema)
}.retryingOnAllErrors(retryPolicy, onFailure("getVersion"))
}.retryingOnSomeErrors(
isWorthRetrying = {
case r: RestClientException if r.getErrorCode == 40401 => false // Do not retry for RestClientException with error code 40401
case _ => true // Retry for all other exceptions
},
policy = retryPolicy,
onError = onFailure("getVersion", subject)
)

override def getAllVersions(subject: String): F[List[SchemaId]] =
Sync[F].fromTry(Try(schemaRegistryClient.getAllVersions(subject)))
.map(_.asScala.toList.map(_.toInt)).recover {
case r: RestClientException if r.getErrorCode == 40401 => List.empty
}.retryingOnAllErrors(retryPolicy, onFailure("getAllVersions"))
}.retryingOnAllErrors(retryPolicy, onFailure("getAllVersions", subject))

private def onFailure(resourceTried: String): (Throwable, RetryDetails) => F[Unit] =
private def onFailure(resourceTried: String, subject: String): (Throwable, RetryDetails) => F[Unit] =
(error, retryDetails) =>
Logger[F].info(s"Retrying due to failure in SchemaRegistry.$resourceTried: $error. RetryDetails: $retryDetails")
Logger[F].info(s"Retrying due to failure in SchemaRegistry.$resourceTried: $error. Subject: $subject. RetryDetails: $retryDetails")

override def getAllSubjects: F[List[String]] =
Sync[F].delay {
Expand Down

0 comments on commit 890522f

Please sign in to comment.