From da04836e0a13ac28914d0f503a5b0c36e9e3f876 Mon Sep 17 00:00:00 2001
From: Stefan Bocutiu
Date: Tue, 16 Apr 2024 21:51:13 +0100
Subject: [PATCH] LC-191 Workaround Kafka Connect SMTs returning POJO (#1155)

* LC-191 Workaround for bad Kafka Connect SMT practices

Some users have implemented SMTs that deviate from recommended best
practices, returning Plain Old Java Objects (POJOs) instead of
constructing Connect structs. Consequently, such SMTs encounter
compatibility issues with the JsonConverter, which is designed to handle
Connect data structures.

To address this, the proposed changes implement a mechanism to detect
the presence of POJOs within the pipeline. Upon detection, the system
leverages the Jackson JSON writer to marshal the payload into JSON
format, ensuring compatibility with the JsonConverter.

This workaround only applies to the JSON storage format, not to
Avro/Parquet. We don't plan workarounds for those scenarios.

* Suppression of invalid CVE warning. (#1138)

* Suppression of invalid CVE warning.

* Further suppressions

* Address the PR comments

---------

Co-authored-by: stheppi
Co-authored-by: David Sloan <33483659+davidsloan@users.noreply.github.com>
---
 .../aws/s3/sink/S3JsonWriterManagerTest.scala | 96 ++++++++++++++++++-
 .../sink/conversion/ToJsonDataConverter.scala | 32 +++++--
 suppression.xml                               | 30 ++++++
 3 files changed, 148 insertions(+), 10 deletions(-)

diff --git a/kafka-connect-aws-s3/src/it/scala/io/lenses/streamreactor/connect/aws/s3/sink/S3JsonWriterManagerTest.scala b/kafka-connect-aws-s3/src/it/scala/io/lenses/streamreactor/connect/aws/s3/sink/S3JsonWriterManagerTest.scala
index 84e82f199..b53ee6cd7 100644
--- a/kafka-connect-aws-s3/src/it/scala/io/lenses/streamreactor/connect/aws/s3/sink/S3JsonWriterManagerTest.scala
+++ b/kafka-connect-aws-s3/src/it/scala/io/lenses/streamreactor/connect/aws/s3/sink/S3JsonWriterManagerTest.scala
@@ -21,12 +21,11 @@ import io.lenses.streamreactor.connect.aws.s3.config._
 import io.lenses.streamreactor.connect.aws.s3.model.location.S3LocationValidator
 import io.lenses.streamreactor.connect.aws.s3.sink.config.S3SinkConfig
 import io.lenses.streamreactor.connect.aws.s3.storage.S3FileMetadata
-import io.lenses.streamreactor.connect.cloud.common.utils.ITSampleSchemaAndData.firstUsers
-import io.lenses.streamreactor.connect.cloud.common.utils.ITSampleSchemaAndData.users
 import io.lenses.streamreactor.connect.aws.s3.utils.S3ProxyContainerTest
 import io.lenses.streamreactor.connect.cloud.common.config.AvroFormatSelection
 import io.lenses.streamreactor.connect.cloud.common.config.DataStorageSettings
 import io.lenses.streamreactor.connect.cloud.common.config.JsonFormatSelection
+import io.lenses.streamreactor.connect.cloud.common.formats.writer.ArraySinkData
 import io.lenses.streamreactor.connect.cloud.common.formats.writer.MessageDetail
 import io.lenses.streamreactor.connect.cloud.common.formats.writer.NullSinkData
 import io.lenses.streamreactor.connect.cloud.common.formats.writer.SinkData
@@ -48,12 +47,15 @@ import io.lenses.streamreactor.connect.cloud.common.sink.config.padding.LeftPadP
 import io.lenses.streamreactor.connect.cloud.common.sink.config.padding.NoOpPaddingStrategy
 import io.lenses.streamreactor.connect.cloud.common.sink.config.padding.PaddingService
 import io.lenses.streamreactor.connect.cloud.common.sink.config.padding.PaddingStrategy
-import io.lenses.streamreactor.connect.cloud.common.sink.naming.OffsetFileNamer
 import io.lenses.streamreactor.connect.cloud.common.sink.naming.CloudKeyNamer
+import io.lenses.streamreactor.connect.cloud.common.sink.naming.OffsetFileNamer
+import io.lenses.streamreactor.connect.cloud.common.utils.ITSampleSchemaAndData.firstUsers
+import io.lenses.streamreactor.connect.cloud.common.utils.ITSampleSchemaAndData.users
 import io.lenses.streamreactor.connect.cloud.common.utils.SampleData.UsersSchemaDecimal
 import org.apache.kafka.connect.data.Struct
 import org.scalatest.flatspec.AnyFlatSpec
 import org.scalatest.matchers.should.Matchers
+import scala.jdk.CollectionConverters._

 class S3JsonWriterManagerTest extends AnyFlatSpec with Matchers with S3ProxyContainerTest {
@@ -247,4 +249,92 @@
       s"""{"name":"sam","title":"mr","salary":100.430000000000000000}""",
     )
   }
+
+  "json sink" should "write single json record when the input is not best practice: Array of POJO" in {
+
+    val bucketAndPrefix = CloudLocation(BucketName, PathPrefix.some)
+    val config = S3SinkConfig(
+      S3ConnectionConfig(
+        None,
+        Some(s3Container.identity.identity),
+        Some(s3Container.identity.credential),
+        AuthMode.Credentials,
+      ),
+      bucketOptions = Seq(
+        CloudSinkBucketOptions(
+          TopicName.some,
+          bucketAndPrefix,
+          commitPolicy    = CommitPolicy(Count(1)),
+          formatSelection = JsonFormatSelection,
+          keyNamer = new CloudKeyNamer(
+            JsonFormatSelection,
+            defaultPartitionSelection(Values),
+            new OffsetFileNamer(
+              identity[String],
+              JsonFormatSelection.extension,
+            ),
+            new PaddingService(Map[String, PaddingStrategy](
+              "partition" -> NoOpPaddingStrategy,
+              "offset"    -> LeftPadPaddingStrategy(12, 0),
+            )),
+          ),
+          localStagingArea   = LocalStagingArea(localRoot),
+          partitionSelection = defaultPartitionSelection(Values),
+          dataStorage        = DataStorageSettings.disabled,
+        ), // JsonS3Format
+      ),
+      offsetSeekerOptions = OffsetSeekerOptions(5),
+      compressionCodec,
+      batchDelete = true,
+    )
+
+    val sink   = writerManagerCreator.from(config)
+    val topic  = Topic(TopicName)
+    val offset = Offset(1)
+    val listOfPojo: java.util.List[Pojo] = List(
+      new Pojo("sam", "mr", 100.43),
+      new Pojo("laura", "ms", 429.06),
+    ).asJava
+
+    sink.write(
+      TopicPartitionOffset(topic, 1, offset),
+      MessageDetail(NullSinkData(None),
+                    ArraySinkData(listOfPojo, None),
+                    Map.empty[String, SinkData],
+                    None,
+                    topic,
+                    1,
+                    offset,
+      ),
+    )
+    sink.close()
+
+    listBucketPath(BucketName, "streamReactorBackups/myTopic/1/").size should be(1)
+
+    remoteFileAsString(BucketName, "streamReactorBackups/myTopic/1/1.json") should be(
+      """[{"name":"sam","title":"mr","salary":100.43},{"name":"laura","title":"ms","salary":429.06}]""",
+    )
+  }
+}
+
+//create a class with the following fields
+//.put("name", "sam").put("title", "mr").put("salary", 100.43),
+class Pojo {
+  private var name:   String = _
+  private var title:  String = _
+  private var salary: Double = _
+
+  def this(name: String, title: String, salary: Double) = {
+    this()
+    this.name   = name
+    this.title  = title
+    this.salary = salary
+  }
+
+  def getName: String = name
+  def setName(name: String): Unit = this.name = name
+  def getTitle: String = title
+  def setTitle(title: String): Unit = this.title = title
+  def getSalary: Double = salary
+  def setSalary(salary: Double): Unit = this.salary = salary
 }
diff --git a/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/conversion/ToJsonDataConverter.scala b/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/conversion/ToJsonDataConverter.scala
index 289a464af..f08404824 100644
--- a/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/conversion/ToJsonDataConverter.scala
+++ b/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/conversion/ToJsonDataConverter.scala
@@ -15,6 +15,8 @@
  */
 package io.lenses.streamreactor.connect.cloud.common.sink.conversion

+import com.fasterxml.jackson.databind.ObjectMapper
+import com.fasterxml.jackson.module.scala.DefaultScalaModule
 import io.lenses.streamreactor.connect.cloud.common.formats.writer.ArraySinkData
 import io.lenses.streamreactor.connect.cloud.common.formats.writer.ByteArraySinkData
 import io.lenses.streamreactor.connect.cloud.common.formats.writer.MapSinkData
@@ -23,21 +25,28 @@ import io.lenses.streamreactor.connect.cloud.common.formats.writer.PrimitiveSink
 import io.lenses.streamreactor.connect.cloud.common.formats.writer.SinkData
 import io.lenses.streamreactor.connect.cloud.common.formats.writer.StructSinkData
 import io.lenses.streamreactor.connect.cloud.common.model.Topic
+import org.apache.kafka.connect.data.Struct
 import org.apache.kafka.connect.json.JsonConverter

 import java.nio.ByteBuffer
-
+import scala.jdk.CollectionConverters._
 object ToJsonDataConverter {

+  private val jacksonJson: ObjectMapper = new ObjectMapper().registerModule(DefaultScalaModule)
+
   def convertMessageValueToByteArray(converter: JsonConverter, topic: Topic, data: SinkData): Array[Byte] =
     data match {
       case data: PrimitiveSinkData => converter.fromConnectData(topic.value, data.schema().orNull, data.safeValue)
-      case StructSinkData(structVal)    => converter.fromConnectData(topic.value, data.schema().orNull, structVal)
-      case MapSinkData(map, schema)     => converter.fromConnectData(topic.value, schema.orNull, map)
-      case ArraySinkData(array, schema) => converter.fromConnectData(topic.value, schema.orNull, array)
-      case ByteArraySinkData(_, _)      => throw new IllegalStateException("Cannot currently write byte array as json")
-      case NullSinkData(schema)         => converter.fromConnectData(topic.value, schema.orNull, null)
-      case other => throw new IllegalStateException(s"Unknown SinkData type, ${other.getClass.getSimpleName}")
+      case StructSinkData(structVal) => converter.fromConnectData(topic.value, data.schema().orNull, structVal)
+      case MapSinkData(map, schema)  => converter.fromConnectData(topic.value, schema.orNull, map)
+      case ArraySinkData(array, _) if isPojo(array) =>
+        val json = jacksonJson.writeValueAsString(array)
+        json.getBytes()
+      case ArraySinkData(array, schema) =>
+        converter.fromConnectData(topic.value, schema.orNull, array)
+      case ByteArraySinkData(_, _) => throw new IllegalStateException("Cannot currently write byte array as json")
+      case NullSinkData(schema)    => converter.fromConnectData(topic.value, schema.orNull, null)
+      case other                   => throw new IllegalStateException(s"Unknown SinkData type, ${other.getClass.getSimpleName}")
     }

   def convert(data: SinkData): Any = data match {
@@ -45,4 +54,13 @@
     case ByteArraySinkData(bArray, _) => ByteBuffer.wrap(bArray)
     case data                         => data.value
   }
+
+  /**
+    * This is a workaround to help some of the customers who use Kafka Connect SMT ignoring the best practices
+    */
+  private def isPojo(array: java.util.List[_]) =
+    array.size() > 0 && array.asScala.exists {
+      case _: Struct => false
+      case _         => true
+    }
 }
diff --git a/suppression.xml b/suppression.xml
index 03a8bac7f..865acc02e 100644
--- a/suppression.xml
+++ b/suppression.xml
@@ -73,4 +73,34 @@
         <packageUrl regex="true">^pkg:maven/com\.azure/azure\-identity@.*$</packageUrl>
         <cpe>cpe:/a:microsoft:azure_cli</cpe>
     </suppress>
+    <suppress>
+        <packageUrl regex="true">^pkg:maven/joda\-time/joda\-time@.*$</packageUrl>
+        <cve>CVE-2024-23080</cve>
+    </suppress>
+    <suppress>
+        <packageUrl regex="true">^pkg:maven/org\.threeten/threetenbp@.*$</packageUrl>
+        <cve>CVE-2024-23081</cve>
+    </suppress>
+    <suppress>
+        <packageUrl regex="true">^pkg:maven/org\.threeten/threetenbp@.*$</packageUrl>
+        <cve>CVE-2024-23082</cve>
+    </suppress>
 </suppressions>
\ No newline at end of file
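
Note (not part of the patch): to make the anti-pattern described in the commit message concrete, below is a minimal Scala sketch of the kind of SMT this workaround targets, one that returns a java.util.List of POJOs instead of building Connect structs. The class names PojoListTransform and User are hypothetical; when such a value reaches the sink with the JSON storage format, the Jackson fallback added to ToJsonDataConverter is what ends up serialising it.

import java.util
import scala.beans.BeanProperty

import org.apache.kafka.common.config.ConfigDef
import org.apache.kafka.connect.connector.ConnectRecord
import org.apache.kafka.connect.transforms.Transformation

// Minimal POJO with bean-style getters/setters; mirrors the Pojo class added in the test above.
class User(@BeanProperty var name: String, @BeanProperty var title: String, @BeanProperty var salary: Double)

// Hypothetical SMT illustrating the anti-pattern: it drops the Connect schema and emits
// a java.util.List of POJOs as the record value. JsonConverter cannot serialise this,
// which is why ToJsonDataConverter falls back to Jackson for arrays whose elements are
// not Connect Structs (JSON storage format only).
class PojoListTransform[R <: ConnectRecord[R]] extends Transformation[R] {

  override def apply(record: R): R = {
    val pojos: util.List[User] = util.Arrays.asList(
      new User("sam", "mr", 100.43),
      new User("laura", "ms", 429.06),
    )
    record.newRecord(
      record.topic(),
      record.kafkaPartition(),
      record.keySchema(),
      record.key(),
      null,  // no Connect value schema is built -- the bad practice
      pojos, // raw POJOs instead of a Connect Struct/array
      record.timestamp(),
    )
  }

  override def config(): ConfigDef = new ConfigDef()

  override def configure(configs: util.Map[String, _]): Unit = ()

  override def close(): Unit = ()
}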