Skip to content

Commit

Permalink
Merge branch 'master' of gitlab.com:pluralsight/Technology/adapt/data…
Browse files Browse the repository at this point in the history
…-platform/hydra into ADAPT1-1206-Contact-Field-Validation
  • Loading branch information
aadit-chugh committed Aug 2, 2024
2 parents 0ab6def + 8e3ad7d commit 0edd0e7
Show file tree
Hide file tree
Showing 7 changed files with 43 additions and 24 deletions.
17 changes: 11 additions & 6 deletions .gitlab-ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,19 +7,18 @@ include:
ref: main
file: 'snyk-check.yml'

image: openjdk:8
image: harbor.vnerd.com/library/hydra-build-tools:latest

variables:
ARTIFACT_NAME: hydra-publish
IMAGE_NAME: hydra
DOCKER_REGISTRY_URL: harbor.vnerd.com/library
DOCKER_REGISTRY_IMAGE: ${DOCKER_REGISTRY_URL}/${IMAGE_NAME}
ARTIFACTORY_REPOSITORY_URL: https://repository.vnerd.com/artifactory
ARTIFACTORY_REPOSITORY: bounded-context
ARTIFACTORY_ID: hydra/publish
ARTIFACTORY_ARTIFACT_VERSION: ${ARTIFACT_NAME}-${BUILD_VERSION}.tgz
ARTIFACTORY_PATH: ${ARTIFACTORY_ID}/${ARTIFACTORY_ARTIFACT_VERSION}
ARTIFACTORY_ARTIFACT_URL: ${ARTIFACTORY_REPOSITORY_URL}/${ARTIFACTORY_REPOSITORY}/${ARTIFACTORY_PATH}
ARTIFACTORY_ARTIFACT_URL: ${ARTIFACTORY_SAAS_REPOSITORY_URL}/${ARTIFACTORY_REPOSITORY}/${ARTIFACTORY_PATH}
BUILD_VERSION: 1.0.${CI_PIPELINE_IID}
BOUNDED_CONTEXT_DEV: adapt-dvs-dev
BOUNDED_CONTEXT_STAGING: adapt-dvs
Expand Down Expand Up @@ -84,9 +83,15 @@ package:
- echo "##teamcity[publishArtifacts '${ARTIFACT_NAME}-${BUILD_VERSION}.tgz']"
- md5sum ${ARTIFACT_NAME}-${BUILD_VERSION}.tgz > ${ARTIFACT_NAME}-${BUILD_VERSION}.tgz.md5
- echo "${ARTIFACTORY_ARTIFACT_URL}"
- CHECKSUM=$(md5sum ${ARTIFACT_NAME}-${BUILD_VERSION}.tgz | awk '{ print $1 }')
- CHECKSUM_SHA1=$(shasum -a 1 ${ARTIFACT_NAME}-${BUILD_VERSION}.tgz | awk '{ print $1 }')
- CHECKSUM_SHA256=$(shasum -a 256 ${ARTIFACT_NAME}-${BUILD_VERSION}.tgz | awk '{ print $1 }')
- |
curl \
-H "Authorization: Bearer ${NPM_TOKEN}" \
-H "Authorization: Bearer ${ARTIFACTORY_REPOSITORY_TOKEN}" \
-H "X-Checksum-MD5:${CHECKSUM}" \
-H "X-Checksum-Sha1:${CHECKSUM_SHA1}" \
-H "X-Checksum-Sha256:${CHECKSUM_SHA256}" \
-X PUT ${ARTIFACTORY_ARTIFACT_URL} \
-T ${ARTIFACTORY_ARTIFACT_VERSION}
artifacts:
Expand Down Expand Up @@ -186,7 +191,7 @@ deploy_staging:
SALT_USERNAME: tcity-data-platform
SALT_KWARGS: 'failhard=true'
SALT_PILLAR: $CI_PROJECT_DIR/pillar_overrides.yaml
SALT_URL: https://saltmaster-stage.vnerd.com:8000
SALT_URL: ${SALT_URL_STAGING}

slack:staging:
stage: notify
Expand Down Expand Up @@ -217,7 +222,7 @@ deploy_production:
SALT_USERNAME: tcity-data-platform
SALT_KWARGS: 'failhard=true'
SALT_PILLAR: $CI_PROJECT_DIR/pillar_overrides.yaml
SALT_URL: https://saltmaster-production.vnerd.com:8000
SALT_URL: ${SALT_URL_PROD}

slack:production:
stage: notify
Expand Down
24 changes: 18 additions & 6 deletions avro/src/main/scala/hydra/avro/registry/SchemaRegistry.scala
Original file line number Diff line number Diff line change
Expand Up @@ -64,17 +64,21 @@ trait SchemaRegistry[F[_]] {
*
* @param subject - subject name for the schema found in SchemaRegistry including the suffix (-key | -value)
* @param schema - avro Schema which is expected to be in Schema Registry
* @param useExponentialBackoffRetryPolicy - flag indicating whether to use exponential backoff retry policy
* (default: false, meaning constant delay retry policy is used)
* @return SchemaVersion
*/
def getVersion(subject: String, schema: Schema): F[SchemaVersion]
def getVersion(subject: String, schema: Schema, useExponentialBackoffRetryPolicy: Boolean = false): F[SchemaVersion]

/**
* Retrieves all SchemaVersion(s) for a given subject.
*
* @param subject - subject name for the schema found in SchemaRegistry including the suffix (-key | -value)
* @param useExponentialBackoffRetryPolicy - flag indicating whether to use exponential backoff retry policy
* (default: false, meaning constant delay retry policy is used)
* @return List[SchemaVersion] or List.empty if Subject Not Found
*/
def getAllVersions(subject: String): F[List[SchemaVersion]]
def getAllVersions(subject: String, useExponentialBackoffRetryPolicy: Boolean = false): F[List[SchemaVersion]]

/**
* Retrieves all subjects found in the SchemaRegistry
Expand Down Expand Up @@ -204,7 +208,10 @@ object SchemaRegistry {
schemaRegistryClientRetries: Int,
schemaRegistryClientRetriesDelay: FiniteDuration): SchemaRegistry[F] =
new SchemaRegistry[F] {
val retryPolicy = limitRetries(schemaRegistryClientRetries) |+| exponentialBackoff[F](schemaRegistryClientRetriesDelay)
lazy val constantDelayRetryPolicy = limitRetries(schemaRegistryClientRetries) |+| constantDelay[F](schemaRegistryClientRetriesDelay)

lazy val exponentialBackOffRetryPolicy = limitRetries(schemaRegistryClientRetries) |+| exponentialBackoff[F](schemaRegistryClientRetriesDelay)


private implicit class SchemaOps(sch: Schema) {
def fields: List[Schema.Field] = fieldsEval("topLevel", box = false).value
Expand Down Expand Up @@ -292,8 +299,10 @@ object SchemaRegistry {

override def getVersion(
subject: String,
schema: Schema
): F[SchemaVersion] =
schema: Schema,
useExponentialBackoffRetryPolicy: Boolean = false
): F[SchemaVersion] = {
val retryPolicy = if(useExponentialBackoffRetryPolicy) exponentialBackOffRetryPolicy else constantDelayRetryPolicy
Sync[F].delay {
schemaRegistryClient.getVersion(subject, schema)
}.retryingOnSomeErrors(
Expand All @@ -304,12 +313,15 @@ object SchemaRegistry {
policy = retryPolicy,
onError = onFailure("getVersion", subject)
)
}

override def getAllVersions(subject: String): F[List[SchemaId]] =
override def getAllVersions(subject: String, useExponentialBackoffRetryPolicy: Boolean = false): F[List[SchemaId]] = {
val retryPolicy = if(useExponentialBackoffRetryPolicy) exponentialBackOffRetryPolicy else constantDelayRetryPolicy
Sync[F].fromTry(Try(schemaRegistryClient.getAllVersions(subject)))
.map(_.asScala.toList.map(_.toInt)).recover {
case r: RestClientException if r.getErrorCode == 40401 => List.empty
}.retryingOnAllErrors(retryPolicy, onFailure("getAllVersions", subject))
}

private def onFailure(resourceTried: String, subject: String): (Throwable, RetryDetails) => F[Unit] =
(error, retryDetails) =>
Expand Down
1 change: 1 addition & 0 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ lazy val defaultSettings = Seq(
"-Dorg.aspectj.tracing.factory=default",
"-J" + jvmMaxMemoryFlag
)

)

lazy val restartSettings = Seq(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,10 +64,10 @@ class TopicDeletionEndpointSpec extends Matchers with AnyWordSpecLike with Scala
override def deleteSchemaOfVersion(subject: String,version: SchemaVersion): F[Unit] =
underlying.deleteSchemaOfVersion(subject,version)

override def getVersion(subject: String,schema: Schema): F[SchemaVersion] =
override def getVersion(subject: String,schema: Schema, useExponentialBackoffRetryPolicy: Boolean): F[SchemaVersion] =
underlying.getVersion(subject,schema)

override def getAllVersions(subject: String): F[List[SchemaId]] =
override def getAllVersions(subject: String, useExponentialBackoffRetryPolicy: Boolean): F[List[SchemaId]] =
underlying.getAllVersions(subject)

override def getAllSubjects: F[List[String]] =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,10 +60,10 @@ class TopicDeletionProgramSpec extends AnyFlatSpec with Matchers {
override def deleteSchemaOfVersion(subject: String, version: SchemaVersion): F[Unit] =
underlying.deleteSchemaOfVersion(subject, version)

override def getVersion(subject: String, schema: Schema): F[SchemaVersion] =
override def getVersion(subject: String, schema: Schema, useExponentialBackoffRetryPolicy: Boolean): F[SchemaVersion] =
underlying.getVersion(subject, schema)

override def getAllVersions(subject: String): F[List[SchemaId]] =
override def getAllVersions(subject: String, useExponentialBackoffRetryPolicy: Boolean): F[List[SchemaId]] =
underlying.getAllVersions(subject)

override def getAllSubjects: F[List[String]] =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -284,9 +284,10 @@ final class BootstrapEndpointV2Spec
): IO[Unit] = err
override def getVersion(
subject: String,
schema: Schema
schema: Schema,
useExponentialBackoffRetryPolicy: Boolean
): IO[SchemaVersion] = err
override def getAllVersions(subject: String): IO[List[Int]] = err
override def getAllVersions(subject: String, useExponentialBackoffRetryPolicy: Boolean): IO[List[Int]] = err
override def getAllSubjects: IO[List[String]] = err

override def getSchemaRegistryClient: IO[SchemaRegistryClient] = err
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,10 +77,10 @@ class CreateTopicProgramSpec extends AsyncFreeSpec with Matchers with IOSuite {
override def deleteSchemaOfVersion(subject: String, version: SchemaVersion): IO[Unit] =
ref.update(_.copy(deleteSchemaWasCalled = true))

override def getVersion(subject: String, schema: Schema): IO[SchemaVersion] =
override def getVersion(subject: String, schema: Schema, useExponentialBackoffRetryPolicy: Boolean): IO[SchemaVersion] =
ref.get.map(testState => testState.numSchemasRegistered + 1)

override def getAllVersions(subject: String): IO[List[Int]] = IO.pure(List())
override def getAllVersions(subject: String, useExponentialBackoffRetryPolicy: Boolean): IO[List[Int]] = IO.pure(List())
override def getAllSubjects: IO[List[String]] = IO.pure(List())
override def getSchemaRegistryClient: IO[SchemaRegistryClient] =
IO.raiseError(new Exception("Something horrible went wrong!"))
Expand All @@ -106,8 +106,8 @@ class CreateTopicProgramSpec extends AsyncFreeSpec with Matchers with IOSuite {
ref.get.flatMap(n => ref.set(n + 1) *> IO.raiseError(new Exception("Something horrible went wrong!")))

override def deleteSchemaOfVersion(subject: String, version: SchemaVersion): IO[Unit] = IO.unit
override def getVersion(subject: String, schema: Schema): IO[SchemaVersion] = IO.pure(1)
override def getAllVersions(subject: String): IO[List[Int]] = IO.pure(Nil)
override def getVersion(subject: String, schema: Schema, useExponentialBackoffRetryPolicy: Boolean): IO[SchemaVersion] = IO.pure(1)
override def getAllVersions(subject: String, useExponentialBackoffRetryPolicy: Boolean): IO[List[Int]] = IO.pure(Nil)
override def getAllSubjects: IO[List[String]] = IO.pure(Nil)
override def getSchemaRegistryClient: IO[SchemaRegistryClient] = IO.raiseError(new Exception("Something horrible went wrong!"))
override def getLatestSchemaBySubject(subject: String): IO[Option[Schema]] = IO.pure(None)
Expand Down Expand Up @@ -137,8 +137,8 @@ class CreateTopicProgramSpec extends AsyncFreeSpec with Matchers with IOSuite {
override def deleteSchemaOfVersion(subject: String, version: SchemaVersion): IO[Unit] =
ref.update(ts => ts.copy(schemas = ts.schemas - subject))

override def getVersion(subject: String, schema: Schema): IO[SchemaVersion] = ref.get.map(_.schemas(subject))
override def getAllVersions(subject: String): IO[List[Int]] = IO.pure(Nil)
override def getVersion(subject: String, schema: Schema, useExponentialBackoffRetryPolicy: Boolean): IO[SchemaVersion] = ref.get.map(_.schemas(subject))
override def getAllVersions(subject: String, useExponentialBackoffRetryPolicy: Boolean): IO[List[Int]] = IO.pure(Nil)
override def getAllSubjects: IO[List[String]] = IO.pure(Nil)
override def getSchemaRegistryClient: IO[SchemaRegistryClient] = IO.raiseError(new Exception)
override def getLatestSchemaBySubject(subject: String): IO[Option[Schema]] = IO.pure(None)
Expand Down

0 comments on commit 0edd0e7

Please sign in to comment.