Skip to content

Commit

Permalink
ADAPT1-1599 | Enhance retry policy control by adding function-level r…
Browse files Browse the repository at this point in the history
…etry configuration
  • Loading branch information
aadit-chugh committed Jul 11, 2024
1 parent 8c8854e commit ba556ac
Show file tree
Hide file tree
Showing 8 changed files with 38 additions and 41 deletions.
38 changes: 19 additions & 19 deletions avro/src/main/scala/hydra/avro/registry/SchemaRegistry.scala
Original file line number Diff line number Diff line change
Expand Up @@ -66,15 +66,15 @@ trait SchemaRegistry[F[_]] {
* @param schema - avro Schema which is expected to be in Schema Registry
* @return SchemaVersion
*/
def getVersion(subject: String, schema: Schema): F[SchemaVersion]
def getVersion(subject: String, schema: Schema, useExponentialBackoffRetryPolicy: Boolean = false): F[SchemaVersion]

/**
* Retrieves all SchemaVersion(s) for a given subject.
*
* @param subject - subject name for the schema found in SchemaRegistry including the suffix (-key | -value)
* @return List[SchemaVersion] or List.empty if Subject Not Found
*/
def getAllVersions(subject: String): F[List[SchemaVersion]]
def getAllVersions(subject: String, useExponentialBackoffRetryPolicy: Boolean = false): F[List[SchemaVersion]]

/**
* Retrieves all subjects found in the SchemaRegistry
Expand Down Expand Up @@ -157,11 +157,10 @@ object SchemaRegistry {
maxCacheSize: Int,
securityConfig: SchemaRegistrySecurityConfig,
schemaRegistryClientRetries: Int,
schemaRegistryClientRetriesDelay: FiniteDuration,
useExponentialBackoff: Boolean
schemaRegistryClientRetriesDelay: FiniteDuration
): F[SchemaRegistry[F]] = Sync[F].delay {
getFromSchemaRegistryClient(new CachedSchemaRegistryClient(schemaRegistryBaseUrl, maxCacheSize,
securityConfig.toConfigMap.asJava), schemaRegistryClientRetries, schemaRegistryClientRetriesDelay, useExponentialBackoff)
securityConfig.toConfigMap.asJava), schemaRegistryClientRetries, schemaRegistryClientRetriesDelay)
}

// scalastyle:off parameter.number
Expand All @@ -175,8 +174,7 @@ object SchemaRegistry {
schemaCacheTtl: Int,
versionCacheTtl: Int,
schemaRegistryClientRetries: Int,
schemaRegistryClientRetriesDelay: FiniteDuration,
useExponentialBackoff: Boolean
schemaRegistryClientRetriesDelay: FiniteDuration
): F[SchemaRegistry[F]] = Sync[F].delay {
getFromSchemaRegistryClient(
new RedisSchemaRegistryClient(
Expand All @@ -188,8 +186,7 @@ object SchemaRegistry {
ssl
),
schemaRegistryClientRetries,
schemaRegistryClientRetriesDelay,
useExponentialBackoff
schemaRegistryClientRetriesDelay
)
}

Expand All @@ -205,14 +202,12 @@ object SchemaRegistry {
// scalastyle:off method.length
private def getFromSchemaRegistryClient[F[_] : Sync : Logger : Sleep](schemaRegistryClient: SchemaRegistryClient,
schemaRegistryClientRetries: Int,
schemaRegistryClientRetriesDelay: FiniteDuration,
useExponentialBackoff: Boolean = false): SchemaRegistry[F] =
schemaRegistryClientRetriesDelay: FiniteDuration): SchemaRegistry[F] =
new SchemaRegistry[F] {
val retryPolicy = if (useExponentialBackoff) {
limitRetries(schemaRegistryClientRetries) |+| exponentialBackoff[F](schemaRegistryClientRetriesDelay)
} else {
limitRetries(schemaRegistryClientRetries) |+| constantDelay[F](schemaRegistryClientRetriesDelay)
}
lazy val constantDelayRetryPolicy = limitRetries(schemaRegistryClientRetries) |+| constantDelay[F](schemaRegistryClientRetriesDelay)

lazy val exponentialBackOffRetryPolicy = limitRetries(schemaRegistryClientRetries) |+| exponentialBackoff[F](schemaRegistryClientRetriesDelay)


private implicit class SchemaOps(sch: Schema) {
def fields: List[Schema.Field] = fieldsEval("topLevel", box = false).value
Expand Down Expand Up @@ -300,8 +295,10 @@ object SchemaRegistry {

override def getVersion(
subject: String,
schema: Schema
): F[SchemaVersion] =
schema: Schema,
useExponentialBackoffRetryPolicy: Boolean = false
): F[SchemaVersion] = {
val retryPolicy = if(useExponentialBackoffRetryPolicy) exponentialBackOffRetryPolicy else constantDelayRetryPolicy
Sync[F].delay {
schemaRegistryClient.getVersion(subject, schema)
}.retryingOnSomeErrors(
Expand All @@ -312,12 +309,15 @@ object SchemaRegistry {
policy = retryPolicy,
onError = onFailure("getVersion", subject)
)
}

override def getAllVersions(subject: String): F[List[SchemaId]] =
override def getAllVersions(subject: String, useExponentialBackoffRetryPolicy: Boolean = false): F[List[SchemaId]] = {
val retryPolicy = if(useExponentialBackoffRetryPolicy) exponentialBackOffRetryPolicy else constantDelayRetryPolicy
Sync[F].fromTry(Try(schemaRegistryClient.getAllVersions(subject)))
.map(_.asScala.toList.map(_.toInt)).recover {
case r: RestClientException if r.getErrorCode == 40401 => List.empty
}.retryingOnAllErrors(retryPolicy, onFailure("getAllVersions", subject))
}

private def onFailure(resourceTried: String, subject: String): (Throwable, RetryDetails) => F[Unit] =
(error, retryDetails) =>
Expand Down
6 changes: 2 additions & 4 deletions ingest/src/main/scala/hydra.ingest/app/AppConfig.scala
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,7 @@ object AppConfig {
fullUrl: String,
maxCacheSize: Int,
schemaRegistryClientRetriesConfig: Int,
schemaRegistryClientRetrieDelaysConfig: FiniteDuration,
useExponentialBackoff: Boolean
schemaRegistryClientRetrieDelaysConfig: FiniteDuration
)

final case class SchemaRegistryRedisConfig(
Expand All @@ -40,8 +39,7 @@ object AppConfig {
.default("http://localhost:8081"),
env("HYDRA_MAX_SCHEMAS_PER_SUBJECT").as[Int].default(1000),
env("HYDRA_SCHEMA_REGISTRY_RETRIES").as[Int].default(3),
env("HYDRA_SCHEMA_REGISTRY_RETRIES_DELAY").as[FiniteDuration].default(500.milliseconds),
env("HYDRA_SCHEMA_REGISTRY_USE_EXPONENTIAL_BACKOFF").as[Boolean].default(false)
env("HYDRA_SCHEMA_REGISTRY_RETRIES_DELAY").as[FiniteDuration].default(500.milliseconds)
).parMapN(SchemaRegistryConfig)

private val schemaRegistryRedisConfig: ConfigValue[SchemaRegistryRedisConfig] = (
Expand Down
6 changes: 2 additions & 4 deletions ingest/src/main/scala/hydra.ingest/modules/Algebras.scala
Original file line number Diff line number Diff line change
Expand Up @@ -38,17 +38,15 @@ object Algebras {
config.schemaRegistryRedisConfig.schemaCacheTtl,
config.schemaRegistryRedisConfig.versionCacheTtl,
config.createTopicConfig.schemaRegistryConfig.schemaRegistryClientRetriesConfig,
config.createTopicConfig.schemaRegistryConfig.schemaRegistryClientRetrieDelaysConfig,
config.createTopicConfig.schemaRegistryConfig.useExponentialBackoff
config.createTopicConfig.schemaRegistryConfig.schemaRegistryClientRetrieDelaysConfig
)
} else {
SchemaRegistry.live[F](
schemaRegistryUrl,
config.createTopicConfig.schemaRegistryConfig.maxCacheSize,
config.schemaRegistrySecurityConfig,
config.createTopicConfig.schemaRegistryConfig.schemaRegistryClientRetriesConfig,
config.createTopicConfig.schemaRegistryConfig.schemaRegistryClientRetrieDelaysConfig,
config.createTopicConfig.schemaRegistryConfig.useExponentialBackoff
config.createTopicConfig.schemaRegistryConfig.schemaRegistryClientRetrieDelaysConfig
)
}
kafkaAdmin <- KafkaAdminAlgebra.live[F](config.createTopicConfig.bootstrapServers, kafkaClientSecurityConfig = config.kafkaClientSecurityConfig)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,10 +64,10 @@ class TopicDeletionEndpointSpec extends Matchers with AnyWordSpecLike with Scala
override def deleteSchemaOfVersion(subject: String,version: SchemaVersion): F[Unit] =
underlying.deleteSchemaOfVersion(subject,version)

override def getVersion(subject: String,schema: Schema): F[SchemaVersion] =
override def getVersion(subject: String,schema: Schema, useExponentialBackoffRetryPolicy: Boolean = false): F[SchemaVersion] =
underlying.getVersion(subject,schema)

override def getAllVersions(subject: String): F[List[SchemaId]] =
override def getAllVersions(subject: String, useExponentialBackoffRetryPolicy: Boolean = false): F[List[SchemaId]] =
underlying.getAllVersions(subject)

override def getAllSubjects: F[List[String]] =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,10 +60,10 @@ class TopicDeletionProgramSpec extends AnyFlatSpec with Matchers {
override def deleteSchemaOfVersion(subject: String, version: SchemaVersion): F[Unit] =
underlying.deleteSchemaOfVersion(subject, version)

override def getVersion(subject: String, schema: Schema): F[SchemaVersion] =
override def getVersion(subject: String, schema: Schema, useExponentialBackoffRetryPolicy: Boolean = false): F[SchemaVersion] =
underlying.getVersion(subject, schema)

override def getAllVersions(subject: String): F[List[SchemaId]] =
override def getAllVersions(subject: String, useExponentialBackoffRetryPolicy: Boolean = false): F[List[SchemaId]] =
underlying.getAllVersions(subject)

override def getAllSubjects: F[List[String]] =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,12 +44,12 @@ final class CreateTopicProgram[F[_]: Bracket[*[_], Throwable]: Sleep: Logger] pr
val suffixedSubject = subject.value + (if (isKey) "-key" else "-value")
val registerSchema: F[Option[SchemaVersion]] = {
schemaRegistry
.getVersion(suffixedSubject, schema)
.getVersion(suffixedSubject, schema, useExponentialBackoffRetryPolicy = true)
.attempt
.map(_.toOption)
.flatMap { previousSchemaVersion =>
schemaRegistry.registerSchema(suffixedSubject, schema) *>
schemaRegistry.getVersion(suffixedSubject, schema).map {
schemaRegistry.getVersion(suffixedSubject, schema, useExponentialBackoffRetryPolicy = true).map {
newSchemaVersion =>
if (previousSchemaVersion.contains(newSchemaVersion)) {
None
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -268,9 +268,10 @@ final class BootstrapEndpointV2Spec
): IO[Unit] = err
override def getVersion(
subject: String,
schema: Schema
schema: Schema,
useExponentialBackoffRetryPolicy: Boolean = false
): IO[SchemaVersion] = err
override def getAllVersions(subject: String): IO[List[Int]] = err
override def getAllVersions(subject: String, useExponentialBackoffRetryPolicy: Boolean = false): IO[List[Int]] = err
override def getAllSubjects: IO[List[String]] = err

override def getSchemaRegistryClient: IO[SchemaRegistryClient] = err
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,10 +75,10 @@ class CreateTopicProgramSpec extends AsyncFreeSpec with Matchers with IOSuite {
override def deleteSchemaOfVersion(subject: String, version: SchemaVersion): IO[Unit] =
ref.update(_.copy(deleteSchemaWasCalled = true))

override def getVersion(subject: String, schema: Schema): IO[SchemaVersion] =
override def getVersion(subject: String, schema: Schema, useExponentialBackoffRetryPolicy: Boolean = false): IO[SchemaVersion] =
ref.get.map(testState => testState.numSchemasRegistered + 1)

override def getAllVersions(subject: String): IO[List[Int]] = IO.pure(List())
override def getAllVersions(subject: String, useExponentialBackoffRetryPolicy: Boolean = false): IO[List[Int]] = IO.pure(List())
override def getAllSubjects: IO[List[String]] = IO.pure(List())
override def getSchemaRegistryClient: IO[SchemaRegistryClient] =
IO.raiseError(new Exception("Something horrible went wrong!"))
Expand All @@ -104,8 +104,8 @@ class CreateTopicProgramSpec extends AsyncFreeSpec with Matchers with IOSuite {
ref.get.flatMap(n => ref.set(n + 1) *> IO.raiseError(new Exception("Something horrible went wrong!")))

override def deleteSchemaOfVersion(subject: String, version: SchemaVersion): IO[Unit] = IO.unit
override def getVersion(subject: String, schema: Schema): IO[SchemaVersion] = IO.pure(1)
override def getAllVersions(subject: String): IO[List[Int]] = IO.pure(Nil)
override def getVersion(subject: String, schema: Schema, useExponentialBackoffRetryPolicy: Boolean = false): IO[SchemaVersion] = IO.pure(1)
override def getAllVersions(subject: String, useExponentialBackoffRetryPolicy: Boolean = false): IO[List[Int]] = IO.pure(Nil)
override def getAllSubjects: IO[List[String]] = IO.pure(Nil)
override def getSchemaRegistryClient: IO[SchemaRegistryClient] = IO.raiseError(new Exception("Something horrible went wrong!"))
override def getLatestSchemaBySubject(subject: String): IO[Option[Schema]] = IO.pure(None)
Expand Down Expand Up @@ -135,8 +135,8 @@ class CreateTopicProgramSpec extends AsyncFreeSpec with Matchers with IOSuite {
override def deleteSchemaOfVersion(subject: String, version: SchemaVersion): IO[Unit] =
ref.update(ts => ts.copy(schemas = ts.schemas - subject))

override def getVersion(subject: String, schema: Schema): IO[SchemaVersion] = ref.get.map(_.schemas(subject))
override def getAllVersions(subject: String): IO[List[Int]] = IO.pure(Nil)
override def getVersion(subject: String, schema: Schema, useExponentialBackoffRetryPolicy: Boolean = false): IO[SchemaVersion] = ref.get.map(_.schemas(subject))
override def getAllVersions(subject: String, useExponentialBackoffRetryPolicy: Boolean = false): IO[List[Int]] = IO.pure(Nil)
override def getAllSubjects: IO[List[String]] = IO.pure(Nil)
override def getSchemaRegistryClient: IO[SchemaRegistryClient] = IO.raiseError(new Exception)
override def getLatestSchemaBySubject(subject: String): IO[Option[Schema]] = IO.pure(None)
Expand Down

0 comments on commit ba556ac

Please sign in to comment.