Skip to content

Commit

Permalink
ADAPT-1701: update schema-registry client and kafka version (#13)
Browse files Browse the repository at this point in the history
* update schema-registry client

* fix apache commons lang3 incompatibiity issue

* temp

* fix RedisSchemaRegistryClientSpec test

* update kafka version and stabilize different kafka versions to 2.8.2

* 1. upgrade relevant libraries with changes
2. fix classutils bug

* clean-up

* fix synk vulnerabilities

* undo zookeeper snyk fix

* set avro version to 1.10.0

* clean up and adjust scala-style

* more changes
  • Loading branch information
shreedhar-kc authored Feb 29, 2024
1 parent 60e0436 commit 8d47998
Show file tree
Hide file tree
Showing 31 changed files with 416 additions and 331 deletions.
5 changes: 5 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -70,3 +70,8 @@ build.properties
#ensime http://ensime.github.io/
.ensime
.ensime_cache

.bsp

# ignore temp files and folders
__temp*
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ object ConfluentSchemaRegistry extends LoggingAdapter {

private val cachedClients = CacheBuilder
.newBuilder()
.build(
.build[SchemaRegistryClientInfo, ConfluentSchemaRegistry](
new CacheLoader[SchemaRegistryClientInfo, ConfluentSchemaRegistry] {

def load(info: SchemaRegistryClientInfo): ConfluentSchemaRegistry = {
Expand Down
314 changes: 167 additions & 147 deletions avro/src/main/scala/hydra/avro/registry/RedisSchemaRegistryClient.scala

Large diffs are not rendered by default.

204 changes: 113 additions & 91 deletions avro/src/main/scala/hydra/avro/registry/SchemaRegistry.scala

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package hydra.avro.registry

import com.typesafe.config.ConfigFactory
import hydra.common.config.KafkaConfigUtils.SchemaRegistrySecurityConfig
import io.confluent.kafka.schemaregistry.avro.AvroSchema
import io.confluent.kafka.schemaregistry.client.{CachedSchemaRegistryClient, MockSchemaRegistryClient}
import org.apache.avro.Schema
import org.scalatest.concurrent.ScalaFutures
Expand Down Expand Up @@ -52,7 +53,7 @@ class ConfluentSchemaRegistrySpec

override def beforeAll(): Unit = {
id =
ConfluentSchemaRegistry.mockRegistry.register(schema.getFullName, schema)
ConfluentSchemaRegistry.mockRegistry.register(schema.getFullName, new AvroSchema(schema))
}

describe("When creating a schema registry client") {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,10 @@ import org.apache.avro.{Schema, SchemaBuilder}
import org.scalatest.BeforeAndAfterAll
import com.github.sebruck.EmbeddedRedis
import hydra.avro.registry.RedisSchemaRegistryClient.IntSchemaMetadataMapBinCodec
import io.confluent.kafka.schemaregistry.avro.AvroSchema
import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException
import io.confluent.kafka.schemaregistry.client.{CachedSchemaRegistryClient, SchemaMetadata, SchemaRegistryClient}
import net.manub.embeddedkafka.schemaregistry.{EmbeddedKafka, EmbeddedKafkaConfig}
import io.github.embeddedkafka.schemaregistry.{EmbeddedKafka, EmbeddedKafkaConfig}
import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers.convertToAnyShouldWrapper
import redis.embedded.RedisServer
Expand Down Expand Up @@ -60,15 +61,16 @@ class RedisSchemaRegistryClientSpec extends AnyFlatSpec with EmbeddedRedis with
val schema1: Schema = SchemaBuilder.record("Test1").fields()
.requiredString("testing1").endRecord()

val schema12: Schema = SchemaBuilder.record("Test12").fields()
.requiredString("testing1").endRecord()
val schema12: Schema = SchemaBuilder.record("Test1").fields()
.requiredString("testing1").optionalString("testing22").endRecord()

val schema2: Schema = SchemaBuilder.record("Test2").fields()
.requiredString("testing2").endRecord()

//register two schemas with different clients
val redisClientResult = redisClient.register(topicName1, schema1)
val cachedClientResult = cachedClient.register(topicName2 ,schema2)
val redisClientResult = redisClient.register(topicName1, new AvroSchema(schema1))
val cachedClientResult = cachedClient.register(topicName2 , new AvroSchema(schema2))


//test the getAllSubjectsById method
assert(redisClient.getAllSubjectsById(redisClientResult).contains(topicName1))
Expand All @@ -77,17 +79,17 @@ class RedisSchemaRegistryClientSpec extends AnyFlatSpec with EmbeddedRedis with
assert(cachedClient.getAllSubjectsById(cachedClientResult).contains(topicName2))

//test the getById method
redisClient.getById(redisClientResult) shouldBe schema1
cachedClient.getById(redisClientResult) shouldBe schema1
redisClient.getById(cachedClientResult) shouldBe schema2
cachedClient.getById(cachedClientResult) shouldBe schema2
redisClient.getSchemaById(redisClientResult) shouldBe new AvroSchema(schema1)
cachedClient.getSchemaById(redisClientResult) shouldBe new AvroSchema(schema1)
redisClient.getSchemaById(cachedClientResult) shouldBe new AvroSchema(schema2)
cachedClient.getSchemaById(cachedClientResult) shouldBe new AvroSchema(schema2)

//test the getBySubjectAndId method
Thread.sleep(3000)
assert(redisClient.getBySubjectAndId(topicName1, redisClientResult).equals(schema1))
assert(cachedClient.getBySubjectAndId(topicName1, redisClientResult).equals(schema1))
assert(redisClient.getBySubjectAndId(topicName2, cachedClientResult).equals(schema2))
assert(cachedClient.getBySubjectAndId(topicName2, cachedClientResult).equals(schema2))
assert(redisClient.getSchemaBySubjectAndId(topicName1, redisClientResult).equals(new AvroSchema(schema1)))
assert(cachedClient.getSchemaBySubjectAndId(topicName1, redisClientResult).equals(new AvroSchema(schema1)))
assert(redisClient.getSchemaBySubjectAndId(topicName2, cachedClientResult).equals(new AvroSchema(schema2)))
assert(cachedClient.getSchemaBySubjectAndId(topicName2, cachedClientResult).equals(new AvroSchema(schema2)))


//test the getAllSubjects method
Expand All @@ -109,10 +111,10 @@ class RedisSchemaRegistryClientSpec extends AnyFlatSpec with EmbeddedRedis with

//test the getId method
Thread.sleep(3000)
redisClient.getId(topicName1, schema1) shouldBe redisClientResult
cachedClient.getId(topicName1, schema1) shouldBe redisClientResult
redisClient.getId(topicName2, schema2) shouldBe cachedClientResult
cachedClient.getId(topicName2, schema2) shouldBe cachedClientResult
redisClient.getId(topicName1, new AvroSchema(schema1)) shouldBe redisClientResult
cachedClient.getId(topicName1, new AvroSchema(schema1)) shouldBe redisClientResult
redisClient.getId(topicName2, new AvroSchema(schema2)) shouldBe cachedClientResult
cachedClient.getId(topicName2, new AvroSchema(schema2)) shouldBe cachedClientResult

//test the getLatestSchemaMetadata method
val schemaMetadata1 = new SchemaMetadata(1, 1,schema1.toString)
Expand All @@ -123,13 +125,13 @@ class RedisSchemaRegistryClientSpec extends AnyFlatSpec with EmbeddedRedis with
schemaMetadata1Result.getVersion shouldBe schemaMetadata1.getVersion
schemaMetadata1Result.getSchema shouldBe schemaMetadata1.getSchema

redisClient.register(topicName1, schema12)
redisClient.register(topicName1, new AvroSchema(schema12))
val schemaMetadata2Result = redisClient.getLatestSchemaMetadata(topicName1)
schemaMetadata2Result.getId shouldBe schemaMetadata2.getId
schemaMetadata2Result.getVersion shouldBe schemaMetadata2.getVersion
schemaMetadata2Result.getSchema shouldBe schemaMetadata2.getSchema

//test the getSchemaMetadata method
//test the getSchemaMetadata method
val metadata1 = redisClient.getSchemaMetadata(topicName1, 1)
metadata1.getId shouldBe schemaMetadata1.getId
metadata1.getVersion shouldBe schemaMetadata1.getVersion
Expand All @@ -143,9 +145,9 @@ class RedisSchemaRegistryClientSpec extends AnyFlatSpec with EmbeddedRedis with

//test the getVersion method
Thread.sleep(3000)
redisClient.getVersion(topicName1, schema1) shouldBe 1
redisClient.getVersion(topicName1, schema12) shouldBe 2
redisClient.getVersion(topicName2, schema2) shouldBe 1
redisClient.getVersion(topicName1, new AvroSchema(schema1)) shouldBe 1
redisClient.getVersion(topicName1, new AvroSchema(schema12)) shouldBe 2
redisClient.getVersion(topicName2, new AvroSchema(schema2)) shouldBe 1

//test the deleteSchemaVersion method
//the latest metadata is in this test -> test the getLatestSchemaMetadata method
Expand All @@ -160,7 +162,7 @@ class RedisSchemaRegistryClientSpec extends AnyFlatSpec with EmbeddedRedis with

intercept[RestClientException] {
redisClient.getLatestSchemaMetadata(topicName2)
}.getMessage shouldBe "Subject not found.; error code: 40401"
}.getMessage shouldBe "Subject 'topicB' not found.; error code: 40401"

succeed
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -392,12 +392,12 @@ class SchemaRegistrySpec extends AnyFlatSpecLike with MockFactory with Matchers
it must "do retries for getAllVersions when SchemaRegistry throws error" in {
val expectedVersions = List(1, 2, 3)
val mockSchemaRegistryClient = mock[SchemaRegistryClient]
(mockSchemaRegistryClient.getAllVersions _)
(mockSchemaRegistryClient.getAllVersions(_ : String))
.expects(*)
.throws(new RestClientException("error", 0, 50005))
.repeat(2)

(mockSchemaRegistryClient.getAllVersions _)
(mockSchemaRegistryClient.getAllVersions(_ : String))
.expects(*)
.returns(expectedVersions.map(Integer.valueOf).asJava)

Expand Down Expand Up @@ -426,7 +426,7 @@ class SchemaRegistrySpec extends AnyFlatSpecLike with MockFactory with Matchers

it must "fail if all attempts were used" in {
val mockSchemaRegistryClient = mock[SchemaRegistryClient]
(mockSchemaRegistryClient.getAllVersions _)
(mockSchemaRegistryClient.getAllVersions(_ : String))
.expects(*)
.throws(new RestClientException("error", 0, 50005))
.repeat(3)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package hydra.avro.resource

import hydra.avro.registry.SchemaRegistryException
import io.confluent.kafka.schemaregistry.avro.AvroSchema
import io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient
import org.apache.avro.{Schema, SchemaBuilder}
import org.scalatest.BeforeAndAfterEach
Expand Down Expand Up @@ -208,7 +209,7 @@ class SchemaResourceLoaderSpec
"returns the same underlying key schema instance if the registry metadata hasn't changed"
) {
val client = new MockSchemaRegistryClient
client.register(testKeySchema.getFullName + "-key", testKeySchema)
client.register(testKeySchema.getFullName + "-key", new AvroSchema(testKeySchema))
val loader = new SchemaResourceLoader(
"http://localhost:48223",
client,
Expand All @@ -231,7 +232,7 @@ class SchemaResourceLoaderSpec
) {
val nschema = newValueSchema(testValueSchema.getNamespace, "ntest")
val client = new MockSchemaRegistryClient
client.register(nschema.getFullName + "-value", nschema)
client.register(nschema.getFullName + "-value", new AvroSchema(nschema))
val loader = new SchemaResourceLoader(
"http://localhost:48223",
client,
Expand All @@ -258,7 +259,7 @@ class SchemaResourceLoaderSpec
false,
fields.asJava
)
client.register(nschema.getFullName + "-value", evolvedSchema)
client.register(nschema.getFullName + "-value", new AvroSchema(evolvedSchema))
eventually {
whenReady(loader.retrieveValueSchema(nschema.getFullName)) {
schemaResource => (schemaResource.schema eq nschema) shouldBe false
Expand All @@ -271,7 +272,7 @@ class SchemaResourceLoaderSpec
) {
val nschema = newKeySchema(testKeySchema.getNamespace, "ntest")
val client = new MockSchemaRegistryClient
client.register(nschema.getFullName + "-value", nschema)
client.register(nschema.getFullName + "-value", new AvroSchema(nschema))
val loader = new SchemaResourceLoader(
"http://localhost:48223",
client,
Expand Down Expand Up @@ -300,7 +301,7 @@ class SchemaResourceLoaderSpec
false,
fields.asJava
)
client.register(nschema.getFullName + "-value", evolvedSchema)
client.register(nschema.getFullName + "-value", new AvroSchema(evolvedSchema))
eventually {
whenReady(loader.retrieveValueSchema(nschema.getFullName)) {
schemaResource => (schemaResource.schema eq nschema) shouldBe false
Expand Down
27 changes: 14 additions & 13 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,15 @@ lazy val defaultSettings = Seq(
excludeDependencies += "org.slf4j" % "slf4j-log4j12",
excludeDependencies += "log4j" % "log4j",
dependencyOverrides ++= Seq(
"org.apache.commons" % "commons-lang3" % "3.12.0",
"org.apache.commons" % "commons-compress" % "1.22",
"org.apache.commons" % "lang3" % "3.1.0",
"io.confluent" % "kafka-schema-registry" % "5.4.2" % "test",
"io.confluent" % "kafka-avro-serializer" % "5.4.2" % "test"
"org.apache.commons" % "commons-compress" % "1.24.0",
"io.netty" % "netty-codec" % "4.1.77.Final",
// "org.apache.zookeeper" % "zookeeper" % "3.7.2", -- snyk vulnerability fix
"org.xerial.snappy" % "snappy-java" % "1.1.10.4",
"org.apache.avro" % "avro" % Dependencies.avroVersion,
"com.fasterxml.jackson.core" % "jackson-databind" % "2.13.3",
"org.apache.kafka" %% "kafka" % "2.8.2" % "test",
"io.confluent" %% "kafka-schema-registry" % "6.2.1" % "test",
"io.confluent" %% "kafka-avro-serializer" % "6.2.1" % "test",
),
addCompilerPlugin(
"org.typelevel" %% "kind-projector" % "0.11.3" cross CrossVersion.full
Expand Down Expand Up @@ -109,10 +113,10 @@ lazy val core = Project(
.settings(
moduleSettings,
name := "hydra-core",
libraryDependencies ++= Dependencies.coreDeps ++ Dependencies.awsAuthDeps,
libraryDependencies ++= Dependencies.coreDeps ++ Dependencies.awsAuthDeps ++ Dependencies.kafkaSchemaRegistryDep,
dependencyOverrides ++= Seq(
"io.confluent" % "kafka-schema-registry" % "5.4.2",
"io.confluent" % "kafka-avro-serializer" % "5.4.2"
"org.apache.kafka" %% "kafka" % "2.8.2",
"org.apache.kafka" % "kafka-clients" % "2.8.2"
)
)

Expand All @@ -126,8 +130,8 @@ lazy val kafka = Project(
name := "hydra-kafka",
libraryDependencies ++= Dependencies.kafkaDeps,
dependencyOverrides ++= Seq(
"io.confluent" % "kafka-schema-registry" % "5.4.2",
"io.confluent" % "kafka-avro-serializer" % "5.4.2"
"org.apache.kafka" %% "kafka" % "2.8.2",
"org.apache.kafka" % "kafka-clients" % "2.8.2"
)
)

Expand All @@ -141,9 +145,6 @@ lazy val avro = Project(
libraryDependencies ++= Dependencies.avroDeps
)

val sbSettings =
defaultSettings ++ Test.testSettings ++ noPublishSettings ++ restartSettings

lazy val ingest = Project(
id = "ingest",
base = file("ingest")
Expand Down
9 changes: 9 additions & 0 deletions common/src/main/scala/hydra/common/util/ClassUtils.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package hydra.common.util

object ClassUtils {

// Null safe access to class.getSimpleName
def getSimpleName(cls: Class[_]): String = {
if (cls == null) "" else cls.getSimpleName
}
}
6 changes: 3 additions & 3 deletions core/src/main/scala/hydra/core/akka/SchemaRegistryActor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientExcept
import org.apache.avro.{Schema, SchemaParseException}
import org.apache.kafka.common.PartitionInfo
import scalacache.cachingF

import io.confluent.kafka.schemaregistry.avro.AvroSchema
import scala.collection.JavaConverters._
import scala.collection.immutable.Map
import scala.concurrent.Future
Expand Down Expand Up @@ -173,8 +173,8 @@ class SchemaRegistryActor(

val subject = getSubject(schema)
log.debug(s"Registering schema ${schema.getFullName}: $json")
val schemaId = registry.registryClient.register(subject, schema)
val version = registry.registryClient.getVersion(subject, schema)
val schemaId = registry.registryClient.register(subject, new AvroSchema(schema))
val version = registry.registryClient.getVersion(subject, new AvroSchema(schema))
RegisterSchemaResponse(SchemaResource(schemaId, version, schema))
}
}
Expand Down
2 changes: 1 addition & 1 deletion core/src/main/scala/hydra/core/ingest/Ingestor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import hydra.core.akka.InitializingActor
import hydra.core.monitor.HydraMetrics
import hydra.core.protocol._
import hydra.core.transport.{AckStrategy, HydraRecord, RecordFactory}
import org.apache.commons.lang3.ClassUtils
import hydra.common.util.ClassUtils

import scala.concurrent.Future
import scala.util.{Success, Try}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import hydra.avro.resource.SchemaResource
import hydra.common.config.KafkaConfigUtils.SchemaRegistrySecurityConfig
import hydra.core.akka.SchemaRegistryActor._
import hydra.core.protocol.HydraApplicationError
import io.confluent.kafka.schemaregistry.avro.AvroSchema
import org.apache.avro.Schema.Parser
import org.apache.avro.SchemaBuilder
import org.scalatest.BeforeAndAfterAll
Expand Down Expand Up @@ -58,9 +59,9 @@ class SchemaRegistryActorSpec
.endRecord

override def beforeAll() = {
client.register("hydra.test.Tester-value", testSchema)
client.register("hydra.test.TesterWithKey-key", testKeySchema)
client.register("hydra.test.TesterWithKey-value", testSchema)
client.register("hydra.test.Tester-value", new AvroSchema(testSchema))
client.register("hydra.test.TesterWithKey-key", new AvroSchema(testKeySchema))
client.register("hydra.test.TesterWithKey-value", new AvroSchema(testSchema))
}

val listener = TestProbe()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import org.apache.kafka.common.{Node, PartitionInfo}
import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AnyWordSpecLike
import spray.json.{JsArray, JsObject, JsValue, RootJsonFormat}
import net.manub.embeddedkafka.{EmbeddedKafka, EmbeddedKafkaConfig}
import io.github.embeddedkafka.{EmbeddedKafka, EmbeddedKafkaConfig}

import scala.collection.immutable.Map
import scala.concurrent.duration._
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import org.apache.kafka.common.errors.UnknownTopicOrPartitionException
import org.typelevel.log4cats.Logger
import hydra.common.config.KafkaConfigUtils._
import kafka.server.KafkaConfig

/**
* Internal interface to interact with the KafkaAdminClient from FS2 Kafka.
* Provides a live version for production usage and a test version for integration testing.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import com.pluralsight.hydra.avro.JsonConverter
import hydra.core.transport.AckStrategy
import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.apache.commons.lang3.StringUtils


/**
* Created by alexsilva on 10/30/15.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ package hydra.kafka.producer

import com.fasterxml.jackson.databind.{JsonNode, ObjectMapper}
import hydra.core.transport.AckStrategy
import org.apache.commons.lang3.StringUtils

/**
* Created by alexsilva on 11/30/15.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package hydra.kafka.producer

import hydra.core.transport.HydraRecord
import org.apache.commons.lang3.ClassUtils
import hydra.common.util.ClassUtils
import org.apache.kafka.clients.producer.ProducerRecord

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
package hydra.kafka.producer

import hydra.core.transport.AckStrategy
import org.apache.commons.lang3.StringUtils


/**
* Created by alexsilva on 11/30/15.
Expand Down
Loading

0 comments on commit 8d47998

Please sign in to comment.