diff --git a/avro/src/main/scala/hydra/avro/registry/SchemaRegistry.scala b/avro/src/main/scala/hydra/avro/registry/SchemaRegistry.scala index 7666c6b86..b68a3ac11 100644 --- a/avro/src/main/scala/hydra/avro/registry/SchemaRegistry.scala +++ b/avro/src/main/scala/hydra/avro/registry/SchemaRegistry.scala @@ -66,7 +66,7 @@ trait SchemaRegistry[F[_]] { * @param schema - avro Schema which is expected to be in Schema Registry * @return SchemaVersion */ - def getVersion(subject: String, schema: Schema): F[SchemaVersion] + def getVersion(subject: String, schema: Schema, useExponentialBackoffRetryPolicy: Boolean = false): F[SchemaVersion] /** * Retrieves all SchemaVersion(s) for a given subject. @@ -74,7 +74,7 @@ trait SchemaRegistry[F[_]] { * @param subject - subject name for the schema found in SchemaRegistry including the suffix (-key | -value) * @return List[SchemaVersion] or List.empty if Subject Not Found */ - def getAllVersions(subject: String): F[List[SchemaVersion]] + def getAllVersions(subject: String, useExponentialBackoffRetryPolicy: Boolean = false): F[List[SchemaVersion]] /** * Retrieves all subjects found in the SchemaRegistry @@ -157,11 +157,10 @@ object SchemaRegistry { maxCacheSize: Int, securityConfig: SchemaRegistrySecurityConfig, schemaRegistryClientRetries: Int, - schemaRegistryClientRetriesDelay: FiniteDuration, - useExponentialBackoff: Boolean + schemaRegistryClientRetriesDelay: FiniteDuration ): F[SchemaRegistry[F]] = Sync[F].delay { getFromSchemaRegistryClient(new CachedSchemaRegistryClient(schemaRegistryBaseUrl, maxCacheSize, - securityConfig.toConfigMap.asJava), schemaRegistryClientRetries, schemaRegistryClientRetriesDelay, useExponentialBackoff) + securityConfig.toConfigMap.asJava), schemaRegistryClientRetries, schemaRegistryClientRetriesDelay) } // scalastyle:off parameter.number @@ -175,8 +174,7 @@ object SchemaRegistry { schemaCacheTtl: Int, versionCacheTtl: Int, schemaRegistryClientRetries: Int, - schemaRegistryClientRetriesDelay: FiniteDuration, - useExponentialBackoff: Boolean + schemaRegistryClientRetriesDelay: FiniteDuration ): F[SchemaRegistry[F]] = Sync[F].delay { getFromSchemaRegistryClient( new RedisSchemaRegistryClient( @@ -188,8 +186,7 @@ object SchemaRegistry { ssl ), schemaRegistryClientRetries, - schemaRegistryClientRetriesDelay, - useExponentialBackoff + schemaRegistryClientRetriesDelay ) } @@ -205,14 +202,12 @@ object SchemaRegistry { // scalastyle:off method.length private def getFromSchemaRegistryClient[F[_] : Sync : Logger : Sleep](schemaRegistryClient: SchemaRegistryClient, schemaRegistryClientRetries: Int, - schemaRegistryClientRetriesDelay: FiniteDuration, - useExponentialBackoff: Boolean = false): SchemaRegistry[F] = + schemaRegistryClientRetriesDelay: FiniteDuration): SchemaRegistry[F] = new SchemaRegistry[F] { - val retryPolicy = if (useExponentialBackoff) { - limitRetries(schemaRegistryClientRetries) |+| exponentialBackoff[F](schemaRegistryClientRetriesDelay) - } else { - limitRetries(schemaRegistryClientRetries) |+| constantDelay[F](schemaRegistryClientRetriesDelay) - } + lazy val constantDelayRetryPolicy = limitRetries(schemaRegistryClientRetries) |+| constantDelay[F](schemaRegistryClientRetriesDelay) + + lazy val exponentialBackOffRetryPolicy = limitRetries(schemaRegistryClientRetries) |+| exponentialBackoff[F](schemaRegistryClientRetriesDelay) + private implicit class SchemaOps(sch: Schema) { def fields: List[Schema.Field] = fieldsEval("topLevel", box = false).value @@ -300,8 +295,10 @@ object SchemaRegistry { override def getVersion( subject: String, - schema: Schema - ): F[SchemaVersion] = + schema: Schema, + useExponentialBackoffRetryPolicy: Boolean = false + ): F[SchemaVersion] = { + val retryPolicy = if(useExponentialBackoffRetryPolicy) exponentialBackOffRetryPolicy else constantDelayRetryPolicy Sync[F].delay { schemaRegistryClient.getVersion(subject, schema) }.retryingOnSomeErrors( @@ -312,12 +309,15 @@ object SchemaRegistry { policy = retryPolicy, onError = onFailure("getVersion", subject) ) + } - override def getAllVersions(subject: String): F[List[SchemaId]] = + override def getAllVersions(subject: String, useExponentialBackoffRetryPolicy: Boolean = false): F[List[SchemaId]] = { + val retryPolicy = if(useExponentialBackoffRetryPolicy) exponentialBackOffRetryPolicy else constantDelayRetryPolicy Sync[F].fromTry(Try(schemaRegistryClient.getAllVersions(subject))) .map(_.asScala.toList.map(_.toInt)).recover { case r: RestClientException if r.getErrorCode == 40401 => List.empty }.retryingOnAllErrors(retryPolicy, onFailure("getAllVersions", subject)) + } private def onFailure(resourceTried: String, subject: String): (Throwable, RetryDetails) => F[Unit] = (error, retryDetails) => diff --git a/ingest/src/main/scala/hydra.ingest/app/AppConfig.scala b/ingest/src/main/scala/hydra.ingest/app/AppConfig.scala index 62e8ac018..79ef5c5a3 100644 --- a/ingest/src/main/scala/hydra.ingest/app/AppConfig.scala +++ b/ingest/src/main/scala/hydra.ingest/app/AppConfig.scala @@ -19,8 +19,7 @@ object AppConfig { fullUrl: String, maxCacheSize: Int, schemaRegistryClientRetriesConfig: Int, - schemaRegistryClientRetrieDelaysConfig: FiniteDuration, - useExponentialBackoff: Boolean + schemaRegistryClientRetrieDelaysConfig: FiniteDuration ) final case class SchemaRegistryRedisConfig( @@ -40,8 +39,7 @@ object AppConfig { .default("http://localhost:8081"), env("HYDRA_MAX_SCHEMAS_PER_SUBJECT").as[Int].default(1000), env("HYDRA_SCHEMA_REGISTRY_RETRIES").as[Int].default(3), - env("HYDRA_SCHEMA_REGISTRY_RETRIES_DELAY").as[FiniteDuration].default(500.milliseconds), - env("HYDRA_SCHEMA_REGISTRY_USE_EXPONENTIAL_BACKOFF").as[Boolean].default(false) + env("HYDRA_SCHEMA_REGISTRY_RETRIES_DELAY").as[FiniteDuration].default(500.milliseconds) ).parMapN(SchemaRegistryConfig) private val schemaRegistryRedisConfig: ConfigValue[SchemaRegistryRedisConfig] = ( diff --git a/ingest/src/main/scala/hydra.ingest/modules/Algebras.scala b/ingest/src/main/scala/hydra.ingest/modules/Algebras.scala index 91f6c9dbd..c5e81b4af 100644 --- a/ingest/src/main/scala/hydra.ingest/modules/Algebras.scala +++ b/ingest/src/main/scala/hydra.ingest/modules/Algebras.scala @@ -38,8 +38,7 @@ object Algebras { config.schemaRegistryRedisConfig.schemaCacheTtl, config.schemaRegistryRedisConfig.versionCacheTtl, config.createTopicConfig.schemaRegistryConfig.schemaRegistryClientRetriesConfig, - config.createTopicConfig.schemaRegistryConfig.schemaRegistryClientRetrieDelaysConfig, - config.createTopicConfig.schemaRegistryConfig.useExponentialBackoff + config.createTopicConfig.schemaRegistryConfig.schemaRegistryClientRetrieDelaysConfig ) } else { SchemaRegistry.live[F]( @@ -47,8 +46,7 @@ object Algebras { config.createTopicConfig.schemaRegistryConfig.maxCacheSize, config.schemaRegistrySecurityConfig, config.createTopicConfig.schemaRegistryConfig.schemaRegistryClientRetriesConfig, - config.createTopicConfig.schemaRegistryConfig.schemaRegistryClientRetrieDelaysConfig, - config.createTopicConfig.schemaRegistryConfig.useExponentialBackoff + config.createTopicConfig.schemaRegistryConfig.schemaRegistryClientRetrieDelaysConfig ) } kafkaAdmin <- KafkaAdminAlgebra.live[F](config.createTopicConfig.bootstrapServers, kafkaClientSecurityConfig = config.kafkaClientSecurityConfig) diff --git a/ingest/src/test/scala/hydra/ingest/http/TopicDeletionEndpointSpec.scala b/ingest/src/test/scala/hydra/ingest/http/TopicDeletionEndpointSpec.scala index b6d29137a..7349bf738 100644 --- a/ingest/src/test/scala/hydra/ingest/http/TopicDeletionEndpointSpec.scala +++ b/ingest/src/test/scala/hydra/ingest/http/TopicDeletionEndpointSpec.scala @@ -64,10 +64,10 @@ class TopicDeletionEndpointSpec extends Matchers with AnyWordSpecLike with Scala override def deleteSchemaOfVersion(subject: String,version: SchemaVersion): F[Unit] = underlying.deleteSchemaOfVersion(subject,version) - override def getVersion(subject: String,schema: Schema): F[SchemaVersion] = + override def getVersion(subject: String,schema: Schema, useExponentialBackoffRetryPolicy: Boolean = false): F[SchemaVersion] = underlying.getVersion(subject,schema) - override def getAllVersions(subject: String): F[List[SchemaId]] = + override def getAllVersions(subject: String, useExponentialBackoffRetryPolicy: Boolean = false): F[List[SchemaId]] = underlying.getAllVersions(subject) override def getAllSubjects: F[List[String]] = diff --git a/ingest/src/test/scala/hydra/ingest/programs/TopicDeletionProgramSpec.scala b/ingest/src/test/scala/hydra/ingest/programs/TopicDeletionProgramSpec.scala index ac8d4ab09..f17a8febd 100644 --- a/ingest/src/test/scala/hydra/ingest/programs/TopicDeletionProgramSpec.scala +++ b/ingest/src/test/scala/hydra/ingest/programs/TopicDeletionProgramSpec.scala @@ -60,10 +60,10 @@ class TopicDeletionProgramSpec extends AnyFlatSpec with Matchers { override def deleteSchemaOfVersion(subject: String, version: SchemaVersion): F[Unit] = underlying.deleteSchemaOfVersion(subject, version) - override def getVersion(subject: String, schema: Schema): F[SchemaVersion] = + override def getVersion(subject: String, schema: Schema, useExponentialBackoffRetryPolicy: Boolean = false): F[SchemaVersion] = underlying.getVersion(subject, schema) - override def getAllVersions(subject: String): F[List[SchemaId]] = + override def getAllVersions(subject: String, useExponentialBackoffRetryPolicy: Boolean = false): F[List[SchemaId]] = underlying.getAllVersions(subject) override def getAllSubjects: F[List[String]] = diff --git a/ingestors/kafka/src/main/scala/hydra/kafka/programs/CreateTopicProgram.scala b/ingestors/kafka/src/main/scala/hydra/kafka/programs/CreateTopicProgram.scala index c34b21c25..2f7647abd 100644 --- a/ingestors/kafka/src/main/scala/hydra/kafka/programs/CreateTopicProgram.scala +++ b/ingestors/kafka/src/main/scala/hydra/kafka/programs/CreateTopicProgram.scala @@ -44,12 +44,12 @@ final class CreateTopicProgram[F[_]: Bracket[*[_], Throwable]: Sleep: Logger] pr val suffixedSubject = subject.value + (if (isKey) "-key" else "-value") val registerSchema: F[Option[SchemaVersion]] = { schemaRegistry - .getVersion(suffixedSubject, schema) + .getVersion(suffixedSubject, schema, useExponentialBackoffRetryPolicy = true) .attempt .map(_.toOption) .flatMap { previousSchemaVersion => schemaRegistry.registerSchema(suffixedSubject, schema) *> - schemaRegistry.getVersion(suffixedSubject, schema).map { + schemaRegistry.getVersion(suffixedSubject, schema, useExponentialBackoffRetryPolicy = true).map { newSchemaVersion => if (previousSchemaVersion.contains(newSchemaVersion)) { None diff --git a/ingestors/kafka/src/test/scala/hydra/kafka/endpoints/BootstrapEndpointV2Spec.scala b/ingestors/kafka/src/test/scala/hydra/kafka/endpoints/BootstrapEndpointV2Spec.scala index 1bd207a9f..358008c35 100644 --- a/ingestors/kafka/src/test/scala/hydra/kafka/endpoints/BootstrapEndpointV2Spec.scala +++ b/ingestors/kafka/src/test/scala/hydra/kafka/endpoints/BootstrapEndpointV2Spec.scala @@ -268,9 +268,10 @@ final class BootstrapEndpointV2Spec ): IO[Unit] = err override def getVersion( subject: String, - schema: Schema + schema: Schema, + useExponentialBackoffRetryPolicy: Boolean = false ): IO[SchemaVersion] = err - override def getAllVersions(subject: String): IO[List[Int]] = err + override def getAllVersions(subject: String, useExponentialBackoffRetryPolicy: Boolean = false): IO[List[Int]] = err override def getAllSubjects: IO[List[String]] = err override def getSchemaRegistryClient: IO[SchemaRegistryClient] = err diff --git a/ingestors/kafka/src/test/scala/hydra/kafka/programs/CreateTopicProgramSpec.scala b/ingestors/kafka/src/test/scala/hydra/kafka/programs/CreateTopicProgramSpec.scala index ecff53481..e8c2e38e5 100644 --- a/ingestors/kafka/src/test/scala/hydra/kafka/programs/CreateTopicProgramSpec.scala +++ b/ingestors/kafka/src/test/scala/hydra/kafka/programs/CreateTopicProgramSpec.scala @@ -75,10 +75,10 @@ class CreateTopicProgramSpec extends AsyncFreeSpec with Matchers with IOSuite { override def deleteSchemaOfVersion(subject: String, version: SchemaVersion): IO[Unit] = ref.update(_.copy(deleteSchemaWasCalled = true)) - override def getVersion(subject: String, schema: Schema): IO[SchemaVersion] = + override def getVersion(subject: String, schema: Schema, useExponentialBackoffRetryPolicy: Boolean = false): IO[SchemaVersion] = ref.get.map(testState => testState.numSchemasRegistered + 1) - override def getAllVersions(subject: String): IO[List[Int]] = IO.pure(List()) + override def getAllVersions(subject: String, useExponentialBackoffRetryPolicy: Boolean = false): IO[List[Int]] = IO.pure(List()) override def getAllSubjects: IO[List[String]] = IO.pure(List()) override def getSchemaRegistryClient: IO[SchemaRegistryClient] = IO.raiseError(new Exception("Something horrible went wrong!")) @@ -104,8 +104,8 @@ class CreateTopicProgramSpec extends AsyncFreeSpec with Matchers with IOSuite { ref.get.flatMap(n => ref.set(n + 1) *> IO.raiseError(new Exception("Something horrible went wrong!"))) override def deleteSchemaOfVersion(subject: String, version: SchemaVersion): IO[Unit] = IO.unit - override def getVersion(subject: String, schema: Schema): IO[SchemaVersion] = IO.pure(1) - override def getAllVersions(subject: String): IO[List[Int]] = IO.pure(Nil) + override def getVersion(subject: String, schema: Schema, useExponentialBackoffRetryPolicy: Boolean = false): IO[SchemaVersion] = IO.pure(1) + override def getAllVersions(subject: String, useExponentialBackoffRetryPolicy: Boolean = false): IO[List[Int]] = IO.pure(Nil) override def getAllSubjects: IO[List[String]] = IO.pure(Nil) override def getSchemaRegistryClient: IO[SchemaRegistryClient] = IO.raiseError(new Exception("Something horrible went wrong!")) override def getLatestSchemaBySubject(subject: String): IO[Option[Schema]] = IO.pure(None) @@ -135,8 +135,8 @@ class CreateTopicProgramSpec extends AsyncFreeSpec with Matchers with IOSuite { override def deleteSchemaOfVersion(subject: String, version: SchemaVersion): IO[Unit] = ref.update(ts => ts.copy(schemas = ts.schemas - subject)) - override def getVersion(subject: String, schema: Schema): IO[SchemaVersion] = ref.get.map(_.schemas(subject)) - override def getAllVersions(subject: String): IO[List[Int]] = IO.pure(Nil) + override def getVersion(subject: String, schema: Schema, useExponentialBackoffRetryPolicy: Boolean = false): IO[SchemaVersion] = ref.get.map(_.schemas(subject)) + override def getAllVersions(subject: String, useExponentialBackoffRetryPolicy: Boolean = false): IO[List[Int]] = IO.pure(Nil) override def getAllSubjects: IO[List[String]] = IO.pure(Nil) override def getSchemaRegistryClient: IO[SchemaRegistryClient] = IO.raiseError(new Exception) override def getLatestSchemaBySubject(subject: String): IO[Option[Schema]] = IO.pure(None)