diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml index f58d659bd..bec501896 100644 --- a/.gitlab-ci.yml +++ b/.gitlab-ci.yml @@ -7,19 +7,18 @@ include: ref: main file: 'snyk-check.yml' -image: openjdk:8 +image: harbor.vnerd.com/library/hydra-build-tools:latest variables: ARTIFACT_NAME: hydra-publish IMAGE_NAME: hydra DOCKER_REGISTRY_URL: harbor.vnerd.com/library DOCKER_REGISTRY_IMAGE: ${DOCKER_REGISTRY_URL}/${IMAGE_NAME} - ARTIFACTORY_REPOSITORY_URL: https://repository.vnerd.com/artifactory ARTIFACTORY_REPOSITORY: bounded-context ARTIFACTORY_ID: hydra/publish ARTIFACTORY_ARTIFACT_VERSION: ${ARTIFACT_NAME}-${BUILD_VERSION}.tgz ARTIFACTORY_PATH: ${ARTIFACTORY_ID}/${ARTIFACTORY_ARTIFACT_VERSION} - ARTIFACTORY_ARTIFACT_URL: ${ARTIFACTORY_REPOSITORY_URL}/${ARTIFACTORY_REPOSITORY}/${ARTIFACTORY_PATH} + ARTIFACTORY_ARTIFACT_URL: ${ARTIFACTORY_SAAS_REPOSITORY_URL}/${ARTIFACTORY_REPOSITORY}/${ARTIFACTORY_PATH} BUILD_VERSION: 1.0.${CI_PIPELINE_IID} BOUNDED_CONTEXT_DEV: adapt-dvs-dev BOUNDED_CONTEXT_STAGING: adapt-dvs @@ -84,9 +83,15 @@ package: - echo "##teamcity[publishArtifacts '${ARTIFACT_NAME}-${BUILD_VERSION}.tgz']" - md5sum ${ARTIFACT_NAME}-${BUILD_VERSION}.tgz > ${ARTIFACT_NAME}-${BUILD_VERSION}.tgz.md5 - echo "${ARTIFACTORY_ARTIFACT_URL}" + - CHECKSUM=$(md5sum ${ARTIFACT_NAME}-${BUILD_VERSION}.tgz | awk '{ print $1 }') + - CHECKSUM_SHA1=$(shasum -a 1 ${ARTIFACT_NAME}-${BUILD_VERSION}.tgz | awk '{ print $1 }') + - CHECKSUM_SHA256=$(shasum -a 256 ${ARTIFACT_NAME}-${BUILD_VERSION}.tgz | awk '{ print $1 }') - | curl \ - -H "Authorization: Bearer ${NPM_TOKEN}" \ + -H "Authorization: Bearer ${ARTIFACTORY_REPOSITORY_TOKEN}" \ + -H "X-Checksum-MD5:${CHECKSUM}" \ + -H "X-Checksum-Sha1:${CHECKSUM_SHA1}" \ + -H "X-Checksum-Sha256:${CHECKSUM_SHA256}" \ -X PUT ${ARTIFACTORY_ARTIFACT_URL} \ -T ${ARTIFACTORY_ARTIFACT_VERSION} artifacts: @@ -186,7 +191,7 @@ deploy_staging: SALT_USERNAME: tcity-data-platform SALT_KWARGS: 'failhard=true' SALT_PILLAR: $CI_PROJECT_DIR/pillar_overrides.yaml - SALT_URL: https://saltmaster-stage.vnerd.com:8000 + SALT_URL: ${SALT_URL_STAGING} slack:staging: stage: notify @@ -217,7 +222,7 @@ deploy_production: SALT_USERNAME: tcity-data-platform SALT_KWARGS: 'failhard=true' SALT_PILLAR: $CI_PROJECT_DIR/pillar_overrides.yaml - SALT_URL: https://saltmaster-production.vnerd.com:8000 + SALT_URL: ${SALT_URL_PROD} slack:production: stage: notify diff --git a/avro/src/main/scala/hydra/avro/registry/SchemaRegistry.scala b/avro/src/main/scala/hydra/avro/registry/SchemaRegistry.scala index cabb40673..788d4f61c 100644 --- a/avro/src/main/scala/hydra/avro/registry/SchemaRegistry.scala +++ b/avro/src/main/scala/hydra/avro/registry/SchemaRegistry.scala @@ -64,17 +64,21 @@ trait SchemaRegistry[F[_]] { * * @param subject - subject name for the schema found in SchemaRegistry including the suffix (-key | -value) * @param schema - avro Schema which is expected to be in Schema Registry + * @param useExponentialBackoffRetryPolicy - flag indicating whether to use exponential backoff retry policy + * (default: false, meaning constant delay retry policy is used) * @return SchemaVersion */ - def getVersion(subject: String, schema: Schema): F[SchemaVersion] + def getVersion(subject: String, schema: Schema, useExponentialBackoffRetryPolicy: Boolean = false): F[SchemaVersion] /** * Retrieves all SchemaVersion(s) for a given subject. * * @param subject - subject name for the schema found in SchemaRegistry including the suffix (-key | -value) + * @param useExponentialBackoffRetryPolicy - flag indicating whether to use exponential backoff retry policy + * (default: false, meaning constant delay retry policy is used) * @return List[SchemaVersion] or List.empty if Subject Not Found */ - def getAllVersions(subject: String): F[List[SchemaVersion]] + def getAllVersions(subject: String, useExponentialBackoffRetryPolicy: Boolean = false): F[List[SchemaVersion]] /** * Retrieves all subjects found in the SchemaRegistry @@ -204,7 +208,10 @@ object SchemaRegistry { schemaRegistryClientRetries: Int, schemaRegistryClientRetriesDelay: FiniteDuration): SchemaRegistry[F] = new SchemaRegistry[F] { - val retryPolicy = limitRetries(schemaRegistryClientRetries) |+| exponentialBackoff[F](schemaRegistryClientRetriesDelay) + lazy val constantDelayRetryPolicy = limitRetries(schemaRegistryClientRetries) |+| constantDelay[F](schemaRegistryClientRetriesDelay) + + lazy val exponentialBackOffRetryPolicy = limitRetries(schemaRegistryClientRetries) |+| exponentialBackoff[F](schemaRegistryClientRetriesDelay) + private implicit class SchemaOps(sch: Schema) { def fields: List[Schema.Field] = fieldsEval("topLevel", box = false).value @@ -292,8 +299,10 @@ object SchemaRegistry { override def getVersion( subject: String, - schema: Schema - ): F[SchemaVersion] = + schema: Schema, + useExponentialBackoffRetryPolicy: Boolean = false + ): F[SchemaVersion] = { + val retryPolicy = if(useExponentialBackoffRetryPolicy) exponentialBackOffRetryPolicy else constantDelayRetryPolicy Sync[F].delay { schemaRegistryClient.getVersion(subject, schema) }.retryingOnSomeErrors( @@ -304,12 +313,15 @@ object SchemaRegistry { policy = retryPolicy, onError = onFailure("getVersion", subject) ) + } - override def getAllVersions(subject: String): F[List[SchemaId]] = + override def getAllVersions(subject: String, useExponentialBackoffRetryPolicy: Boolean = false): F[List[SchemaId]] = { + val retryPolicy = if(useExponentialBackoffRetryPolicy) exponentialBackOffRetryPolicy else constantDelayRetryPolicy Sync[F].fromTry(Try(schemaRegistryClient.getAllVersions(subject))) .map(_.asScala.toList.map(_.toInt)).recover { case r: RestClientException if r.getErrorCode == 40401 => List.empty }.retryingOnAllErrors(retryPolicy, onFailure("getAllVersions", subject)) + } private def onFailure(resourceTried: String, subject: String): (Throwable, RetryDetails) => F[Unit] = (error, retryDetails) => diff --git a/build.sbt b/build.sbt index 44943fa47..507258a97 100644 --- a/build.sbt +++ b/build.sbt @@ -61,6 +61,7 @@ lazy val defaultSettings = Seq( "-Dorg.aspectj.tracing.factory=default", "-J" + jvmMaxMemoryFlag ) + ) lazy val restartSettings = Seq( diff --git a/ingest/src/test/scala/hydra/ingest/http/TopicDeletionEndpointSpec.scala b/ingest/src/test/scala/hydra/ingest/http/TopicDeletionEndpointSpec.scala index b6d29137a..248112e55 100644 --- a/ingest/src/test/scala/hydra/ingest/http/TopicDeletionEndpointSpec.scala +++ b/ingest/src/test/scala/hydra/ingest/http/TopicDeletionEndpointSpec.scala @@ -64,10 +64,10 @@ class TopicDeletionEndpointSpec extends Matchers with AnyWordSpecLike with Scala override def deleteSchemaOfVersion(subject: String,version: SchemaVersion): F[Unit] = underlying.deleteSchemaOfVersion(subject,version) - override def getVersion(subject: String,schema: Schema): F[SchemaVersion] = + override def getVersion(subject: String,schema: Schema, useExponentialBackoffRetryPolicy: Boolean): F[SchemaVersion] = underlying.getVersion(subject,schema) - override def getAllVersions(subject: String): F[List[SchemaId]] = + override def getAllVersions(subject: String, useExponentialBackoffRetryPolicy: Boolean): F[List[SchemaId]] = underlying.getAllVersions(subject) override def getAllSubjects: F[List[String]] = diff --git a/ingest/src/test/scala/hydra/ingest/programs/TopicDeletionProgramSpec.scala b/ingest/src/test/scala/hydra/ingest/programs/TopicDeletionProgramSpec.scala index ac8d4ab09..42ae6fc66 100644 --- a/ingest/src/test/scala/hydra/ingest/programs/TopicDeletionProgramSpec.scala +++ b/ingest/src/test/scala/hydra/ingest/programs/TopicDeletionProgramSpec.scala @@ -60,10 +60,10 @@ class TopicDeletionProgramSpec extends AnyFlatSpec with Matchers { override def deleteSchemaOfVersion(subject: String, version: SchemaVersion): F[Unit] = underlying.deleteSchemaOfVersion(subject, version) - override def getVersion(subject: String, schema: Schema): F[SchemaVersion] = + override def getVersion(subject: String, schema: Schema, useExponentialBackoffRetryPolicy: Boolean): F[SchemaVersion] = underlying.getVersion(subject, schema) - override def getAllVersions(subject: String): F[List[SchemaId]] = + override def getAllVersions(subject: String, useExponentialBackoffRetryPolicy: Boolean): F[List[SchemaId]] = underlying.getAllVersions(subject) override def getAllSubjects: F[List[String]] = diff --git a/ingestors/kafka/src/test/scala/hydra/kafka/endpoints/BootstrapEndpointV2Spec.scala b/ingestors/kafka/src/test/scala/hydra/kafka/endpoints/BootstrapEndpointV2Spec.scala index 7f088ff0c..8ca7f98dd 100644 --- a/ingestors/kafka/src/test/scala/hydra/kafka/endpoints/BootstrapEndpointV2Spec.scala +++ b/ingestors/kafka/src/test/scala/hydra/kafka/endpoints/BootstrapEndpointV2Spec.scala @@ -284,9 +284,10 @@ final class BootstrapEndpointV2Spec ): IO[Unit] = err override def getVersion( subject: String, - schema: Schema + schema: Schema, + useExponentialBackoffRetryPolicy: Boolean ): IO[SchemaVersion] = err - override def getAllVersions(subject: String): IO[List[Int]] = err + override def getAllVersions(subject: String, useExponentialBackoffRetryPolicy: Boolean): IO[List[Int]] = err override def getAllSubjects: IO[List[String]] = err override def getSchemaRegistryClient: IO[SchemaRegistryClient] = err diff --git a/ingestors/kafka/src/test/scala/hydra/kafka/programs/CreateTopicProgramSpec.scala b/ingestors/kafka/src/test/scala/hydra/kafka/programs/CreateTopicProgramSpec.scala index 054d3f200..3d01a57c5 100644 --- a/ingestors/kafka/src/test/scala/hydra/kafka/programs/CreateTopicProgramSpec.scala +++ b/ingestors/kafka/src/test/scala/hydra/kafka/programs/CreateTopicProgramSpec.scala @@ -77,10 +77,10 @@ class CreateTopicProgramSpec extends AsyncFreeSpec with Matchers with IOSuite { override def deleteSchemaOfVersion(subject: String, version: SchemaVersion): IO[Unit] = ref.update(_.copy(deleteSchemaWasCalled = true)) - override def getVersion(subject: String, schema: Schema): IO[SchemaVersion] = + override def getVersion(subject: String, schema: Schema, useExponentialBackoffRetryPolicy: Boolean): IO[SchemaVersion] = ref.get.map(testState => testState.numSchemasRegistered + 1) - override def getAllVersions(subject: String): IO[List[Int]] = IO.pure(List()) + override def getAllVersions(subject: String, useExponentialBackoffRetryPolicy: Boolean): IO[List[Int]] = IO.pure(List()) override def getAllSubjects: IO[List[String]] = IO.pure(List()) override def getSchemaRegistryClient: IO[SchemaRegistryClient] = IO.raiseError(new Exception("Something horrible went wrong!")) @@ -106,8 +106,8 @@ class CreateTopicProgramSpec extends AsyncFreeSpec with Matchers with IOSuite { ref.get.flatMap(n => ref.set(n + 1) *> IO.raiseError(new Exception("Something horrible went wrong!"))) override def deleteSchemaOfVersion(subject: String, version: SchemaVersion): IO[Unit] = IO.unit - override def getVersion(subject: String, schema: Schema): IO[SchemaVersion] = IO.pure(1) - override def getAllVersions(subject: String): IO[List[Int]] = IO.pure(Nil) + override def getVersion(subject: String, schema: Schema, useExponentialBackoffRetryPolicy: Boolean): IO[SchemaVersion] = IO.pure(1) + override def getAllVersions(subject: String, useExponentialBackoffRetryPolicy: Boolean): IO[List[Int]] = IO.pure(Nil) override def getAllSubjects: IO[List[String]] = IO.pure(Nil) override def getSchemaRegistryClient: IO[SchemaRegistryClient] = IO.raiseError(new Exception("Something horrible went wrong!")) override def getLatestSchemaBySubject(subject: String): IO[Option[Schema]] = IO.pure(None) @@ -137,8 +137,8 @@ class CreateTopicProgramSpec extends AsyncFreeSpec with Matchers with IOSuite { override def deleteSchemaOfVersion(subject: String, version: SchemaVersion): IO[Unit] = ref.update(ts => ts.copy(schemas = ts.schemas - subject)) - override def getVersion(subject: String, schema: Schema): IO[SchemaVersion] = ref.get.map(_.schemas(subject)) - override def getAllVersions(subject: String): IO[List[Int]] = IO.pure(Nil) + override def getVersion(subject: String, schema: Schema, useExponentialBackoffRetryPolicy: Boolean): IO[SchemaVersion] = ref.get.map(_.schemas(subject)) + override def getAllVersions(subject: String, useExponentialBackoffRetryPolicy: Boolean): IO[List[Int]] = IO.pure(Nil) override def getAllSubjects: IO[List[String]] = IO.pure(Nil) override def getSchemaRegistryClient: IO[SchemaRegistryClient] = IO.raiseError(new Exception) override def getLatestSchemaBySubject(subject: String): IO[Option[Schema]] = IO.pure(None)