diff --git a/.gitignore b/.gitignore index 6ab661f5b..592cde8e6 100644 --- a/.gitignore +++ b/.gitignore @@ -70,3 +70,8 @@ build.properties #ensime http://ensime.github.io/ .ensime .ensime_cache + +.bsp + +# ignore temp files and folders +__temp* \ No newline at end of file diff --git a/avro/src/main/scala/hydra/avro/registry/ConfluentSchemaRegistry.scala b/avro/src/main/scala/hydra/avro/registry/ConfluentSchemaRegistry.scala index 56a121570..5ee245ad0 100644 --- a/avro/src/main/scala/hydra/avro/registry/ConfluentSchemaRegistry.scala +++ b/avro/src/main/scala/hydra/avro/registry/ConfluentSchemaRegistry.scala @@ -52,7 +52,7 @@ object ConfluentSchemaRegistry extends LoggingAdapter { private val cachedClients = CacheBuilder .newBuilder() - .build( + .build[SchemaRegistryClientInfo, ConfluentSchemaRegistry]( new CacheLoader[SchemaRegistryClientInfo, ConfluentSchemaRegistry] { def load(info: SchemaRegistryClientInfo): ConfluentSchemaRegistry = { diff --git a/avro/src/main/scala/hydra/avro/registry/RedisSchemaRegistryClient.scala b/avro/src/main/scala/hydra/avro/registry/RedisSchemaRegistryClient.scala index 115db9a4f..f2c2fd26a 100644 --- a/avro/src/main/scala/hydra/avro/registry/RedisSchemaRegistryClient.scala +++ b/avro/src/main/scala/hydra/avro/registry/RedisSchemaRegistryClient.scala @@ -2,7 +2,7 @@ package hydra.avro.registry import com.typesafe.config.{Config, ConfigFactory} import hydra.common.config.ConfigSupport.ConfigImplicits -import hydra.common.config.KafkaConfigUtils.SchemaRegistrySecurityConfig +import hydra.common.config.KafkaConfigUtils.{SchemaRegistrySecurityConfig, schemaRegistrySecurityConfig} import io.confluent.kafka.schemaregistry.client.{SchemaMetadata, SchemaRegistryClient} import io.confluent.kafka.schemaregistry.client.rest.RestService import io.confluent.kafka.schemaregistry.client.security.SslFactory @@ -13,8 +13,14 @@ import scalacache.modes.try_._ import scalacache.serialization.Codec import scalacache.serialization.Codec.DecodingResult import _root_.redis.clients.jedis._ +import io.confluent.kafka.schemaregistry.ParsedSchema +import io.confluent.kafka.schemaregistry.avro.AvroSchema +import io.confluent.kafka.schemaregistry.client.rest.entities.SchemaReference +import io.confluent.kafka.schemaregistry.avro.AvroSchemaProvider import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream} +import java.util +import java.util.Optional import java.util.concurrent.TimeUnit import scala.collection.JavaConverters._ import scala.concurrent.duration.Duration @@ -71,6 +77,7 @@ object RedisSchemaRegistryClient { new SerializableSchemaMetadata(sm.getId, sm.getVersion, sm.getSchema) } } + private class SerializableSchemaMetadata(id: Int, version: Int, schema: String) extends Serializable { def getSchemaMetadata: SchemaMetadata = { new SchemaMetadata(id, version, schema) @@ -168,6 +175,7 @@ object RedisSchemaRegistryClient { } } +// scalastyle:off number.of.methods class RedisSchemaRegistryClient(restService: RestService, redisHost: String, redisPort: Int, @@ -181,7 +189,7 @@ class RedisSchemaRegistryClient(restService: RestService, } def this(baseUrl: String, redisHost: String, redisPort: Int, ssl: Boolean) { - this(new RestService(baseUrl), redisHost, redisPort, Map.empty[String, String], Map.empty[String, Any], CacheConfigs(1, 1, 1), ssl) + this(new RestService(baseUrl), redisHost, redisPort, Map.empty[String, String], Map.empty[String, Any], CacheConfigs(1, 1, 1), ssl) } def this(baseUrl: String, redisHost: String, redisPort: Int, 
cacheConfigs: CacheConfigs, ssl: Boolean) { @@ -208,7 +216,8 @@ class RedisSchemaRegistryClient(restService: RestService, this(new RestService(baseUrl), redisHost, redisPort, httpHeaders, config, CacheConfigs(1, 1, 1), ssl) } - def this(baseUrl: String, redisHost: String, redisPort: Int, httpHeaders: Map[String, String], config: Map[String, Any], cacheConfigs: CacheConfigs, ssl: Boolean) { + def this(baseUrl: String, redisHost: String, redisPort: Int, httpHeaders: Map[String, String], config: Map[String, Any], + cacheConfigs: CacheConfigs, ssl: Boolean) { this(new RestService(baseUrl), redisHost, redisPort, httpHeaders, config, cacheConfigs, ssl) } @@ -240,7 +249,8 @@ class RedisSchemaRegistryClient(restService: RestService, this(new RestService(baseUrls.asJava), redisHost, redisPort, httpHeaders, config, CacheConfigs(1, 1, 1), ssl) } - def this(baseUrls: List[String], redisHost: String, redisPort: Int, httpHeaders: Map[String, String], config: Map[String, Any], cacheConfigs: CacheConfigs, ssl: Boolean) { + def this(baseUrls: List[String], redisHost: String, redisPort: Int, httpHeaders: Map[String, String], + config: Map[String, Any], cacheConfigs: CacheConfigs, ssl: Boolean) { this(new RestService(baseUrls.asJava), redisHost, redisPort, httpHeaders, config, cacheConfigs, ssl) } @@ -293,7 +303,7 @@ class RedisSchemaRegistryClient(restService: RestService, restService.setHttpHeaders(httpHeaders.asJava) } - if (configs!= null && configs.nonEmpty) { + if (configs != null && configs.nonEmpty) { restService.configure(configs.asJava) val srPrefix = "schema.registry." @@ -312,34 +322,6 @@ class RedisSchemaRegistryClient(restService: RestService, } } - override def register(s: String, schema: Schema): Int = { - register(s, schema, 0, -1) - } - - override def getById(i: Int): Schema = synchronized { - val restSchema = restService.getId(i) - val parser = new Schema.Parser() - parser.setValidateDefaults(false) - parser.parse(restSchema.getSchemaString) - } - - override def getBySubjectAndId(s: String, i: Int): Schema = { - if (DO_NOT_CACHE_LIST.exists(s.startsWith)) { - restGetSchemaById(s, i) - } else { - val idKey = buildIdKey(s) - - idCache.get(idKey) match { - case Failure(_) => restGetId(s, i)(i) - case Success(map) => - map match { - case Some(m) if m.keys.exists(_ == i) => m(i) - case _ => getBySubjectAndIdSynchronized(s, i) - } - } - } - } - private def restGetSchemaById(s: String, i: Int): Schema = { val restSchema = restService.getId(i) val parser = new Schema.Parser() @@ -379,22 +361,15 @@ class RedisSchemaRegistryClient(restService: RestService, restService.getAllSubjectsById(i) } - def getLatestSchemaMetadata(s: String): SchemaMetadata = synchronized { + def getLatestSchemaMetadata(s: String): SchemaMetadata = synchronized { val response = restService.getLatestVersion(s) val id = response.getId - val schema: String = response.getSchema + val schema: String = response.getSchema val version = response.getVersion new SchemaMetadata(id, version, schema) } -/* override def getSchemaMetadata(s: String, i: Int): SchemaMetadata = { - val response = restService.getVersion(s, i) - val id = response.getId - val schema: String = response.getSchema - new SchemaMetadata(id, i, schema) - }*/ - - override def getSchemaMetadata(s: String, i: Int): SchemaMetadata = { + override def getSchemaMetadata(subject: String, id: Int): SchemaMetadata = { def get(s: String, i: Int): SchemaMetadata = { val response = restService.getVersion(s, i) val id = response.getId @@ -402,63 +377,33 @@ class 
RedisSchemaRegistryClient(restService: RestService, new SchemaMetadata(id, i, schema) } - if(DO_NOT_CACHE_LIST.exists(s.startsWith)) { - get(s, i) + if (DO_NOT_CACHE_LIST.exists(subject.startsWith)) { + get(subject, id) } else { - val metadataKey = buildMetadataKey(s) + val metadataKey = buildMetadataKey(subject) metadataCache.get(metadataKey) match { case Failure(_) => - val sm = get(s, i) - metadataCache.put(metadataKey)(Map(i -> sm), metadataCacheDurationTtl) + val sm = get(subject, id) + metadataCache.put(metadataKey)(Map(id -> sm), metadataCacheDurationTtl) sm case Success(map) => map match { - case Some(m) if m.keySet.contains(i) => - m(i) + case Some(m) if m.keySet.contains(id) => + m(id) case Some(m) => - val sm = get(s, i) - val concatMap: Map[Int, SchemaMetadata] = m ++ Map(i -> sm) + val sm = get(subject, id) + val concatMap: Map[Int, SchemaMetadata] = m ++ Map(id -> sm) metadataCache.put(metadataKey)(concatMap, metadataCacheDurationTtl) sm case None => - val sm = get(s, i) - metadataCache.put(metadataKey)(Map(i -> sm), metadataCacheDurationTtl) + val sm = get(subject, id) + metadataCache.put(metadataKey)(Map(id -> sm), metadataCacheDurationTtl) sm } } } } - override def getVersion(s: String, schema: Schema): Int = synchronized { - if(DO_NOT_CACHE_LIST.exists(s.startsWith)) { - val response: io.confluent.kafka.schemaregistry.client.rest.entities.Schema = restService.lookUpSubjectVersion(schema.toString(), s, true) - response.getVersion.toInt - } else { - val versionKey = buildVersionKey(s) - - versionCache.get(versionKey) match { - case Failure(_) => - val response: io.confluent.kafka.schemaregistry.client.rest.entities.Schema = restService.lookUpSubjectVersion(schema.toString(), s, true) - versionCache.put(versionKey)(Map(schema -> response.getVersion.toInt), versionCacheDurationTtl) - response.getVersion.toInt - case Success(map) => - map match { - case Some(m) if m.keySet.contains(schema) => - m(schema) - case Some(m) => - val response: io.confluent.kafka.schemaregistry.client.rest.entities.Schema = restService.lookUpSubjectVersion(schema.toString(), s, true) - val concatMap: Map[Schema, Int] = m ++ Map(schema -> response.getVersion.toInt) - versionCache.put(versionKey)(concatMap, versionCacheDurationTtl) - response.getVersion.toInt - case None => - val response: io.confluent.kafka.schemaregistry.client.rest.entities.Schema = restService.lookUpSubjectVersion(schema.toString(), s, true) - versionCache.put(versionKey)(Map(schema -> response.getVersion.toInt), versionCacheDurationTtl) - response.getVersion.toInt - } - } - } - } - override def getAllVersions(s: String): java.util.List[Integer] = { restService.getAllVersions(s) } @@ -491,52 +436,6 @@ class RedisSchemaRegistryClient(restService: RestService, restService.getAllSubjects() } - override def getId(s: String, schema: Schema): Int = synchronized { - if (DO_NOT_CACHE_LIST.exists(s.startsWith)) { - restService.lookUpSubjectVersion(schema.toString, s, false).getId.toInt - } else { - def call(): Map[Schema, Int] = { - val response = restService.lookUpSubjectVersion(schema.toString, s, false) - Map(schema -> response.getId.toInt) - } - - val idKey = buildIdKey(s) - - def populateVersionCache(m: Map[Schema, Int]): Try[Any] = { - val idM: Map[Int, Schema] = m.map(kv => kv._2 -> kv._1) - - idCache.get(idKey) match { - case Failure(_) => - idCache.put(idKey)(idM, idCacheDurationTtl) - case Success(im) => - val concatMap: Map[Int, Schema] = idM ++ im.getOrElse(Map.empty[Int, Schema]) - idCache.put(idKey)(concatMap, 
idCacheDurationTtl) - } - } - - val schemaKey = buildSchemaKey(s) - - schemaCache.get(schemaKey) match { - case Failure(_) => - schemaCache.put(schemaKey)(call(), schemaCacheDurationTtl) - call()(schema) - case Success(map) => map match { - case Some(m) if m.keys.exists(_ == schema) => - populateVersionCache(m) - m(schema) - case Some(m) => - val concatMaps: Map[Schema, Int] = m ++ call() - populateVersionCache(concatMaps) - schemaCache.put(schemaKey)(concatMaps, schemaCacheDurationTtl) - call()(schema) - case None => - schemaCache.put(schemaKey)(call(), schemaCacheDurationTtl) - call()(schema) - } - } - } - } - override def deleteSubject(s: String): java.util.List[Integer] = synchronized { deleteSubject(DEFAULT_REQUEST_PROPERTIES.asJava, s) } @@ -569,18 +468,34 @@ class RedisSchemaRegistryClient(restService: RestService, } } - override def register(s: String, schema: Schema, i: Int, i1: Int): Int = synchronized { + override def reset(): Unit = + throw new UnsupportedOperationException("The reset operation is unsupported for a distributed cache.") + + override def parseSchema(schemaType: String, schemaString: String, + references: util.List[SchemaReference]): Optional[ParsedSchema] = { + val parsedSchema = new AvroSchemaProvider().parseSchema(schemaString, references, false) + parsedSchema + } + + override def register(subject: String, schema: ParsedSchema): Int = { + register(subject, schema, 0, -1) + } + + // scalastyle:off method.length + override def register(subject: String, parsedSchema: ParsedSchema, version: Int, id: Int): Int = synchronized { + val schema = toAvroSchema(parsedSchema) + def register(): Int = - if (i >= 0) { - restService.registerSchema(schema.toString(), s, i, i1) + if (version >= 0) { + restService.registerSchema(schema.toString(), subject, version, id) } else { - restService.registerSchema(schema.toString(), s) + restService.registerSchema(schema.toString(), subject) } - if(DO_NOT_CACHE_LIST.exists(s.startsWith)) { + if (DO_NOT_CACHE_LIST.exists(subject.startsWith)) { register() } else { - val idKey = buildIdKey(s) + val idKey = buildIdKey(subject) def populateIdCache(sc: Schema, id: Int): Unit = { idCache.caching(idKey)(idCacheDurationTtl) { @@ -594,7 +509,7 @@ class RedisSchemaRegistryClient(restService: RestService, } } - val schemaKey = buildSchemaKey(s) + val schemaKey = buildSchemaKey(subject) schemaCache.get(schemaKey) match { case Failure(_) => @@ -605,8 +520,8 @@ class RedisSchemaRegistryClient(restService: RestService, case Some(m) if m.exists(_._1 == schema) => val cachedId = m(schema) - if (i1 >= 0 && i1 != cachedId) { - throw new IllegalStateException("Schema already registered with id " + cachedId + " instead of input id " + i1) + if (id >= 0 && id != cachedId) { + throw new IllegalStateException("Schema already registered with id " + cachedId + " instead of input id " + id) } else { cachedId } @@ -625,18 +540,123 @@ class RedisSchemaRegistryClient(restService: RestService, } } - override def reset(): Unit = - println("The reset operation unsupported for a distributed cache.") + override def getSchemaById(id: Int): ParsedSchema = synchronized { + val restSchema = restService.getId(id) + val parser = new Schema.Parser() + parser.setValidateDefaults(false) + toParsedSchema(parser.parse(restSchema.getSchemaString)) + } + + override def getSchemaBySubjectAndId(subject: String, id: Int): ParsedSchema = { + val result = + if (DO_NOT_CACHE_LIST.exists(subject.startsWith)) { + restGetSchemaById(subject, id) + } else { + val idKey = buildIdKey(subject) + + 
idCache.get(idKey) match { + case Failure(_) => restGetId(subject, id)(id) + case Success(map) => + map match { + case Some(m) if m.keys.exists(_ == id) => m(id) + case _ => getBySubjectAndIdSynchronized(subject, id) + } + } + } + toParsedSchema(result) + } + + override def getVersion(subject: String, parsedSchema: ParsedSchema): Int = synchronized { + val schema = toAvroSchema(parsedSchema) - override def getByID(i: Int): Schema = { - getById(i) + if (DO_NOT_CACHE_LIST.exists(subject.startsWith)) { + val response: io.confluent.kafka.schemaregistry.client.rest.entities.Schema = restService.lookUpSubjectVersion(schema.toString(), subject, true) + response.getVersion.toInt + } else { + val versionKey = buildVersionKey(subject) + + versionCache.get(versionKey) match { + case Failure(_) => + val response: io.confluent.kafka.schemaregistry.client.rest.entities.Schema = restService.lookUpSubjectVersion(schema.toString(), subject, true) + versionCache.put(versionKey)(Map(schema -> response.getVersion.toInt), versionCacheDurationTtl) + response.getVersion.toInt + case Success(map) => + map match { + case Some(m) if m.keySet.contains(schema) => + m(schema) + case Some(m) => + val response: io.confluent.kafka.schemaregistry.client.rest.entities.Schema = restService.lookUpSubjectVersion(schema.toString(), subject, true) + val concatMap: Map[Schema, Int] = m ++ Map(schema -> response.getVersion.toInt) + versionCache.put(versionKey)(concatMap, versionCacheDurationTtl) + response.getVersion.toInt + case None => + val response: io.confluent.kafka.schemaregistry.client.rest.entities.Schema = restService.lookUpSubjectVersion(schema.toString(), subject, true) + versionCache.put(versionKey)(Map(schema -> response.getVersion.toInt), versionCacheDurationTtl) + response.getVersion.toInt + } + } + } } - override def getBySubjectAndID(s: String, i: Int): Schema = { - getBySubjectAndId(s, i) + override def testCompatibility(subject: String, schema: ParsedSchema): Boolean = { + val compatibilityErrors = restService.testCompatibility(toAvroSchema(schema).toString(), subject, "latest") + compatibilityErrors.isEmpty } - override def testCompatibility(s: String, schema: Schema): Boolean = { - restService.testCompatibility(schema.toString(), s, "latest") + override def getId(s: String, parsedSchema: ParsedSchema): Int = synchronized { + val schema = toAvroSchema(parsedSchema) + if (DO_NOT_CACHE_LIST.exists(s.startsWith)) { + restService.lookUpSubjectVersion(schema.toString, s, false).getId.toInt + } else { + def call(): Map[Schema, Int] = { + val response = restService.lookUpSubjectVersion(schema.toString, s, false) + Map(schema -> response.getId.toInt) + } + + val idKey = buildIdKey(s) + + def populateVersionCache(m: Map[Schema, Int]): Try[Any] = { + val idM: Map[Int, Schema] = m.map(kv => kv._2 -> kv._1) + + idCache.get(idKey) match { + case Failure(_) => + idCache.put(idKey)(idM, idCacheDurationTtl) + case Success(im) => + val concatMap: Map[Int, Schema] = idM ++ im.getOrElse(Map.empty[Int, Schema]) + idCache.put(idKey)(concatMap, idCacheDurationTtl) + } + } + + val schemaKey = buildSchemaKey(s) + + schemaCache.get(schemaKey) match { + case Failure(_) => + schemaCache.put(schemaKey)(call(), schemaCacheDurationTtl) + call()(schema) + case Success(map) => map match { + case Some(m) if m.keys.exists(_ == schema) => + populateVersionCache(m) + m(schema) + case Some(m) => + val concatMaps: Map[Schema, Int] = m ++ call() + populateVersionCache(concatMaps) + schemaCache.put(schemaKey)(concatMaps, schemaCacheDurationTtl) + 
call()(schema) + case None => + schemaCache.put(schemaKey)(call(), schemaCacheDurationTtl) + call()(schema) + } + } + } + } + + private def toParsedSchema(avroSchema: Schema): ParsedSchema = new AvroSchema(avroSchema) + + private def toAvroSchema(parsedSchema: ParsedSchema): Schema = { + parsedSchema match { + case s: AvroSchema => s.rawSchema + case p: ParsedSchema => throw new RuntimeException(s"Non-avro schema is not allowed: [ $p ]") + case _ => throw new RuntimeException(s"Encountered unknown schema: [ $parsedSchema ]") + } } } diff --git a/avro/src/main/scala/hydra/avro/registry/SchemaRegistry.scala b/avro/src/main/scala/hydra/avro/registry/SchemaRegistry.scala index f2af389d6..13fe96fab 100644 --- a/avro/src/main/scala/hydra/avro/registry/SchemaRegistry.scala +++ b/avro/src/main/scala/hydra/avro/registry/SchemaRegistry.scala @@ -6,11 +6,11 @@ import cats.syntax.all._ import hydra.common.config.KafkaConfigUtils._ import io.confluent.kafka.schemaregistry.avro.AvroCompatibilityChecker import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException -import io.confluent.kafka.schemaregistry.client.{CachedSchemaRegistryClient, MockSchemaRegistryClient, SchemaRegistryClient, SchemaRegistryClientConfig} +import io.confluent.kafka.schemaregistry.client.{CachedSchemaRegistryClient, MockSchemaRegistryClient, SchemaRegistryClient} import org.apache.avro.{LogicalType, LogicalTypes, Schema} import org.typelevel.log4cats.Logger -import retry.syntax.all._ import retry.RetryPolicies._ +import retry.syntax.all._ import retry.{RetryDetails, Sleep} import scala.collection.JavaConverters._ @@ -18,90 +18,100 @@ import scala.concurrent.duration._ import scala.util.Try /** - * Internal interface to interact with the SchemaRegistryClient from Confluent. - * Abstraction allows pure functional interface for working with underlying Java implementation. - * Provides a live version for production usage and a test version for integration testing. - * @tparam F - higher kinded type - polymorphic effect type - */ + * Internal interface to interact with the SchemaRegistryClient from Confluent. + * Abstraction allows pure functional interface for working with underlying Java implementation. + * Provides a live version for production usage and a test version for integration testing. + * + * @tparam F - higher kinded type - polymorphic effect type + */ trait SchemaRegistry[F[_]] { import SchemaRegistry._ /** - * Adds schema to the configured SchemaRegistry. Registration is idempotent. - * Equivalency is determined by taking a hash of the given schema. Any changes to the schema change the hash. - * @param subject - subject name for the schema found in SchemaRegistry including the suffix (-key | -value) - * @param schema - avro Schema which is to be added to the Schema Registry - * @return SchemaId for schema, whether newly created or preexisting - */ + * Adds schema to the configured SchemaRegistry. Registration is idempotent. + * Equivalency is determined by taking a hash of the given schema. Any changes to the schema change the hash. + * + * @param subject - subject name for the schema found in SchemaRegistry including the suffix (-key | -value) + * @param schema - avro Schema which is to be added to the Schema Registry + * @return SchemaId for schema, whether newly created or preexisting + */ def registerSchema(subject: String, schema: Schema): F[SchemaId] /** - * Deletes schema from the configured SchemaRegistry. 
Deletes only the version specified and only one of the - * key /value, whichever was specified in the subject suffix. - * @param subject - subject name for the schema found in SchemaRegistry including the suffix (-key | -value) - * @param version - integer representing the schema version - * @return Unit - */ + * Deletes schema from the configured SchemaRegistry. Deletes only the version specified and only one of the + * key /value, whichever was specified in the subject suffix. + * + * @param subject - subject name for the schema found in SchemaRegistry including the suffix (-key | -value) + * @param version - integer representing the schema version + * @return Unit + */ def deleteSchemaOfVersion(subject: String, version: SchemaVersion): F[Unit] /** - * Deletes the subject from the versionCache, idCache, and schemaCache - * of the CachedSchemaRegistryClient - * @param subject The subject using -key or -value to delete - * @return Unit - */ + * Deletes the subject from the versionCache, idCache, and schemaCache + * of the CachedSchemaRegistryClient + * + * @param subject The subject using -key or -value to delete + * @return Unit + */ def deleteSchemaSubject(subject: String): F[Unit] /** - * Retrieves the SchemaVersion if the given subject and schema match an item in SchemaRegistry. - * The schema hash must exactly match one of the schemas stored in Schema Registry. All fields must be equal. - * If the schema is not found, the error will be reported in the error channel of the higher kinded type (F[_]). - * @param subject - subject name for the schema found in SchemaRegistry including the suffix (-key | -value) - * @param schema - avro Schema which is expected to be in Schema Registry - * @return SchemaVersion - */ + * Retrieves the SchemaVersion if the given subject and schema match an item in SchemaRegistry. + * The schema hash must exactly match one of the schemas stored in Schema Registry. All fields must be equal. + * If the schema is not found, the error will be reported in the error channel of the higher kinded type (F[_]). + * + * @param subject - subject name for the schema found in SchemaRegistry including the suffix (-key | -value) + * @param schema - avro Schema which is expected to be in Schema Registry + * @return SchemaVersion + */ def getVersion(subject: String, schema: Schema): F[SchemaVersion] /** - * Retrieves all SchemaVersion(s) for a given subject. - * @param subject - subject name for the schema found in SchemaRegistry including the suffix (-key | -value) - * @return List[SchemaVersion] or List.empty if Subject Not Found - */ + * Retrieves all SchemaVersion(s) for a given subject. 
+ * + * @param subject - subject name for the schema found in SchemaRegistry including the suffix (-key | -value) + * @return List[SchemaVersion] or List.empty if Subject Not Found + */ def getAllVersions(subject: String): F[List[SchemaVersion]] /** - * Retrieves all subjects found in the SchemaRegistry - * @return List[String] - */ + * Retrieves all subjects found in the SchemaRegistry + * + * @return List[String] + */ def getAllSubjects: F[List[String]] /** - * Retrieves the SchemaRegistryClient from the algebra - * @return SchemaRegistryClient - */ + * Retrieves the SchemaRegistryClient from the algebra + * + * @return SchemaRegistryClient + */ def getSchemaRegistryClient: F[SchemaRegistryClient] /** - * Retrieves the latest schema for the given subject name, if exists - * @param subject - subject name for the schema found in SchemaRegistry including the suffix (-key | -value) - * @return - Optional Schema for the given subject name - */ + * Retrieves the latest schema for the given subject name, if exists + * + * @param subject - subject name for the schema found in SchemaRegistry including the suffix (-key | -value) + * @return - Optional Schema for the given subject name + */ def getLatestSchemaBySubject(subject: String): F[Option[Schema]] /** - * Retrieves schema for the version and subject specified, if exists - * @param subject - subject name for the schema found in SchemaRegistry including the suffix (-key | -value) - * @param schemaVersion - version number for the schema - * @return Optional Schema for the given subject and version combination - */ + * Retrieves schema for the version and subject specified, if exists + * + * @param subject - subject name for the schema found in SchemaRegistry including the suffix (-key | -value) + * @param schemaVersion - version number for the schema + * @return Optional Schema for the given subject and version combination + */ def getSchemaFor(subject: String, schemaVersion: SchemaVersion): F[Option[Schema]] } object SchemaRegistry { - private[registry] implicit class CheckKeySchemaEvolution[F[_]: Sync](schemasF: F[List[Schema]]) { + private[registry] implicit class CheckKeySchemaEvolution[F[_] : Sync](schemasF: F[List[Schema]]) { def checkKeyEvolution(subject: String, newSchema: Schema): F[List[Schema]] = schemasF.flatTap[Unit] { case _ if subject.endsWith("-value") => Sync[F].unit case Nil => Sync[F].unit @@ -118,6 +128,7 @@ object SchemaRegistry { RuntimeException(message) final case class LogicalTypeBaseTypeMismatch(baseType: Schema.Type, logicalType: LogicalType, fieldName: String) + final case class LogicalTypeBaseTypeMismatchErrors(errors: List[LogicalTypeBaseTypeMismatch]) extends RuntimeException( errors.map(e => s"Field named '${e.fieldName}' contains mismatch in " + @@ -125,11 +136,12 @@ object SchemaRegistry { ) final case class IllegalLogicalTypeChange(originalType: LogicalType, proposedType: LogicalType, fieldName: String) + final case class IllegalLogicalTypeChangeErrors(errors: List[IllegalLogicalTypeChange]) extends RuntimeException( errors.map(e => s"Changing logical types is not allowed. 
Field named '${e.fieldName}'s logical type cannot be changed from " + - s"logicalType of '${if(e.originalType != null) e.originalType.getName else null}' to logicalType of '${if(e.proposedType != null) e.proposedType.getName else null}'").mkString("\n") + s"logicalType of '${if (e.originalType != null) e.originalType.getName else null}' to logicalType of '${if (e.proposedType != null) e.proposedType.getName else null}'").mkString("\n") ) type SchemaId = Int @@ -140,28 +152,30 @@ object SchemaRegistry { AvroCompatibilityChecker.FULL_TRANSITIVE_CHECKER.isCompatible(newSchema, oldSchemas.asJava) } - def live[F[_]: Sync: Logger: Sleep]( - schemaRegistryBaseUrl: String, - maxCacheSize: Int, - securityConfig: SchemaRegistrySecurityConfig, - schemaRegistryClientRetries: Int, - schemaRegistryClientRetriesDelay: FiniteDuration - ): F[SchemaRegistry[F]] = Sync[F].delay { - getFromSchemaRegistryClient(new CachedSchemaRegistryClient(schemaRegistryBaseUrl, maxCacheSize, securityConfig.toConfigMap.asJava), schemaRegistryClientRetries, schemaRegistryClientRetriesDelay) + def live[F[_] : Sync : Logger : Sleep]( + schemaRegistryBaseUrl: String, + maxCacheSize: Int, + securityConfig: SchemaRegistrySecurityConfig, + schemaRegistryClientRetries: Int, + schemaRegistryClientRetriesDelay: FiniteDuration + ): F[SchemaRegistry[F]] = Sync[F].delay { + getFromSchemaRegistryClient(new CachedSchemaRegistryClient(schemaRegistryBaseUrl, maxCacheSize, + securityConfig.toConfigMap.asJava), schemaRegistryClientRetries, schemaRegistryClientRetriesDelay) } - def live[F[_] : Sync: Logger: Sleep]( - schemaRegistryBaseUrl: String, - securityConfig: SchemaRegistrySecurityConfig, - redisUrl: String, - redisPort: Int, - ssl: Boolean, - idCacheTtl: Int, - schemaCacheTtl: Int, - versionCacheTtl: Int, - schemaRegistryClientRetries: Int, - schemaRegistryClientRetriesDelay: FiniteDuration - ): F[SchemaRegistry[F]] = Sync[F].delay { + // scalastyle:off parameter.number + def live[F[_] : Sync : Logger : Sleep]( + schemaRegistryBaseUrl: String, + securityConfig: SchemaRegistrySecurityConfig, + redisUrl: String, + redisPort: Int, + ssl: Boolean, + idCacheTtl: Int, + schemaCacheTtl: Int, + versionCacheTtl: Int, + schemaRegistryClientRetries: Int, + schemaRegistryClientRetriesDelay: FiniteDuration + ): F[SchemaRegistry[F]] = Sync[F].delay { getFromSchemaRegistryClient( new RedisSchemaRegistryClient( schemaRegistryBaseUrl, @@ -176,20 +190,25 @@ object SchemaRegistry { ) } - def test[F[_]: Sync: Logger: Sleep]: F[SchemaRegistry[F]] = Sync[F].delay { + def test[F[_] : Sync : Logger : Sleep]: F[SchemaRegistry[F]] = Sync[F].delay { getFromSchemaRegistryClient(new MockSchemaRegistryClient, schemaRegistryClientRetries = 0, schemaRegistryClientRetriesDelay = 1.milliseconds) } - def test[F[_]: Sync: Logger: Sleep](mockedClient: SchemaRegistryClient, schemaRegistryClientRetries: Int = 3, schemaRegistryClientRetriesDelay: FiniteDuration = 500.milliseconds): F[SchemaRegistry[F]] = Sync[F].delay { + def test[F[_] : Sync : Logger : Sleep](mockedClient: SchemaRegistryClient, schemaRegistryClientRetries: Int = 3, + schemaRegistryClientRetriesDelay: FiniteDuration = 500.milliseconds): F[SchemaRegistry[F]] = Sync[F].delay { getFromSchemaRegistryClient(mockedClient, schemaRegistryClientRetries, schemaRegistryClientRetriesDelay) } - private def getFromSchemaRegistryClient[F[_]: Sync: Logger: Sleep](schemaRegistryClient: SchemaRegistryClient, schemaRegistryClientRetries: Int, schemaRegistryClientRetriesDelay: FiniteDuration): SchemaRegistry[F] = + // 
scalastyle:off method.length + private def getFromSchemaRegistryClient[F[_] : Sync : Logger : Sleep](schemaRegistryClient: SchemaRegistryClient, + schemaRegistryClientRetries: Int, + schemaRegistryClientRetriesDelay: FiniteDuration): SchemaRegistry[F] = new SchemaRegistry[F] { val retryPolicy = limitRetries(schemaRegistryClientRetries) |+| constantDelay[F](schemaRegistryClientRetriesDelay) private implicit class SchemaOps(sch: Schema) { def fields: List[Schema.Field] = fieldsEval("topLevel", box = false).value + private[SchemaOps] def fieldsEval(fieldName: String, box: Boolean = false): Eval[List[Schema.Field]] = sch.getType match { case Schema.Type.RECORD => Eval.defer(sch.getFields.asScala.toList.flatTraverse(nf => nf.schema.fieldsEval(nf.name, box = true))) case Schema.Type.UNION => Eval.defer(sch.getTypes.asScala.toList.flatTraverse(_.fieldsEval(fieldName, box = true))) @@ -202,7 +221,7 @@ object SchemaRegistry { private def checkTypesMatch(f: Schema.Field, expected: Schema.Type, logicalType: LogicalType): List[LogicalTypeBaseTypeMismatch] = { if (f.schema.getType == expected) { - List.empty + List.empty } else { List(LogicalTypeBaseTypeMismatch(f.schema.getType, logicalType, f.name)) } @@ -225,9 +244,9 @@ object SchemaRegistry { } override def registerSchema( - subject: String, - schema: Schema - ): F[SchemaId] = { + subject: String, + schema: Schema + ): F[SchemaId] = { for { versions <- getAllVersions(subject) schemas <- versions.traverse(getSchemaFor(subject, _)).map(_.flatten).checkKeyEvolution(subject, schema) @@ -235,25 +254,27 @@ object SchemaRegistry { _ <- checkLogicalTypesCompat(schema) latest <- getLatestSchemaBySubject(subject) schemaVersion <- if (validated) { - if(checkForOnlyDocFieldUpdate(schema, latest)) { + if (checkForOnlyDocFieldUpdate(schema, latest)) { schemaRegistryClient.reset() } Sync[F].delay(schemaRegistryClient.register(subject, schema)) } else { - Sync[F].raiseError[SchemaVersion](IncompatibleSchemaException("Incompatible Schema Evolution. You may add fields with default fields, or remove fields with default fields.")) + Sync[F].raiseError[SchemaVersion]( + IncompatibleSchemaException(s"Incompatible Schema Evolution. You may add fields with default values, or " + + s"remove fields that have default values. Subject: $subject, schema: $schema")) + } } yield schemaVersion } def checkForOnlyDocFieldUpdate(schema: Schema, latest: Option[Schema]): Boolean = { - if(latest.isEmpty) { + if (latest.isEmpty) { false } else { val realLatest = latest.get if (schema != null && schema.equals(realLatest)) { - val fieldDoc = schema.getFields.asScala.toList.map {field => field.doc()} - val latestDoc = realLatest.getFields.asScala.toList.map {field => field.doc()} - if(fieldDoc.diff(latestDoc).nonEmpty) { + val fieldDoc = schema.getFields.asScala.toList.map { field => field.doc() } + val latestDoc = realLatest.getFields.asScala.toList.map { field => field.doc() } + if (fieldDoc.diff(latestDoc).nonEmpty) { return true } } @@ -262,17 +283,17 @@ object SchemaRegistry { override def deleteSchemaOfVersion( - subject: String, - version: SchemaVersion - ): F[Unit] = + subject: String, + version: SchemaVersion + ): F[Unit] = Sync[F].delay( schemaRegistryClient.deleteSchemaVersion(subject, version.toString) ) override def getVersion( - subject: String, - schema: Schema - ): F[SchemaVersion] = + subject: String, + schema: Schema + ): F[SchemaVersion] = Sync[F].delay { schemaRegistryClient.getVersion(subject, schema) }.retryingOnAllErrors(retryPolicy, onFailure("getVersion")) diff --git a/avro/src/test/scala/hydra/avro/registry/ConfluentSchemaRegistrySpec.scala b/avro/src/test/scala/hydra/avro/registry/ConfluentSchemaRegistrySpec.scala index 05b2ba9c7..76475351e 100644 --- a/avro/src/test/scala/hydra/avro/registry/ConfluentSchemaRegistrySpec.scala +++ b/avro/src/test/scala/hydra/avro/registry/ConfluentSchemaRegistrySpec.scala @@ -17,6 +17,7 @@ package hydra.avro.registry import com.typesafe.config.ConfigFactory import hydra.common.config.KafkaConfigUtils.SchemaRegistrySecurityConfig +import io.confluent.kafka.schemaregistry.avro.AvroSchema import io.confluent.kafka.schemaregistry.client.{CachedSchemaRegistryClient, MockSchemaRegistryClient} import org.apache.avro.Schema import org.scalatest.concurrent.ScalaFutures @@ -52,7 +53,7 @@ class ConfluentSchemaRegistrySpec override def beforeAll(): Unit = { id = - ConfluentSchemaRegistry.mockRegistry.register(schema.getFullName, schema) + ConfluentSchemaRegistry.mockRegistry.register(schema.getFullName, new AvroSchema(schema)) } describe("When creating a schema registry client") { diff --git a/avro/src/test/scala/hydra/avro/registry/RedisSchemaRegistryClientSpec.scala b/avro/src/test/scala/hydra/avro/registry/RedisSchemaRegistryClientSpec.scala index 733a952a2..a50bacdf5 100644 --- a/avro/src/test/scala/hydra/avro/registry/RedisSchemaRegistryClientSpec.scala +++ b/avro/src/test/scala/hydra/avro/registry/RedisSchemaRegistryClientSpec.scala @@ -4,9 +4,10 @@ import org.apache.avro.{Schema, SchemaBuilder} import org.scalatest.BeforeAndAfterAll import com.github.sebruck.EmbeddedRedis import hydra.avro.registry.RedisSchemaRegistryClient.IntSchemaMetadataMapBinCodec +import io.confluent.kafka.schemaregistry.avro.AvroSchema import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException import io.confluent.kafka.schemaregistry.client.{CachedSchemaRegistryClient, SchemaMetadata, SchemaRegistryClient} -import net.manub.embeddedkafka.schemaregistry.{EmbeddedKafka, EmbeddedKafkaConfig} +import io.github.embeddedkafka.schemaregistry.{EmbeddedKafka, EmbeddedKafkaConfig} import org.scalatest.flatspec.AnyFlatSpec import org.scalatest.matchers.should.Matchers.convertToAnyShouldWrapper import redis.embedded.RedisServer @@ -60,15 +61,16 @@ class 
RedisSchemaRegistryClientSpec extends AnyFlatSpec with EmbeddedRedis with val schema1: Schema = SchemaBuilder.record("Test1").fields() .requiredString("testing1").endRecord() - val schema12: Schema = SchemaBuilder.record("Test12").fields() - .requiredString("testing1").endRecord() + val schema12: Schema = SchemaBuilder.record("Test1").fields() + .requiredString("testing1").optionalString("testing22").endRecord() val schema2: Schema = SchemaBuilder.record("Test2").fields() .requiredString("testing2").endRecord() //register two schemas with different clients - val redisClientResult = redisClient.register(topicName1, schema1) - val cachedClientResult = cachedClient.register(topicName2 ,schema2) + val redisClientResult = redisClient.register(topicName1, new AvroSchema(schema1)) + val cachedClientResult = cachedClient.register(topicName2 , new AvroSchema(schema2)) + //test the getAllSubjectsById method assert(redisClient.getAllSubjectsById(redisClientResult).contains(topicName1)) @@ -77,17 +79,17 @@ class RedisSchemaRegistryClientSpec extends AnyFlatSpec with EmbeddedRedis with assert(cachedClient.getAllSubjectsById(cachedClientResult).contains(topicName2)) //test the getById method - redisClient.getById(redisClientResult) shouldBe schema1 - cachedClient.getById(redisClientResult) shouldBe schema1 - redisClient.getById(cachedClientResult) shouldBe schema2 - cachedClient.getById(cachedClientResult) shouldBe schema2 + redisClient.getSchemaById(redisClientResult) shouldBe new AvroSchema(schema1) + cachedClient.getSchemaById(redisClientResult) shouldBe new AvroSchema(schema1) + redisClient.getSchemaById(cachedClientResult) shouldBe new AvroSchema(schema2) + cachedClient.getSchemaById(cachedClientResult) shouldBe new AvroSchema(schema2) //test the getBySubjectAndId method Thread.sleep(3000) - assert(redisClient.getBySubjectAndId(topicName1, redisClientResult).equals(schema1)) - assert(cachedClient.getBySubjectAndId(topicName1, redisClientResult).equals(schema1)) - assert(redisClient.getBySubjectAndId(topicName2, cachedClientResult).equals(schema2)) - assert(cachedClient.getBySubjectAndId(topicName2, cachedClientResult).equals(schema2)) + assert(redisClient.getSchemaBySubjectAndId(topicName1, redisClientResult).equals(new AvroSchema(schema1))) + assert(cachedClient.getSchemaBySubjectAndId(topicName1, redisClientResult).equals(new AvroSchema(schema1))) + assert(redisClient.getSchemaBySubjectAndId(topicName2, cachedClientResult).equals(new AvroSchema(schema2))) + assert(cachedClient.getSchemaBySubjectAndId(topicName2, cachedClientResult).equals(new AvroSchema(schema2))) //test the getAllSubjects method @@ -109,10 +111,10 @@ class RedisSchemaRegistryClientSpec extends AnyFlatSpec with EmbeddedRedis with //test the getId method Thread.sleep(3000) - redisClient.getId(topicName1, schema1) shouldBe redisClientResult - cachedClient.getId(topicName1, schema1) shouldBe redisClientResult - redisClient.getId(topicName2, schema2) shouldBe cachedClientResult - cachedClient.getId(topicName2, schema2) shouldBe cachedClientResult + redisClient.getId(topicName1, new AvroSchema(schema1)) shouldBe redisClientResult + cachedClient.getId(topicName1, new AvroSchema(schema1)) shouldBe redisClientResult + redisClient.getId(topicName2, new AvroSchema(schema2)) shouldBe cachedClientResult + cachedClient.getId(topicName2, new AvroSchema(schema2)) shouldBe cachedClientResult //test the getLatestSchemaMetadata method val schemaMetadata1 = new SchemaMetadata(1, 1,schema1.toString) @@ -123,13 +125,13 @@ class 
RedisSchemaRegistryClientSpec extends AnyFlatSpec with EmbeddedRedis with schemaMetadata1Result.getVersion shouldBe schemaMetadata1.getVersion schemaMetadata1Result.getSchema shouldBe schemaMetadata1.getSchema - redisClient.register(topicName1, schema12) + redisClient.register(topicName1, new AvroSchema(schema12)) val schemaMetadata2Result = redisClient.getLatestSchemaMetadata(topicName1) schemaMetadata2Result.getId shouldBe schemaMetadata2.getId schemaMetadata2Result.getVersion shouldBe schemaMetadata2.getVersion schemaMetadata2Result.getSchema shouldBe schemaMetadata2.getSchema - //test the getSchemaMetadata method + //test the getSchemaMetadata method val metadata1 = redisClient.getSchemaMetadata(topicName1, 1) metadata1.getId shouldBe schemaMetadata1.getId metadata1.getVersion shouldBe schemaMetadata1.getVersion @@ -143,9 +145,9 @@ class RedisSchemaRegistryClientSpec extends AnyFlatSpec with EmbeddedRedis with //test the getVersion method Thread.sleep(3000) - redisClient.getVersion(topicName1, schema1) shouldBe 1 - redisClient.getVersion(topicName1, schema12) shouldBe 2 - redisClient.getVersion(topicName2, schema2) shouldBe 1 + redisClient.getVersion(topicName1, new AvroSchema(schema1)) shouldBe 1 + redisClient.getVersion(topicName1, new AvroSchema(schema12)) shouldBe 2 + redisClient.getVersion(topicName2, new AvroSchema(schema2)) shouldBe 1 //test the deleteSchemaVersion method //the latest metadata is in this test -> test the getLatestSchemaMetadata method @@ -160,7 +162,7 @@ class RedisSchemaRegistryClientSpec extends AnyFlatSpec with EmbeddedRedis with intercept[RestClientException] { redisClient.getLatestSchemaMetadata(topicName2) - }.getMessage shouldBe "Subject not found.; error code: 40401" + }.getMessage shouldBe "Subject 'topicB' not found.; error code: 40401" succeed } diff --git a/avro/src/test/scala/hydra/avro/registry/SchemaRegistrySpec.scala b/avro/src/test/scala/hydra/avro/registry/SchemaRegistrySpec.scala index 820574613..009027fa1 100644 --- a/avro/src/test/scala/hydra/avro/registry/SchemaRegistrySpec.scala +++ b/avro/src/test/scala/hydra/avro/registry/SchemaRegistrySpec.scala @@ -392,12 +392,12 @@ class SchemaRegistrySpec extends AnyFlatSpecLike with MockFactory with Matchers it must "do retries for getAllVersions when SchemaRegistry throws error" in { val expectedVersions = List(1, 2, 3) val mockSchemaRegistryClient = mock[SchemaRegistryClient] - (mockSchemaRegistryClient.getAllVersions _) + (mockSchemaRegistryClient.getAllVersions(_ : String)) .expects(*) .throws(new RestClientException("error", 0, 50005)) .repeat(2) - (mockSchemaRegistryClient.getAllVersions _) + (mockSchemaRegistryClient.getAllVersions(_ : String)) .expects(*) .returns(expectedVersions.map(Integer.valueOf).asJava) @@ -426,7 +426,7 @@ class SchemaRegistrySpec extends AnyFlatSpecLike with MockFactory with Matchers it must "fail if all attempts were used" in { val mockSchemaRegistryClient = mock[SchemaRegistryClient] - (mockSchemaRegistryClient.getAllVersions _) + (mockSchemaRegistryClient.getAllVersions(_ : String)) .expects(*) .throws(new RestClientException("error", 0, 50005)) .repeat(3) diff --git a/avro/src/test/scala/hydra/avro/resource/SchemaResourceLoaderSpec.scala b/avro/src/test/scala/hydra/avro/resource/SchemaResourceLoaderSpec.scala index f32404b14..f9ed3dc92 100644 --- a/avro/src/test/scala/hydra/avro/resource/SchemaResourceLoaderSpec.scala +++ b/avro/src/test/scala/hydra/avro/resource/SchemaResourceLoaderSpec.scala @@ -16,6 +16,7 @@ package hydra.avro.resource import 
hydra.avro.registry.SchemaRegistryException +import io.confluent.kafka.schemaregistry.avro.AvroSchema import io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient import org.apache.avro.{Schema, SchemaBuilder} import org.scalatest.BeforeAndAfterEach @@ -208,7 +209,7 @@ class SchemaResourceLoaderSpec "returns the same underlying key schema instance if the registry metadata hasn't changed" ) { val client = new MockSchemaRegistryClient - client.register(testKeySchema.getFullName + "-key", testKeySchema) + client.register(testKeySchema.getFullName + "-key", new AvroSchema(testKeySchema)) val loader = new SchemaResourceLoader( "http://localhost:48223", client, @@ -231,7 +232,7 @@ class SchemaResourceLoaderSpec ) { val nschema = newValueSchema(testValueSchema.getNamespace, "ntest") val client = new MockSchemaRegistryClient - client.register(nschema.getFullName + "-value", nschema) + client.register(nschema.getFullName + "-value", new AvroSchema(nschema)) val loader = new SchemaResourceLoader( "http://localhost:48223", client, @@ -258,7 +259,7 @@ class SchemaResourceLoaderSpec false, fields.asJava ) - client.register(nschema.getFullName + "-value", evolvedSchema) + client.register(nschema.getFullName + "-value", new AvroSchema(evolvedSchema)) eventually { whenReady(loader.retrieveValueSchema(nschema.getFullName)) { schemaResource => (schemaResource.schema eq nschema) shouldBe false @@ -271,7 +272,7 @@ class SchemaResourceLoaderSpec ) { val nschema = newKeySchema(testKeySchema.getNamespace, "ntest") val client = new MockSchemaRegistryClient - client.register(nschema.getFullName + "-value", nschema) + client.register(nschema.getFullName + "-value", new AvroSchema(nschema)) val loader = new SchemaResourceLoader( "http://localhost:48223", client, @@ -300,7 +301,7 @@ class SchemaResourceLoaderSpec false, fields.asJava ) - client.register(nschema.getFullName + "-value", evolvedSchema) + client.register(nschema.getFullName + "-value", new AvroSchema(evolvedSchema)) eventually { whenReady(loader.retrieveValueSchema(nschema.getFullName)) { schemaResource => (schemaResource.schema eq nschema) shouldBe false diff --git a/build.sbt b/build.sbt index 2c52b32b1..44943fa47 100644 --- a/build.sbt +++ b/build.sbt @@ -16,11 +16,15 @@ lazy val defaultSettings = Seq( excludeDependencies += "org.slf4j" % "slf4j-log4j12", excludeDependencies += "log4j" % "log4j", dependencyOverrides ++= Seq( - "org.apache.commons" % "commons-lang3" % "3.12.0", - "org.apache.commons" % "commons-compress" % "1.22", - "org.apache.commons" % "lang3" % "3.1.0", - "io.confluent" % "kafka-schema-registry" % "5.4.2" % "test", - "io.confluent" % "kafka-avro-serializer" % "5.4.2" % "test" + "org.apache.commons" % "commons-compress" % "1.24.0", + "io.netty" % "netty-codec" % "4.1.77.Final", +// "org.apache.zookeeper" % "zookeeper" % "3.7.2", -- snyk vulnerability fix + "org.xerial.snappy" % "snappy-java" % "1.1.10.4", + "org.apache.avro" % "avro" % Dependencies.avroVersion, + "com.fasterxml.jackson.core" % "jackson-databind" % "2.13.3", + "org.apache.kafka" %% "kafka" % "2.8.2" % "test", + "io.confluent" %% "kafka-schema-registry" % "6.2.1" % "test", + "io.confluent" %% "kafka-avro-serializer" % "6.2.1" % "test", ), addCompilerPlugin( "org.typelevel" %% "kind-projector" % "0.11.3" cross CrossVersion.full @@ -109,10 +113,10 @@ lazy val core = Project( .settings( moduleSettings, name := "hydra-core", - libraryDependencies ++= Dependencies.coreDeps ++ Dependencies.awsAuthDeps, + libraryDependencies ++= Dependencies.coreDeps ++ 
Dependencies.awsAuthDeps ++ Dependencies.kafkaSchemaRegistryDep, dependencyOverrides ++= Seq( - "io.confluent" % "kafka-schema-registry" % "5.4.2", - "io.confluent" % "kafka-avro-serializer" % "5.4.2" + "org.apache.kafka" %% "kafka" % "2.8.2", + "org.apache.kafka" % "kafka-clients" % "2.8.2" ) ) @@ -126,8 +130,8 @@ lazy val kafka = Project( name := "hydra-kafka", libraryDependencies ++= Dependencies.kafkaDeps, dependencyOverrides ++= Seq( - "io.confluent" % "kafka-schema-registry" % "5.4.2", - "io.confluent" % "kafka-avro-serializer" % "5.4.2" + "org.apache.kafka" %% "kafka" % "2.8.2", + "org.apache.kafka" % "kafka-clients" % "2.8.2" ) ) @@ -141,9 +145,6 @@ lazy val avro = Project( libraryDependencies ++= Dependencies.avroDeps ) -val sbSettings = - defaultSettings ++ Test.testSettings ++ noPublishSettings ++ restartSettings - lazy val ingest = Project( id = "ingest", base = file("ingest") diff --git a/common/src/main/scala/hydra/common/util/ClassUtils.scala b/common/src/main/scala/hydra/common/util/ClassUtils.scala new file mode 100644 index 000000000..9a5b1197a --- /dev/null +++ b/common/src/main/scala/hydra/common/util/ClassUtils.scala @@ -0,0 +1,9 @@ +package hydra.common.util + +object ClassUtils { + + // Null safe access to class.getSimpleName + def getSimpleName(cls: Class[_]): String = { + if (cls == null) "" else cls.getSimpleName + } +} diff --git a/core/src/main/scala/hydra/core/akka/SchemaRegistryActor.scala b/core/src/main/scala/hydra/core/akka/SchemaRegistryActor.scala index a48acb758..f66b4d06a 100644 --- a/core/src/main/scala/hydra/core/akka/SchemaRegistryActor.scala +++ b/core/src/main/scala/hydra/core/akka/SchemaRegistryActor.scala @@ -16,7 +16,7 @@ import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientExcept import org.apache.avro.{Schema, SchemaParseException} import org.apache.kafka.common.PartitionInfo import scalacache.cachingF - +import io.confluent.kafka.schemaregistry.avro.AvroSchema import scala.collection.JavaConverters._ import scala.collection.immutable.Map import scala.concurrent.Future @@ -173,8 +173,8 @@ class SchemaRegistryActor( val subject = getSubject(schema) log.debug(s"Registering schema ${schema.getFullName}: $json") - val schemaId = registry.registryClient.register(subject, schema) - val version = registry.registryClient.getVersion(subject, schema) + val schemaId = registry.registryClient.register(subject, new AvroSchema(schema)) + val version = registry.registryClient.getVersion(subject, new AvroSchema(schema)) RegisterSchemaResponse(SchemaResource(schemaId, version, schema)) } } diff --git a/core/src/main/scala/hydra/core/ingest/Ingestor.scala b/core/src/main/scala/hydra/core/ingest/Ingestor.scala index aa4f06d8c..84d8586bb 100644 --- a/core/src/main/scala/hydra/core/ingest/Ingestor.scala +++ b/core/src/main/scala/hydra/core/ingest/Ingestor.scala @@ -6,7 +6,7 @@ import hydra.core.akka.InitializingActor import hydra.core.monitor.HydraMetrics import hydra.core.protocol._ import hydra.core.transport.{AckStrategy, HydraRecord, RecordFactory} -import org.apache.commons.lang3.ClassUtils +import hydra.common.util.ClassUtils import scala.concurrent.Future import scala.util.{Success, Try} diff --git a/core/src/test/scala/hydra/core/akka/SchemaRegistryActorSpec.scala b/core/src/test/scala/hydra/core/akka/SchemaRegistryActorSpec.scala index a82ef6a28..afe3286b5 100644 --- a/core/src/test/scala/hydra/core/akka/SchemaRegistryActorSpec.scala +++ b/core/src/test/scala/hydra/core/akka/SchemaRegistryActorSpec.scala @@ -11,6 +11,7 @@ import 
hydra.avro.resource.SchemaResource import hydra.common.config.KafkaConfigUtils.SchemaRegistrySecurityConfig import hydra.core.akka.SchemaRegistryActor._ import hydra.core.protocol.HydraApplicationError +import io.confluent.kafka.schemaregistry.avro.AvroSchema import org.apache.avro.Schema.Parser import org.apache.avro.SchemaBuilder import org.scalatest.BeforeAndAfterAll @@ -58,9 +59,9 @@ class SchemaRegistryActorSpec .endRecord override def beforeAll() = { - client.register("hydra.test.Tester-value", testSchema) - client.register("hydra.test.TesterWithKey-key", testKeySchema) - client.register("hydra.test.TesterWithKey-value", testSchema) + client.register("hydra.test.Tester-value", new AvroSchema(testSchema)) + client.register("hydra.test.TesterWithKey-key", new AvroSchema(testKeySchema)) + client.register("hydra.test.TesterWithKey-value", new AvroSchema(testSchema)) } val listener = TestProbe() diff --git a/helm/eks-dev-values.yml b/helm/eks-dev-values.yml index df0cdb763..1b8771347 100644 --- a/helm/eks-dev-values.yml +++ b/helm/eks-dev-values.yml @@ -16,7 +16,7 @@ replicas: 2 #velero.io/exclude-from-backup" = true service: type: ClusterIP - port: 8080 + port: 8088 annotations: {} labels: {} @@ -37,25 +37,25 @@ ingress: resources: limits: - memory: 6Gi + memory: 7Gi requests: cpu: 250m - memory: 1Gi + memory: 2Gi tolerations: [] nodeSelector: {} config: - LOG_DIR: /var/log/hydra/ - LOG_LEVEL: INFO - HYDRA_KAFKA_PRODUCER_BOOTSTRAP_SERVERS: b-1.isdvsdevblueuswest2.8xkirx.c14.kafka.us-west-2.amazonaws.com:9098,b-3.isdvsdevblueuswest2.8xkirx.c14.kafka.us-west-2.amazonaws.com:9098,b-2.isdvsdevblueuswest2.8xkirx.c14.kafka.us-west-2.amazonaws.com:9098 + HYDRA_KAFKA_PRODUCER_BOOTSTRAP_SERVERS: b-3.isdvsdevblueuswest2.8xkirx.c14.kafka.us-west-2.amazonaws.com:9092,b-2.isdvsdevblueuswest2.8xkirx.c14.kafka.us-west-2.amazonaws.com:9092,b-1.isdvsdevblueuswest2.8xkirx.c14.kafka.us-west-2.amazonaws.com:9092 + HYDRA_KAFKA_SECURITY_PROTOCOL: 'PLAINTEXT' + HYDRA_MIN_INSYNC_REPLICAS: 1 + HYDRA_REPLICATION_FACTOR: 1 HYDRA_SCHEMA_REGISTRY_URL: https://dvs-dev-schema-registry.eplur-staging.vnerd.com:8081 HYDRA_V2_METADATA_CONTACT: '#dev-data-platform' - HYDRA_REPLICATION_FACTOR: 1 - HYDRA_V2_METADATA_CONSUMER_GROUP: 'v2MetadataConsumer' + HYDRA_V2_METADATA_CONSUMER_GROUP: 'dev-cluster-v2MetadataConsumer' HYDRA_V2_CREATE_TOPICS_ENABLED: 'true' HYDRA_V2_METADATA_CREATE_ON_STARTUP: 'true' - HYDRA_MIN_INSYNC_REPLICAS: 1 - + LOG_DIR: /var/log/hydra/ + LOG_LEVEL: INFO diff --git a/ingest/src/test/scala/hydra/ingest/http/SchemasEndpointSpec.scala b/ingest/src/test/scala/hydra/ingest/http/SchemasEndpointSpec.scala index 6ff9b68d1..4d113b1c4 100644 --- a/ingest/src/test/scala/hydra/ingest/http/SchemasEndpointSpec.scala +++ b/ingest/src/test/scala/hydra/ingest/http/SchemasEndpointSpec.scala @@ -23,7 +23,7 @@ import org.apache.kafka.common.{Node, PartitionInfo} import org.scalatest.matchers.should.Matchers import org.scalatest.wordspec.AnyWordSpecLike import spray.json.{JsArray, JsObject, JsValue, RootJsonFormat} -import net.manub.embeddedkafka.{EmbeddedKafka, EmbeddedKafkaConfig} +import io.github.embeddedkafka.{EmbeddedKafka, EmbeddedKafkaConfig} import scala.collection.immutable.Map import scala.concurrent.duration._ diff --git a/ingestors/kafka/src/main/scala/hydra/kafka/algebras/KafkaAdminAlgebra.scala b/ingestors/kafka/src/main/scala/hydra/kafka/algebras/KafkaAdminAlgebra.scala index 20e623f18..a1d9f1402 100644 --- a/ingestors/kafka/src/main/scala/hydra/kafka/algebras/KafkaAdminAlgebra.scala +++ 
b/ingestors/kafka/src/main/scala/hydra/kafka/algebras/KafkaAdminAlgebra.scala @@ -14,7 +14,6 @@ import org.apache.kafka.common.errors.UnknownTopicOrPartitionException import org.typelevel.log4cats.Logger import hydra.common.config.KafkaConfigUtils._ import kafka.server.KafkaConfig - /** * Internal interface to interact with the KafkaAdminClient from FS2 Kafka. * Provides a live version for production usage and a test version for integration testing. diff --git a/ingestors/kafka/src/main/scala/hydra/kafka/producer/AvroRecord.scala b/ingestors/kafka/src/main/scala/hydra/kafka/producer/AvroRecord.scala index 9b45f1635..c9be1bfac 100644 --- a/ingestors/kafka/src/main/scala/hydra/kafka/producer/AvroRecord.scala +++ b/ingestors/kafka/src/main/scala/hydra/kafka/producer/AvroRecord.scala @@ -4,7 +4,7 @@ import com.pluralsight.hydra.avro.JsonConverter import hydra.core.transport.AckStrategy import org.apache.avro.Schema import org.apache.avro.generic.GenericRecord -import org.apache.commons.lang3.StringUtils + /** * Created by alexsilva on 10/30/15. diff --git a/ingestors/kafka/src/main/scala/hydra/kafka/producer/JsonRecord.scala b/ingestors/kafka/src/main/scala/hydra/kafka/producer/JsonRecord.scala index e1da5603a..6a3105fd5 100644 --- a/ingestors/kafka/src/main/scala/hydra/kafka/producer/JsonRecord.scala +++ b/ingestors/kafka/src/main/scala/hydra/kafka/producer/JsonRecord.scala @@ -18,7 +18,6 @@ package hydra.kafka.producer import com.fasterxml.jackson.databind.{JsonNode, ObjectMapper} import hydra.core.transport.AckStrategy -import org.apache.commons.lang3.StringUtils /** * Created by alexsilva on 11/30/15. diff --git a/ingestors/kafka/src/main/scala/hydra/kafka/producer/KafkaRecord.scala b/ingestors/kafka/src/main/scala/hydra/kafka/producer/KafkaRecord.scala index b25a08324..ab1963b2a 100644 --- a/ingestors/kafka/src/main/scala/hydra/kafka/producer/KafkaRecord.scala +++ b/ingestors/kafka/src/main/scala/hydra/kafka/producer/KafkaRecord.scala @@ -1,7 +1,7 @@ package hydra.kafka.producer import hydra.core.transport.HydraRecord -import org.apache.commons.lang3.ClassUtils +import hydra.common.util.ClassUtils import org.apache.kafka.clients.producer.ProducerRecord /** diff --git a/ingestors/kafka/src/main/scala/hydra/kafka/producer/StringRecord.scala b/ingestors/kafka/src/main/scala/hydra/kafka/producer/StringRecord.scala index dbb75cfe2..3cd792cd6 100644 --- a/ingestors/kafka/src/main/scala/hydra/kafka/producer/StringRecord.scala +++ b/ingestors/kafka/src/main/scala/hydra/kafka/producer/StringRecord.scala @@ -17,7 +17,7 @@ package hydra.kafka.producer import hydra.core.transport.AckStrategy -import org.apache.commons.lang3.StringUtils + /** * Created by alexsilva on 11/30/15. 
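The recurring pattern across this change set is the migration from the Confluent 5.4.x SchemaRegistryClient API, which took raw org.apache.avro.Schema values, to the 6.2.x API, which takes ParsedSchema. A minimal sketch of the new calling convention, assuming a Confluent 6.2.x client on the classpath; the object and method names below are illustrative only and not part of this diff:

import io.confluent.kafka.schemaregistry.ParsedSchema
import io.confluent.kafka.schemaregistry.avro.AvroSchema
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient
import org.apache.avro.Schema

object ParsedSchemaMigrationSketch {
  // In 6.x, register and getVersion take a ParsedSchema, so wrap the raw Avro schema in AvroSchema first.
  def registerAndLookUp(client: SchemaRegistryClient, subject: String, avro: Schema): (Int, Int) = {
    val parsed: ParsedSchema = new AvroSchema(avro)
    val id = client.register(subject, parsed)        // was: client.register(subject, avro)
    val version = client.getVersion(subject, parsed) // was: client.getVersion(subject, avro)
    (id, version)
  }
}

This mirrors the edits to SchemaRegistryActor.scala above and the new AvroSchema wrapping in the test specs throughout this patch.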
diff --git a/ingestors/kafka/src/main/scala/hydra/kafka/transport/KafkaTransport.scala b/ingestors/kafka/src/main/scala/hydra/kafka/transport/KafkaTransport.scala index c8f9c7a46..9eea5b752 100644 --- a/ingestors/kafka/src/main/scala/hydra/kafka/transport/KafkaTransport.scala +++ b/ingestors/kafka/src/main/scala/hydra/kafka/transport/KafkaTransport.scala @@ -58,8 +58,11 @@ class KafkaTransport(producerSettings: Map[String, ProducerSettings[Any, Any]]) msgCounter.incrementAndGet() metrics.saveMetrics(kmd) - case e: RecordProduceError => - context.system.eventStream.publish(e) + case e: RecordProduceError => { + println("Received RecordProduceError: ") + println(e.error.getMessage) + context.system.eventStream.publish(e) + } case p: ProducerInitializationError => context.system.eventStream.publish(p) } diff --git a/ingestors/kafka/src/test/scala/hydra/kafka/KafkaUtilsSpec.scala b/ingestors/kafka/src/test/scala/hydra/kafka/KafkaUtilsSpec.scala index 138f22eb9..80fc2a069 100644 --- a/ingestors/kafka/src/test/scala/hydra/kafka/KafkaUtilsSpec.scala +++ b/ingestors/kafka/src/test/scala/hydra/kafka/KafkaUtilsSpec.scala @@ -7,7 +7,7 @@ import hydra.common.config.KafkaConfigUtils.KafkaClientSecurityConfig import hydra.kafka.KafkaUtilsSpec.{emptyKafkaClientSecurityConfig, kafkaClientSecurityConfig} import hydra.kafka.util.KafkaUtils import hydra.kafka.util.KafkaUtils.TopicDetails -import net.manub.embeddedkafka.{EmbeddedKafka, EmbeddedKafkaConfig} +import io.github.embeddedkafka.{EmbeddedKafka, EmbeddedKafkaConfig} import org.scalatest.concurrent.{Eventually, ScalaFutures} import org.scalatest.BeforeAndAfterAll import org.scalatest.matchers.should.Matchers diff --git a/ingestors/kafka/src/test/scala/hydra/kafka/algebras/KafkaAdminAlgebraSpec.scala b/ingestors/kafka/src/test/scala/hydra/kafka/algebras/KafkaAdminAlgebraSpec.scala index cecdd3359..33544adfe 100644 --- a/ingestors/kafka/src/test/scala/hydra/kafka/algebras/KafkaAdminAlgebraSpec.scala +++ b/ingestors/kafka/src/test/scala/hydra/kafka/algebras/KafkaAdminAlgebraSpec.scala @@ -10,7 +10,7 @@ import hydra.kafka.algebras.KafkaClientAlgebra.getOptionalGenericRecordDeseriali import hydra.kafka.util.KafkaUtils.TopicDetails import org.typelevel.log4cats.SelfAwareStructuredLogger import org.typelevel.log4cats.slf4j.Slf4jLogger -import net.manub.embeddedkafka.{EmbeddedKafka, EmbeddedKafkaConfig} +import io.github.embeddedkafka.{EmbeddedKafka, EmbeddedKafkaConfig} import org.joda.time.DurationFieldType.seconds import org.scalatest.{BeforeAndAfterAll, stats} import org.scalatest.matchers.should.Matchers diff --git a/ingestors/kafka/src/test/scala/hydra/kafka/algebras/KafkaClientAlgebraSpec.scala b/ingestors/kafka/src/test/scala/hydra/kafka/algebras/KafkaClientAlgebraSpec.scala index e0c05dd47..54e18f53f 100644 --- a/ingestors/kafka/src/test/scala/hydra/kafka/algebras/KafkaClientAlgebraSpec.scala +++ b/ingestors/kafka/src/test/scala/hydra/kafka/algebras/KafkaClientAlgebraSpec.scala @@ -2,7 +2,7 @@ package hydra.kafka.algebras import cats.effect.{Concurrent, ContextShift, IO, Timer} import hydra.avro.registry.SchemaRegistry -import net.manub.embeddedkafka.{EmbeddedKafka, EmbeddedKafkaConfig} +import io.github.embeddedkafka.{EmbeddedKafka, EmbeddedKafkaConfig} import org.apache.avro.generic.GenericRecord import org.scalatest.BeforeAndAfterAll import org.scalatest.matchers.should.Matchers diff --git a/ingestors/kafka/src/test/scala/hydra/kafka/consumer/KafkaConsumerProxySpec.scala 
diff --git a/ingestors/kafka/src/test/scala/hydra/kafka/consumer/KafkaConsumerProxySpec.scala b/ingestors/kafka/src/test/scala/hydra/kafka/consumer/KafkaConsumerProxySpec.scala
index c4f678d9d..cbf5947e7 100644
--- a/ingestors/kafka/src/test/scala/hydra/kafka/consumer/KafkaConsumerProxySpec.scala
+++ b/ingestors/kafka/src/test/scala/hydra/kafka/consumer/KafkaConsumerProxySpec.scala
@@ -18,7 +18,7 @@ package hydra.kafka.consumer
 import akka.actor.{ActorSystem, Props}
 import akka.testkit.{ImplicitSender, TestKit}
 import hydra.kafka.consumer.KafkaConsumerProxy._
-import net.manub.embeddedkafka.{EmbeddedKafka, EmbeddedKafkaConfig}
+import io.github.embeddedkafka.{EmbeddedKafka, EmbeddedKafkaConfig}
 import org.apache.kafka.common.TopicPartition
 import org.scalatest.matchers.should.Matchers
 import org.scalatest.funspec.AnyFunSpecLike
@@ -37,7 +37,7 @@ class KafkaConsumerProxySpec
     with ImplicitSender {
   implicit val config =
-    EmbeddedKafkaConfig(kafkaPort = 8012, zooKeeperPort = 3111)
+    EmbeddedKafkaConfig(kafkaPort = 8012, zooKeeperPort = 3121)
   override def beforeAll() = {
     super.beforeAll()
diff --git a/ingestors/kafka/src/test/scala/hydra/kafka/endpoints/BootstrapEndpointSpec.scala b/ingestors/kafka/src/test/scala/hydra/kafka/endpoints/BootstrapEndpointSpec.scala
index d09f7c54d..7f39b8d6b 100644
--- a/ingestors/kafka/src/test/scala/hydra/kafka/endpoints/BootstrapEndpointSpec.scala
+++ b/ingestors/kafka/src/test/scala/hydra/kafka/endpoints/BootstrapEndpointSpec.scala
@@ -50,7 +50,7 @@ class BootstrapEndpointSpec
   implicit val embeddedKafkaConfig = EmbeddedKafkaConfig(
     kafkaPort = 8012,
-    zooKeeperPort = 3111,
+    zooKeeperPort = 3011,
     customBrokerProperties = Map("auto.create.topics.enable" -> "false")
   )
diff --git a/ingestors/kafka/src/test/scala/hydra/kafka/endpoints/TopicMetadataEndpointSpec.scala b/ingestors/kafka/src/test/scala/hydra/kafka/endpoints/TopicMetadataEndpointSpec.scala
index 9fdf9fb1d..1630252a0 100644
--- a/ingestors/kafka/src/test/scala/hydra/kafka/endpoints/TopicMetadataEndpointSpec.scala
+++ b/ingestors/kafka/src/test/scala/hydra/kafka/endpoints/TopicMetadataEndpointSpec.scala
@@ -22,9 +22,11 @@ import hydra.kafka.consumer.KafkaConsumerProxy.{GetPartitionInfo, ListTopics, Li
 import hydra.kafka.marshallers.HydraKafkaJsonSupport
 import hydra.kafka.model.{DataClassification, ObsoleteDataClassification, RequiredField, SubDataClassification}
 import hydra.kafka.model.TopicMetadataV2Request.Subject
+import org.typelevel.log4cats.SelfAwareStructuredLogger
+import org.typelevel.log4cats.slf4j.Slf4jLogger
+import io.github.embeddedkafka.{EmbeddedKafka, EmbeddedKafkaConfig}
 import hydra.kafka.programs.CreateTopicProgram
 import hydra.kafka.util.KafkaUtils.TopicDetails
-import net.manub.embeddedkafka.{EmbeddedKafka, EmbeddedKafkaConfig}
 import org.apache.avro.{LogicalTypes, Schema, SchemaBuilder}
 import org.apache.kafka.common.{Node, PartitionInfo}
 import org.scalatest.BeforeAndAfterAll
@@ -56,7 +58,7 @@ class TopicMetadataEndpointSpec
     Slf4jLogger.getLogger[F]
   implicit val kafkaConfig: EmbeddedKafkaConfig =
-    EmbeddedKafkaConfig(kafkaPort = 8012, zooKeeperPort = 3111)
+    EmbeddedKafkaConfig(kafkaPort = 8012, zooKeeperPort = 3789)
   implicit val contextShift: ContextShift[IO] = IO.contextShift(ExecutionContext.global)
   implicit val concurrent: Concurrent[IO] = IO.ioConcurrentEffect
diff --git a/ingestors/kafka/src/test/scala/hydra/kafka/services/TopicBootstrapActorSpec.scala b/ingestors/kafka/src/test/scala/hydra/kafka/services/TopicBootstrapActorSpec.scala
index 2a5f135e5..aff5ae0ef 100644
--- a/ingestors/kafka/src/test/scala/hydra/kafka/services/TopicBootstrapActorSpec.scala
+++ b/ingestors/kafka/src/test/scala/hydra/kafka/services/TopicBootstrapActorSpec.scala
@@ -17,7 +17,7 @@ import hydra.kafka.model.TopicMetadata
 import hydra.kafka.producer.AvroRecord
 import hydra.kafka.services.StreamsManagerActor.{GetMetadata, GetMetadataResponse}
 import hydra.kafka.services.TopicBootstrapActor._
-import net.manub.embeddedkafka.{EmbeddedKafka, EmbeddedKafkaConfig, KafkaUnavailableException}
+import io.github.embeddedkafka.{EmbeddedKafka, EmbeddedKafkaConfig}
 import org.apache.avro.Schema
 import org.apache.kafka.common.serialization.StringSerializer
 import org.joda.time.DateTime
@@ -48,7 +48,7 @@ class TopicBootstrapActorSpec
   implicit val embeddedKafkaConfig = EmbeddedKafkaConfig(
     kafkaPort = 8012,
-    zooKeeperPort = 3111,
+    zooKeeperPort = 3241,
     customBrokerProperties = Map("auto.create.topics.enable" -> "false")
   )
diff --git a/ingestors/kafka/src/test/scala/hydra/kafka/transport/KafkaMetricsSpec.scala b/ingestors/kafka/src/test/scala/hydra/kafka/transport/KafkaMetricsSpec.scala
index 4ff6f427f..c90b23346 100644
--- a/ingestors/kafka/src/test/scala/hydra/kafka/transport/KafkaMetricsSpec.scala
+++ b/ingestors/kafka/src/test/scala/hydra/kafka/transport/KafkaMetricsSpec.scala
@@ -26,7 +26,7 @@ class KafkaMetricsSpec
   implicit val config = EmbeddedKafkaConfig(
     kafkaPort = 8012,
-    zooKeeperPort = 3111,
+    zooKeeperPort = 3114,
     customBrokerProperties = Map(
       "auto.create.topics.enable" -> "false",
       "offsets.topic.replication.factor" -> "1"
diff --git a/ingestors/kafka/src/test/scala/hydra/kafka/transport/KafkaProducerProxySpec.scala b/ingestors/kafka/src/test/scala/hydra/kafka/transport/KafkaProducerProxySpec.scala
index 8b6ef56c3..186a6b562 100644
--- a/ingestors/kafka/src/test/scala/hydra/kafka/transport/KafkaProducerProxySpec.scala
+++ b/ingestors/kafka/src/test/scala/hydra/kafka/transport/KafkaProducerProxySpec.scala
@@ -10,7 +10,7 @@ import hydra.kafka.producer.{JsonRecord, KafkaRecordMetadata, StringRecord}
 import hydra.kafka.transport.KafkaProducerProxy.ProduceToKafka
 import hydra.kafka.transport.KafkaTransport.RecordProduceError
 import hydra.kafka.util.KafkaUtils
-import net.manub.embeddedkafka.{EmbeddedKafka, EmbeddedKafkaConfig}
+import io.github.embeddedkafka.{EmbeddedKafka, EmbeddedKafkaConfig}
 import org.apache.kafka.clients.producer.RecordMetadata
 import org.apache.kafka.common.{KafkaException, TopicPartition}
 import org.scalatest.BeforeAndAfterAll
@@ -32,7 +32,7 @@ class KafkaProducerProxySpec
   implicit val config = EmbeddedKafkaConfig(
     kafkaPort = 8012,
-    zooKeeperPort = 3111,
+    zooKeeperPort = 3133,
     customBrokerProperties = Map("auto.create.topics.enable" -> "false")
   )
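The ZooKeeper port 3111 previously shared by all of these suites is fanned out to a distinct port per suite (3011, 3114, 3121, 3133, 3241, 3789), so suites that happen to run concurrently no longer collide on the same embedded ZooKeeper; the Kafka port itself stays at 8012 everywhere. If hand-assigning ports ever becomes unwieldy, one alternative (a sketch, not what this change does) is to let the OS hand out a free port per suite:

    import java.net.ServerSocket
    import io.github.embeddedkafka.EmbeddedKafkaConfig

    object TestPorts {
      // Bind to port 0 so the OS picks a free ephemeral port, then release it.
      // There is a small race between close() and the broker binding to the
      // port, which is generally acceptable in tests.
      def freePort(): Int = {
        val socket = new ServerSocket(0)
        try socket.getLocalPort
        finally socket.close()
      }
    }

    // Usage: a per-suite config that cannot clash with another suite's choice.
    implicit val config: EmbeddedKafkaConfig =
      EmbeddedKafkaConfig(kafkaPort = TestPorts.freePort(), zooKeeperPort = TestPorts.freePort())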
diff --git a/ingestors/kafka/src/test/scala/hydra/kafka/transport/KafkaTransportSpec.scala b/ingestors/kafka/src/test/scala/hydra/kafka/transport/KafkaTransportSpec.scala
index 462706665..91ee5de54 100644
--- a/ingestors/kafka/src/test/scala/hydra/kafka/transport/KafkaTransportSpec.scala
+++ b/ingestors/kafka/src/test/scala/hydra/kafka/transport/KafkaTransportSpec.scala
@@ -9,7 +9,7 @@ import hydra.core.transport.{AckStrategy, RecordMetadata, TransportCallback}
 import hydra.kafka.producer.{DeleteTombstoneRecord, JsonRecord, StringRecord}
 import hydra.kafka.transport.KafkaProducerProxy.ProducerInitializationError
 import hydra.kafka.transport.KafkaTransport.RecordProduceError
-import net.manub.embeddedkafka.{EmbeddedKafka, EmbeddedKafkaConfig}
+import io.github.embeddedkafka.{EmbeddedKafka, EmbeddedKafkaConfig}
 import org.apache.kafka.common.KafkaException
 import org.apache.kafka.common.errors.SerializationException
 import org.scalatest.matchers.should.Matchers
@@ -85,8 +85,11 @@ class KafkaTransportSpec
     it("forwards to the right proxy") {
       val ack: TransportCallback =
-        (d: Long, m: Option[RecordMetadata], e: Option[Throwable]) =>
-          ingestor.ref ! "DONE"
+        (d: Long, m: Option[RecordMetadata], e: Option[Throwable]) => {
+          val msg = e.fold("DONE")(_.getMessage)
+          ingestor.ref ! msg
+        }
+
       val rec = StringRecord("transport_test", "key", "payload", AckStrategy.NoAck)
       transport ! Deliver(rec, 1, ack)
@@ -95,8 +98,11 @@ class KafkaTransportSpec
     it("handles delete records") {
       val ack: TransportCallback =
-        (d: Long, m: Option[RecordMetadata], e: Option[Throwable]) =>
-          ingestor.ref ! "DONE"
+        (d: Long, m: Option[RecordMetadata], e: Option[Throwable]) => {
+          val msg = e.fold("DONE")(_.getMessage)
+          ingestor.ref ! msg
+        }
+
       val rec = DeleteTombstoneRecord("transport_test", "key", AckStrategy.NoAck)
       transport ! Deliver(rec, 1, ack)
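Both callbacks above previously replied "DONE" regardless of outcome, so a produce failure surfaced only as an expectMsg timeout; they now forward the producer error's message to the test actor. The Option fold in the rewritten lambdas collapses the two outcomes into one expression, as this small runnable check illustrates:

    // Quick check of the fold semantics: None yields the success marker,
    // Some(error) yields that error's message.
    object FoldAckCheck extends App {
      def ackMessage(e: Option[Throwable]): String = e.fold("DONE")(_.getMessage)

      println(ackMessage(None))                               // prints DONE
      println(ackMessage(Some(new RuntimeException("boom")))) // prints boom
    }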
"io.github.embeddedkafka" %% "embedded-kafka-schema-registry" % confluentVersion % module val scalatestEmbeddedRedis = "com.github.sebruck" %% "scalatest-embedded-redis" % scalaTestEmbeddedRedisVersion % module - akkaTest ++ Seq(scalaTest, scalaMock, junit, scalatestEmbeddedRedis, embeddedKafka) + akkaTest ++ Seq(scalaTest, scalaMock, junit, scalatestEmbeddedRedis, embeddedKafkaSchemaRegistry) } } @@ -205,13 +217,13 @@ object Dependencies { akka ++ Seq(avro, ciris, refined, enumeratum) ++ cats ++ logging ++ joda ++ testDeps ++ kafkaClients ++ awsMskIamAuth ++ vulcan val avroDeps: Seq[ModuleID] = - baseDeps ++ confluent ++ jackson ++ guavacache ++ catsEffect ++ redisCache + baseDeps ++ kafkaAvroSerializer ++ jackson ++ guavacache ++ catsEffect ++ redisCache val coreDeps: Seq[ModuleID] = akka ++ baseDeps ++ Seq( reflections, retry - ) ++ guavacache ++ confluent ++ kamon ++ redisCache + ) ++ guavacache ++ kafkaAvroSerializer ++ kamon ++ redisCache val ingestDeps: Seq[ModuleID] = coreDeps ++ akkaHttpHal ++ Seq(embeddedKafka, sprayJson) @@ -219,8 +231,10 @@ object Dependencies { akkaKafkaStream, refined, sprayJson - ) ++ kafka ++ akkaHttpHal ++ fs2Kafka ++ integrationDeps + ) ++ kafka ++ akkaHttpHal ++ fs2Kafka ++ integrationDeps ++ kafkaSchemaRegistry val awsAuthDeps: Seq[ModuleID] = awsSdk + val kafkaSchemaRegistryDep = kafkaSchemaRegistry + }