Skip to content

Commit

Permalink
Merge branch 'fix-schema-registry-upgrade' into 'master'
Browse files Browse the repository at this point in the history
ADAPT-1701: Upgrade schema-registry-client/kafka-client version

See merge request pluralsight/Technology/adapt/data-platform/hydra!2
  • Loading branch information
shreedhar-kc committed Jul 1, 2024
2 parents de33ec0 + e94f3a3 commit 1a2c134
Show file tree
Hide file tree
Showing 32 changed files with 425 additions and 341 deletions.
5 changes: 5 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -70,3 +70,8 @@ build.properties
#ensime http://ensime.github.io/
.ensime
.ensime_cache

.bsp

# ignore temp files and folders
__temp*
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ object ConfluentSchemaRegistry extends LoggingAdapter {

private val cachedClients = CacheBuilder
.newBuilder()
.build(
.build[SchemaRegistryClientInfo, ConfluentSchemaRegistry](
new CacheLoader[SchemaRegistryClientInfo, ConfluentSchemaRegistry] {

def load(info: SchemaRegistryClientInfo): ConfluentSchemaRegistry = {
Expand Down
314 changes: 167 additions & 147 deletions avro/src/main/scala/hydra/avro/registry/RedisSchemaRegistryClient.scala

Large diffs are not rendered by default.

203 changes: 112 additions & 91 deletions avro/src/main/scala/hydra/avro/registry/SchemaRegistry.scala

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package hydra.avro.registry

import com.typesafe.config.ConfigFactory
import hydra.common.config.KafkaConfigUtils.SchemaRegistrySecurityConfig
import io.confluent.kafka.schemaregistry.avro.AvroSchema
import io.confluent.kafka.schemaregistry.client.{CachedSchemaRegistryClient, MockSchemaRegistryClient}
import org.apache.avro.Schema
import org.scalatest.concurrent.ScalaFutures
Expand Down Expand Up @@ -52,7 +53,7 @@ class ConfluentSchemaRegistrySpec

override def beforeAll(): Unit = {
id =
ConfluentSchemaRegistry.mockRegistry.register(schema.getFullName, schema)
ConfluentSchemaRegistry.mockRegistry.register(schema.getFullName, new AvroSchema(schema))
}

describe("When creating a schema registry client") {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,10 @@ import org.apache.avro.{Schema, SchemaBuilder}
import org.scalatest.BeforeAndAfterAll
import com.github.sebruck.EmbeddedRedis
import hydra.avro.registry.RedisSchemaRegistryClient.IntSchemaMetadataMapBinCodec
import io.confluent.kafka.schemaregistry.avro.AvroSchema
import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException
import io.confluent.kafka.schemaregistry.client.{CachedSchemaRegistryClient, SchemaMetadata, SchemaRegistryClient}
import net.manub.embeddedkafka.schemaregistry.{EmbeddedKafka, EmbeddedKafkaConfig}
import io.github.embeddedkafka.schemaregistry.{EmbeddedKafka, EmbeddedKafkaConfig}
import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers.convertToAnyShouldWrapper
import redis.embedded.RedisServer
Expand Down Expand Up @@ -60,15 +61,16 @@ class RedisSchemaRegistryClientSpec extends AnyFlatSpec with EmbeddedRedis with
val schema1: Schema = SchemaBuilder.record("Test1").fields()
.requiredString("testing1").endRecord()

val schema12: Schema = SchemaBuilder.record("Test12").fields()
.requiredString("testing1").endRecord()
val schema12: Schema = SchemaBuilder.record("Test1").fields()
.requiredString("testing1").optionalString("testing22").endRecord()

val schema2: Schema = SchemaBuilder.record("Test2").fields()
.requiredString("testing2").endRecord()

//register two schemas with different clients
val redisClientResult = redisClient.register(topicName1, schema1)
val cachedClientResult = cachedClient.register(topicName2 ,schema2)
val redisClientResult = redisClient.register(topicName1, new AvroSchema(schema1))
val cachedClientResult = cachedClient.register(topicName2 , new AvroSchema(schema2))


//test the getAllSubjectsById method
assert(redisClient.getAllSubjectsById(redisClientResult).contains(topicName1))
Expand All @@ -77,17 +79,17 @@ class RedisSchemaRegistryClientSpec extends AnyFlatSpec with EmbeddedRedis with
assert(cachedClient.getAllSubjectsById(cachedClientResult).contains(topicName2))

//test the getById method
redisClient.getById(redisClientResult) shouldBe schema1
cachedClient.getById(redisClientResult) shouldBe schema1
redisClient.getById(cachedClientResult) shouldBe schema2
cachedClient.getById(cachedClientResult) shouldBe schema2
redisClient.getSchemaById(redisClientResult) shouldBe new AvroSchema(schema1)
cachedClient.getSchemaById(redisClientResult) shouldBe new AvroSchema(schema1)
redisClient.getSchemaById(cachedClientResult) shouldBe new AvroSchema(schema2)
cachedClient.getSchemaById(cachedClientResult) shouldBe new AvroSchema(schema2)

//test the getBySubjectAndId method
Thread.sleep(3000)
assert(redisClient.getBySubjectAndId(topicName1, redisClientResult).equals(schema1))
assert(cachedClient.getBySubjectAndId(topicName1, redisClientResult).equals(schema1))
assert(redisClient.getBySubjectAndId(topicName2, cachedClientResult).equals(schema2))
assert(cachedClient.getBySubjectAndId(topicName2, cachedClientResult).equals(schema2))
assert(redisClient.getSchemaBySubjectAndId(topicName1, redisClientResult).equals(new AvroSchema(schema1)))
assert(cachedClient.getSchemaBySubjectAndId(topicName1, redisClientResult).equals(new AvroSchema(schema1)))
assert(redisClient.getSchemaBySubjectAndId(topicName2, cachedClientResult).equals(new AvroSchema(schema2)))
assert(cachedClient.getSchemaBySubjectAndId(topicName2, cachedClientResult).equals(new AvroSchema(schema2)))


//test the getAllSubjects method
Expand All @@ -109,10 +111,10 @@ class RedisSchemaRegistryClientSpec extends AnyFlatSpec with EmbeddedRedis with

//test the getId method
Thread.sleep(3000)
redisClient.getId(topicName1, schema1) shouldBe redisClientResult
cachedClient.getId(topicName1, schema1) shouldBe redisClientResult
redisClient.getId(topicName2, schema2) shouldBe cachedClientResult
cachedClient.getId(topicName2, schema2) shouldBe cachedClientResult
redisClient.getId(topicName1, new AvroSchema(schema1)) shouldBe redisClientResult
cachedClient.getId(topicName1, new AvroSchema(schema1)) shouldBe redisClientResult
redisClient.getId(topicName2, new AvroSchema(schema2)) shouldBe cachedClientResult
cachedClient.getId(topicName2, new AvroSchema(schema2)) shouldBe cachedClientResult

//test the getLatestSchemaMetadata method
val schemaMetadata1 = new SchemaMetadata(1, 1,schema1.toString)
Expand All @@ -123,13 +125,13 @@ class RedisSchemaRegistryClientSpec extends AnyFlatSpec with EmbeddedRedis with
schemaMetadata1Result.getVersion shouldBe schemaMetadata1.getVersion
schemaMetadata1Result.getSchema shouldBe schemaMetadata1.getSchema

redisClient.register(topicName1, schema12)
redisClient.register(topicName1, new AvroSchema(schema12))
val schemaMetadata2Result = redisClient.getLatestSchemaMetadata(topicName1)
schemaMetadata2Result.getId shouldBe schemaMetadata2.getId
schemaMetadata2Result.getVersion shouldBe schemaMetadata2.getVersion
schemaMetadata2Result.getSchema shouldBe schemaMetadata2.getSchema

//test the getSchemaMetadata method
//test the getSchemaMetadata method
val metadata1 = redisClient.getSchemaMetadata(topicName1, 1)
metadata1.getId shouldBe schemaMetadata1.getId
metadata1.getVersion shouldBe schemaMetadata1.getVersion
Expand All @@ -143,9 +145,9 @@ class RedisSchemaRegistryClientSpec extends AnyFlatSpec with EmbeddedRedis with

//test the getVersion method
Thread.sleep(3000)
redisClient.getVersion(topicName1, schema1) shouldBe 1
redisClient.getVersion(topicName1, schema12) shouldBe 2
redisClient.getVersion(topicName2, schema2) shouldBe 1
redisClient.getVersion(topicName1, new AvroSchema(schema1)) shouldBe 1
redisClient.getVersion(topicName1, new AvroSchema(schema12)) shouldBe 2
redisClient.getVersion(topicName2, new AvroSchema(schema2)) shouldBe 1

//test the deleteSchemaVersion method
//the latest metadata is in this test -> test the getLatestSchemaMetadata method
Expand All @@ -160,7 +162,7 @@ class RedisSchemaRegistryClientSpec extends AnyFlatSpec with EmbeddedRedis with

intercept[RestClientException] {
redisClient.getLatestSchemaMetadata(topicName2)
}.getMessage shouldBe "Subject not found.; error code: 40401"
}.getMessage shouldBe "Subject 'topicB' not found.; error code: 40401"

succeed
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -392,12 +392,12 @@ class SchemaRegistrySpec extends AnyFlatSpecLike with MockFactory with Matchers
it must "do retries for getAllVersions when SchemaRegistry throws error" in {
val expectedVersions = List(1, 2, 3)
val mockSchemaRegistryClient = mock[SchemaRegistryClient]
(mockSchemaRegistryClient.getAllVersions _)
(mockSchemaRegistryClient.getAllVersions(_ : String))
.expects(*)
.throws(new RestClientException("error", 0, 50005))
.repeat(2)

(mockSchemaRegistryClient.getAllVersions _)
(mockSchemaRegistryClient.getAllVersions(_ : String))
.expects(*)
.returns(expectedVersions.map(Integer.valueOf).asJava)

Expand Down Expand Up @@ -426,7 +426,7 @@ class SchemaRegistrySpec extends AnyFlatSpecLike with MockFactory with Matchers

it must "fail if all attempts were used" in {
val mockSchemaRegistryClient = mock[SchemaRegistryClient]
(mockSchemaRegistryClient.getAllVersions _)
(mockSchemaRegistryClient.getAllVersions(_ : String))
.expects(*)
.throws(new RestClientException("error", 0, 50005))
.repeat(3)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package hydra.avro.resource

import hydra.avro.registry.SchemaRegistryException
import io.confluent.kafka.schemaregistry.avro.AvroSchema
import io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient
import org.apache.avro.{Schema, SchemaBuilder}
import org.scalatest.BeforeAndAfterEach
Expand Down Expand Up @@ -208,7 +209,7 @@ class SchemaResourceLoaderSpec
"returns the same underlying key schema instance if the registry metadata hasn't changed"
) {
val client = new MockSchemaRegistryClient
client.register(testKeySchema.getFullName + "-key", testKeySchema)
client.register(testKeySchema.getFullName + "-key", new AvroSchema(testKeySchema))
val loader = new SchemaResourceLoader(
"http://localhost:48223",
client,
Expand All @@ -231,7 +232,7 @@ class SchemaResourceLoaderSpec
) {
val nschema = newValueSchema(testValueSchema.getNamespace, "ntest")
val client = new MockSchemaRegistryClient
client.register(nschema.getFullName + "-value", nschema)
client.register(nschema.getFullName + "-value", new AvroSchema(nschema))
val loader = new SchemaResourceLoader(
"http://localhost:48223",
client,
Expand All @@ -258,7 +259,7 @@ class SchemaResourceLoaderSpec
false,
fields.asJava
)
client.register(nschema.getFullName + "-value", evolvedSchema)
client.register(nschema.getFullName + "-value", new AvroSchema(evolvedSchema))
eventually {
whenReady(loader.retrieveValueSchema(nschema.getFullName)) {
schemaResource => (schemaResource.schema eq nschema) shouldBe false
Expand All @@ -271,7 +272,7 @@ class SchemaResourceLoaderSpec
) {
val nschema = newKeySchema(testKeySchema.getNamespace, "ntest")
val client = new MockSchemaRegistryClient
client.register(nschema.getFullName + "-value", nschema)
client.register(nschema.getFullName + "-value", new AvroSchema(nschema))
val loader = new SchemaResourceLoader(
"http://localhost:48223",
client,
Expand Down Expand Up @@ -300,7 +301,7 @@ class SchemaResourceLoaderSpec
false,
fields.asJava
)
client.register(nschema.getFullName + "-value", evolvedSchema)
client.register(nschema.getFullName + "-value", new AvroSchema(evolvedSchema))
eventually {
whenReady(loader.retrieveValueSchema(nschema.getFullName)) {
schemaResource => (schemaResource.schema eq nschema) shouldBe false
Expand Down
27 changes: 14 additions & 13 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,15 @@ lazy val defaultSettings = Seq(
excludeDependencies += "org.slf4j" % "slf4j-log4j12",
excludeDependencies += "log4j" % "log4j",
dependencyOverrides ++= Seq(
"org.apache.commons" % "commons-lang3" % "3.12.0",
"org.apache.commons" % "commons-compress" % "1.22",
"org.apache.commons" % "lang3" % "3.1.0",
"io.confluent" % "kafka-schema-registry" % "5.4.2" % "test",
"io.confluent" % "kafka-avro-serializer" % "5.4.2" % "test"
"org.apache.commons" % "commons-compress" % "1.24.0",
"io.netty" % "netty-codec" % "4.1.77.Final",
// "org.apache.zookeeper" % "zookeeper" % "3.7.2", -- snyk vulnerability fix
"org.xerial.snappy" % "snappy-java" % "1.1.10.4",
"org.apache.avro" % "avro" % Dependencies.avroVersion,
"com.fasterxml.jackson.core" % "jackson-databind" % "2.13.3",
"org.apache.kafka" %% "kafka" % "2.8.2" % "test",
"io.confluent" %% "kafka-schema-registry" % "6.2.1" % "test",
"io.confluent" %% "kafka-avro-serializer" % "6.2.1" % "test",
),
addCompilerPlugin(
"org.typelevel" %% "kind-projector" % "0.11.3" cross CrossVersion.full
Expand Down Expand Up @@ -109,10 +113,10 @@ lazy val core = Project(
.settings(
moduleSettings,
name := "hydra-core",
libraryDependencies ++= Dependencies.coreDeps ++ Dependencies.awsAuthDeps,
libraryDependencies ++= Dependencies.coreDeps ++ Dependencies.awsAuthDeps ++ Dependencies.kafkaSchemaRegistryDep,
dependencyOverrides ++= Seq(
"io.confluent" % "kafka-schema-registry" % "5.4.2",
"io.confluent" % "kafka-avro-serializer" % "5.4.2"
"org.apache.kafka" %% "kafka" % "2.8.2",
"org.apache.kafka" % "kafka-clients" % "2.8.2"
)
)

Expand All @@ -126,8 +130,8 @@ lazy val kafka = Project(
name := "hydra-kafka",
libraryDependencies ++= Dependencies.kafkaDeps,
dependencyOverrides ++= Seq(
"io.confluent" % "kafka-schema-registry" % "5.4.2",
"io.confluent" % "kafka-avro-serializer" % "5.4.2"
"org.apache.kafka" %% "kafka" % "2.8.2",
"org.apache.kafka" % "kafka-clients" % "2.8.2"
)
)

Expand All @@ -141,9 +145,6 @@ lazy val avro = Project(
libraryDependencies ++= Dependencies.avroDeps
)

val sbSettings =
defaultSettings ++ Test.testSettings ++ noPublishSettings ++ restartSettings

lazy val ingest = Project(
id = "ingest",
base = file("ingest")
Expand Down
9 changes: 9 additions & 0 deletions common/src/main/scala/hydra/common/util/ClassUtils.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package hydra.common.util

object ClassUtils {

// Null safe access to class.getSimpleName
def getSimpleName(cls: Class[_]): String = {
if (cls == null) "" else cls.getSimpleName
}
}
6 changes: 3 additions & 3 deletions core/src/main/scala/hydra/core/akka/SchemaRegistryActor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientExcept
import org.apache.avro.{Schema, SchemaParseException}
import org.apache.kafka.common.PartitionInfo
import scalacache.cachingF

import io.confluent.kafka.schemaregistry.avro.AvroSchema
import scala.collection.JavaConverters._
import scala.collection.immutable.Map
import scala.concurrent.Future
Expand Down Expand Up @@ -173,8 +173,8 @@ class SchemaRegistryActor(

val subject = getSubject(schema)
log.debug(s"Registering schema ${schema.getFullName}: $json")
val schemaId = registry.registryClient.register(subject, schema)
val version = registry.registryClient.getVersion(subject, schema)
val schemaId = registry.registryClient.register(subject, new AvroSchema(schema))
val version = registry.registryClient.getVersion(subject, new AvroSchema(schema))
RegisterSchemaResponse(SchemaResource(schemaId, version, schema))
}
}
Expand Down
2 changes: 1 addition & 1 deletion core/src/main/scala/hydra/core/ingest/Ingestor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import hydra.core.akka.InitializingActor
import hydra.core.monitor.HydraMetrics
import hydra.core.protocol._
import hydra.core.transport.{AckStrategy, HydraRecord, RecordFactory}
import org.apache.commons.lang3.ClassUtils
import hydra.common.util.ClassUtils

import scala.concurrent.Future
import scala.util.{Success, Try}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import hydra.avro.resource.SchemaResource
import hydra.common.config.KafkaConfigUtils.SchemaRegistrySecurityConfig
import hydra.core.akka.SchemaRegistryActor._
import hydra.core.protocol.HydraApplicationError
import io.confluent.kafka.schemaregistry.avro.AvroSchema
import org.apache.avro.Schema.Parser
import org.apache.avro.SchemaBuilder
import org.scalatest.BeforeAndAfterAll
Expand Down Expand Up @@ -58,9 +59,9 @@ class SchemaRegistryActorSpec
.endRecord

override def beforeAll() = {
client.register("hydra.test.Tester-value", testSchema)
client.register("hydra.test.TesterWithKey-key", testKeySchema)
client.register("hydra.test.TesterWithKey-value", testSchema)
client.register("hydra.test.Tester-value", new AvroSchema(testSchema))
client.register("hydra.test.TesterWithKey-key", new AvroSchema(testKeySchema))
client.register("hydra.test.TesterWithKey-value", new AvroSchema(testSchema))
}

val listener = TestProbe()
Expand Down
20 changes: 10 additions & 10 deletions helm/eks-dev-values.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ replicas: 2
#velero.io/exclude-from-backup" = true
service:
type: ClusterIP
port: 8080
port: 8088
annotations: {}
labels: {}

Expand All @@ -37,25 +37,25 @@ ingress:

resources:
limits:
memory: 6Gi
memory: 7Gi
requests:
cpu: 250m
memory: 1Gi
memory: 2Gi

tolerations: []

nodeSelector: {}

config:
LOG_DIR: /var/log/hydra/
LOG_LEVEL: INFO
HYDRA_KAFKA_PRODUCER_BOOTSTRAP_SERVERS: b-1.isdvsdevblueuswest2.8xkirx.c14.kafka.us-west-2.amazonaws.com:9098,b-3.isdvsdevblueuswest2.8xkirx.c14.kafka.us-west-2.amazonaws.com:9098,b-2.isdvsdevblueuswest2.8xkirx.c14.kafka.us-west-2.amazonaws.com:9098
HYDRA_KAFKA_PRODUCER_BOOTSTRAP_SERVERS: b-3.isdvsdevblueuswest2.8xkirx.c14.kafka.us-west-2.amazonaws.com:9092,b-2.isdvsdevblueuswest2.8xkirx.c14.kafka.us-west-2.amazonaws.com:9092,b-1.isdvsdevblueuswest2.8xkirx.c14.kafka.us-west-2.amazonaws.com:9092
HYDRA_KAFKA_SECURITY_PROTOCOL: 'PLAINTEXT'
HYDRA_MIN_INSYNC_REPLICAS: 1
HYDRA_REPLICATION_FACTOR: 1
HYDRA_SCHEMA_REGISTRY_URL: https://dvs-dev-schema-registry.eplur-staging.vnerd.com:8081
HYDRA_V2_METADATA_CONTACT: '#dev-data-platform'
HYDRA_REPLICATION_FACTOR: 1
HYDRA_V2_METADATA_CONSUMER_GROUP: 'v2MetadataConsumer'
HYDRA_V2_METADATA_CONSUMER_GROUP: 'dev-cluster-v2MetadataConsumer'
HYDRA_V2_CREATE_TOPICS_ENABLED: 'true'
HYDRA_V2_METADATA_CREATE_ON_STARTUP: 'true'
HYDRA_MIN_INSYNC_REPLICAS: 1

LOG_DIR: /var/log/hydra/
LOG_LEVEL: INFO

Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import org.apache.kafka.common.{Node, PartitionInfo}
import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AnyWordSpecLike
import spray.json.{JsArray, JsObject, JsValue, RootJsonFormat}
import net.manub.embeddedkafka.{EmbeddedKafka, EmbeddedKafkaConfig}
import io.github.embeddedkafka.{EmbeddedKafka, EmbeddedKafkaConfig}

import scala.collection.immutable.Map
import scala.concurrent.duration._
Expand Down
Loading

0 comments on commit 1a2c134

Please sign in to comment.