diff --git a/avro/src/main/scala/hydra/avro/registry/SchemaRegistry.scala b/avro/src/main/scala/hydra/avro/registry/SchemaRegistry.scala index 13fe96fab..cabb40673 100644 --- a/avro/src/main/scala/hydra/avro/registry/SchemaRegistry.scala +++ b/avro/src/main/scala/hydra/avro/registry/SchemaRegistry.scala @@ -204,7 +204,7 @@ object SchemaRegistry { schemaRegistryClientRetries: Int, schemaRegistryClientRetriesDelay: FiniteDuration): SchemaRegistry[F] = new SchemaRegistry[F] { - val retryPolicy = limitRetries(schemaRegistryClientRetries) |+| constantDelay[F](schemaRegistryClientRetriesDelay) + val retryPolicy = limitRetries(schemaRegistryClientRetries) |+| exponentialBackoff[F](schemaRegistryClientRetriesDelay) private implicit class SchemaOps(sch: Schema) { def fields: List[Schema.Field] = fieldsEval("topLevel", box = false).value @@ -296,17 +296,24 @@ object SchemaRegistry { ): F[SchemaVersion] = Sync[F].delay { schemaRegistryClient.getVersion(subject, schema) - }.retryingOnAllErrors(retryPolicy, onFailure("getVersion")) + }.retryingOnSomeErrors( + isWorthRetrying = { + case r: RestClientException if r.getErrorCode == 40401 => false // Do not retry for RestClientException with error code 40401 + case _ => true // Retry for all other exceptions + }, + policy = retryPolicy, + onError = onFailure("getVersion", subject) + ) override def getAllVersions(subject: String): F[List[SchemaId]] = Sync[F].fromTry(Try(schemaRegistryClient.getAllVersions(subject))) .map(_.asScala.toList.map(_.toInt)).recover { case r: RestClientException if r.getErrorCode == 40401 => List.empty - }.retryingOnAllErrors(retryPolicy, onFailure("getAllVersions")) + }.retryingOnAllErrors(retryPolicy, onFailure("getAllVersions", subject)) - private def onFailure(resourceTried: String): (Throwable, RetryDetails) => F[Unit] = + private def onFailure(resourceTried: String, subject: String): (Throwable, RetryDetails) => F[Unit] = (error, retryDetails) => - Logger[F].info(s"Retrying due to failure in SchemaRegistry.$resourceTried: $error. RetryDetails: $retryDetails") + Logger[F].info(s"Retrying due to failure in SchemaRegistry.$resourceTried: $error. Subject: $subject. RetryDetails: $retryDetails") override def getAllSubjects: F[List[String]] = Sync[F].delay {