diff --git a/kafka-connect-aws-s3/src/it/scala/io/lenses/streamreactor/connect/aws/s3/sink/S3JsonWriterManagerTest.scala b/kafka-connect-aws-s3/src/it/scala/io/lenses/streamreactor/connect/aws/s3/sink/S3JsonWriterManagerTest.scala
index 84e82f199..b53ee6cd7 100644
--- a/kafka-connect-aws-s3/src/it/scala/io/lenses/streamreactor/connect/aws/s3/sink/S3JsonWriterManagerTest.scala
+++ b/kafka-connect-aws-s3/src/it/scala/io/lenses/streamreactor/connect/aws/s3/sink/S3JsonWriterManagerTest.scala
@@ -21,12 +21,11 @@ import io.lenses.streamreactor.connect.aws.s3.config._
import io.lenses.streamreactor.connect.aws.s3.model.location.S3LocationValidator
import io.lenses.streamreactor.connect.aws.s3.sink.config.S3SinkConfig
import io.lenses.streamreactor.connect.aws.s3.storage.S3FileMetadata
-import io.lenses.streamreactor.connect.cloud.common.utils.ITSampleSchemaAndData.firstUsers
-import io.lenses.streamreactor.connect.cloud.common.utils.ITSampleSchemaAndData.users
import io.lenses.streamreactor.connect.aws.s3.utils.S3ProxyContainerTest
import io.lenses.streamreactor.connect.cloud.common.config.AvroFormatSelection
import io.lenses.streamreactor.connect.cloud.common.config.DataStorageSettings
import io.lenses.streamreactor.connect.cloud.common.config.JsonFormatSelection
+import io.lenses.streamreactor.connect.cloud.common.formats.writer.ArraySinkData
import io.lenses.streamreactor.connect.cloud.common.formats.writer.MessageDetail
import io.lenses.streamreactor.connect.cloud.common.formats.writer.NullSinkData
import io.lenses.streamreactor.connect.cloud.common.formats.writer.SinkData
@@ -48,12 +47,15 @@ import io.lenses.streamreactor.connect.cloud.common.sink.config.padding.LeftPadP
import io.lenses.streamreactor.connect.cloud.common.sink.config.padding.NoOpPaddingStrategy
import io.lenses.streamreactor.connect.cloud.common.sink.config.padding.PaddingService
import io.lenses.streamreactor.connect.cloud.common.sink.config.padding.PaddingStrategy
-import io.lenses.streamreactor.connect.cloud.common.sink.naming.OffsetFileNamer
import io.lenses.streamreactor.connect.cloud.common.sink.naming.CloudKeyNamer
+import io.lenses.streamreactor.connect.cloud.common.sink.naming.OffsetFileNamer
+import io.lenses.streamreactor.connect.cloud.common.utils.ITSampleSchemaAndData.firstUsers
+import io.lenses.streamreactor.connect.cloud.common.utils.ITSampleSchemaAndData.users
import io.lenses.streamreactor.connect.cloud.common.utils.SampleData.UsersSchemaDecimal

import org.apache.kafka.connect.data.Struct
import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers
+import scala.jdk.CollectionConverters._

class S3JsonWriterManagerTest extends AnyFlatSpec with Matchers with S3ProxyContainerTest {
@@ -247,4 +249,94 @@ class S3JsonWriterManagerTest extends AnyFlatSpec with Matchers with S3ProxyCont
s"""{"name":"sam","title":"mr","salary":100.430000000000000000}""",
)
}
+
+ "json sink" should "write single json record when the input is not best practice: Array of POJO" in {
+
+ val bucketAndPrefix = CloudLocation(BucketName, PathPrefix.some)
+ val config = S3SinkConfig(
+ S3ConnectionConfig(
+ None,
+ Some(s3Container.identity.identity),
+ Some(s3Container.identity.credential),
+ AuthMode.Credentials,
+ ),
+ bucketOptions = Seq(
+ CloudSinkBucketOptions(
+ TopicName.some,
+ bucketAndPrefix,
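+          // commit after every record (Count(1)) so the single write below flushes straight to S3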
+ commitPolicy = CommitPolicy(Count(1)),
+ formatSelection = JsonFormatSelection,
+ keyNamer = new CloudKeyNamer(
+ JsonFormatSelection,
+ defaultPartitionSelection(Values),
+ new OffsetFileNamer(
+ identity[String],
+ JsonFormatSelection.extension,
+ ),
+ new PaddingService(Map[String, PaddingStrategy](
+ "partition" -> NoOpPaddingStrategy,
+ "offset" -> LeftPadPaddingStrategy(12, 0),
+ )),
+ ),
+ localStagingArea = LocalStagingArea(localRoot),
+ partitionSelection = defaultPartitionSelection(Values),
+ dataStorage = DataStorageSettings.disabled,
+ ), // JsonS3Format
+ ),
+ offsetSeekerOptions = OffsetSeekerOptions(5),
+ compressionCodec,
+ batchDelete = true,
+ )
+
+ val sink = writerManagerCreator.from(config)
+ val topic = Topic(TopicName)
+ val offset = Offset(1)
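+    // a java.util.List of plain POJOs simulates SMT output that bypasses Connect Structs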
+ val listOfPojo: java.util.List[Pojo] = List(
+ new Pojo("sam", "mr", 100.43),
+ new Pojo("laura", "ms", 429.06),
+ ).asJava
+
+ sink.write(
+ TopicPartitionOffset(topic, 1, offset),
+ MessageDetail(NullSinkData(None),
+ ArraySinkData(listOfPojo, None),
+ Map.empty[String, SinkData],
+ None,
+ topic,
+ 1,
+ offset,
+ ),
+ )
+ sink.close()
+
+ listBucketPath(BucketName, "streamReactorBackups/myTopic/1/").size should be(1)
+
+ remoteFileAsString(BucketName, "streamReactorBackups/myTopic/1/1.json") should be(
+ """[{"name":"sam","title":"mr","salary":100.43},{"name":"laura","title":"ms","salary":429.06}]""",
+ )
+ }
+}
+
+// Plain Java-bean style class carrying the same fields as the user records
+// (name, title, salary); simulates POJO array elements that skip Connect Structs.
+class Pojo {
+ private var name: String = _
+ private var title: String = _
+ private var salary: Double = _
+
+ def this(name: String, title: String, salary: Double) = {
+ this()
+ this.name = name
+ this.title = title
+ this.salary = salary
+ }
+
+ def getName: String = name
+ def setName(name: String): Unit = this.name = name
+ def getTitle: String = title
+ def setTitle(title: String): Unit = this.title = title
+ def getSalary: Double = salary
+ def setSalary(salary: Double): Unit = this.salary = salary
}
diff --git a/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/conversion/ToJsonDataConverter.scala b/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/conversion/ToJsonDataConverter.scala
index 289a464af..f08404824 100644
--- a/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/conversion/ToJsonDataConverter.scala
+++ b/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/conversion/ToJsonDataConverter.scala
@@ -15,6 +15,8 @@
*/
package io.lenses.streamreactor.connect.cloud.common.sink.conversion

+import com.fasterxml.jackson.databind.ObjectMapper
+import com.fasterxml.jackson.module.scala.DefaultScalaModule
import io.lenses.streamreactor.connect.cloud.common.formats.writer.ArraySinkData
import io.lenses.streamreactor.connect.cloud.common.formats.writer.ByteArraySinkData
import io.lenses.streamreactor.connect.cloud.common.formats.writer.MapSinkData
@@ -23,21 +25,31 @@ import io.lenses.streamreactor.connect.cloud.common.formats.writer.PrimitiveSink
import io.lenses.streamreactor.connect.cloud.common.formats.writer.SinkData
import io.lenses.streamreactor.connect.cloud.common.formats.writer.StructSinkData
import io.lenses.streamreactor.connect.cloud.common.model.Topic

+import org.apache.kafka.connect.data.Struct
import org.apache.kafka.connect.json.JsonConverter

import java.nio.ByteBuffer
+import java.nio.charset.StandardCharsets
-
+import scala.jdk.CollectionConverters._
+
object ToJsonDataConverter {
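+  // Jackson mapper (with the Scala module) used as a fallback for POJO arrays the Connect JsonConverter cannot serialise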
+ private val jacksonJson: ObjectMapper = new ObjectMapper().registerModule(DefaultScalaModule)
+
def convertMessageValueToByteArray(converter: JsonConverter, topic: Topic, data: SinkData): Array[Byte] =
data match {
case data: PrimitiveSinkData => converter.fromConnectData(topic.value, data.schema().orNull, data.safeValue)
- case StructSinkData(structVal) => converter.fromConnectData(topic.value, data.schema().orNull, structVal)
- case MapSinkData(map, schema) => converter.fromConnectData(topic.value, schema.orNull, map)
- case ArraySinkData(array, schema) => converter.fromConnectData(topic.value, schema.orNull, array)
- case ByteArraySinkData(_, _) => throw new IllegalStateException("Cannot currently write byte array as json")
- case NullSinkData(schema) => converter.fromConnectData(topic.value, schema.orNull, null)
- case other => throw new IllegalStateException(s"Unknown SinkData type, ${other.getClass.getSimpleName}")
+ case StructSinkData(structVal) => converter.fromConnectData(topic.value, data.schema().orNull, structVal)
+ case MapSinkData(map, schema) => converter.fromConnectData(topic.value, schema.orNull, map)
+        case ArraySinkData(array, _) if isPojo(array) =>
+          // bypass the JsonConverter and serialise POJO arrays directly with Jackson
+          jacksonJson.writeValueAsString(array).getBytes(StandardCharsets.UTF_8)
+ case ArraySinkData(array, schema) =>
+ converter.fromConnectData(topic.value, schema.orNull, array)
+ case ByteArraySinkData(_, _) => throw new IllegalStateException("Cannot currently write byte array as json")
+ case NullSinkData(schema) => converter.fromConnectData(topic.value, schema.orNull, null)
+ case other => throw new IllegalStateException(s"Unknown SinkData type, ${other.getClass.getSimpleName}")
}

def convert(data: SinkData): Any = data match {
@@ -45,4 +54,13 @@ object ToJsonDataConverter {
case ByteArraySinkData(bArray, _) => ByteBuffer.wrap(bArray)
case data => data.value
}
+
+  /**
+    * Workaround for users whose Kafka Connect SMTs, against best practice, emit arrays of POJOs rather than Connect Structs: such arrays are serialised with Jackson because the JsonConverter cannot handle them.
+    */
+  private def isPojo(array: java.util.List[_]): Boolean =
+    array.asScala.exists {
+      case _: Struct => false
+      case _         => true
+    }
}
diff --git a/suppression.xml b/suppression.xml
index 03a8bac7f..865acc02e 100644
--- a/suppression.xml
+++ b/suppression.xml
@@ -73,4 +73,16 @@
        <packageUrl regex="true">^pkg:maven/com\.azure/azure\-identity@.*$</packageUrl>
        <cpe>cpe:/a:microsoft:azure_cli</cpe>
    </suppress>
+    <suppress>
+        <packageUrl regex="true">^pkg:maven/joda\-time/joda\-time@.*$</packageUrl>
+        <vulnerabilityName>CVE-2024-23080</vulnerabilityName>
+    </suppress>
+    <suppress>
+        <packageUrl regex="true">^pkg:maven/org\.threeten/threetenbp@.*$</packageUrl>
+        <vulnerabilityName>CVE-2024-23081</vulnerabilityName>
+    </suppress>
+    <suppress>
+        <packageUrl regex="true">^pkg:maven/org\.threeten/threetenbp@.*$</packageUrl>
+        <vulnerabilityName>CVE-2024-23082</vulnerabilityName>
+    </suppress>
</suppressions>
\ No newline at end of file