diff --git a/airbyte-cdk/bulk/toolkits/load-avro/src/main/kotlin/io/airbyte/cdk/load/data/avro/AvroRecordToAirbyteValue.kt b/airbyte-cdk/bulk/toolkits/load-avro/src/main/kotlin/io/airbyte/cdk/load/data/avro/AvroRecordToAirbyteValue.kt index 0cb3496a5221..64aaef992a62 100644 --- a/airbyte-cdk/bulk/toolkits/load-avro/src/main/kotlin/io/airbyte/cdk/load/data/avro/AvroRecordToAirbyteValue.kt +++ b/airbyte-cdk/bulk/toolkits/load-avro/src/main/kotlin/io/airbyte/cdk/load/data/avro/AvroRecordToAirbyteValue.kt @@ -59,7 +59,15 @@ class AvroRecordToAirbyteValue { throw UnsupportedOperationException("ObjectTypeWithEmptySchema is not supported") is ObjectTypeWithoutSchema -> throw UnsupportedOperationException("ObjectTypeWithoutSchema is not supported") - is StringType -> return StringValue((avroValue as Utf8).toString()) + is StringType -> + return StringValue( + when (avroValue) { + is Utf8 -> avroValue.toString() // Avro + is String -> avroValue // Avro via Parquet + else -> + throw IllegalArgumentException("Unsupported string type: $avroValue") + } + ) is TimeType -> throw UnsupportedOperationException("TimeType is not supported") is TimestampType -> throw UnsupportedOperationException("TimestampType is not supported") diff --git a/airbyte-cdk/bulk/toolkits/load-object-storage/build.gradle b/airbyte-cdk/bulk/toolkits/load-object-storage/build.gradle index 5368090f6ad2..8f3e488dd24b 100644 --- a/airbyte-cdk/bulk/toolkits/load-object-storage/build.gradle +++ b/airbyte-cdk/bulk/toolkits/load-object-storage/build.gradle @@ -1,7 +1,12 @@ dependencies { implementation project(':airbyte-cdk:bulk:core:bulk-cdk-core-base') implementation project(':airbyte-cdk:bulk:core:bulk-cdk-core-load') + + // TODO: Separate individual format types from the core format spec + // and migrate them to their respective toolkits, so that these + // dependencies can be removed. 
api project(':airbyte-cdk:bulk:toolkits:bulk-cdk-toolkit-load-avro') + api project(':airbyte-cdk:bulk:toolkits:bulk-cdk-toolkit-load-parquet') api("org.apache.commons:commons-csv:1.10.0") diff --git a/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/command/object_storage/ObjectStorageFormatSpecification.kt b/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/command/object_storage/ObjectStorageFormatSpecification.kt index f8e57af85562..4b13cb0fb997 100644 --- a/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/command/object_storage/ObjectStorageFormatSpecification.kt +++ b/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/command/object_storage/ObjectStorageFormatSpecification.kt @@ -55,8 +55,8 @@ interface ObjectStorageFormatSpecificationProvider { @JsonSubTypes( JsonSubTypes.Type(value = JsonFormatSpecification::class, name = "JSONL"), JsonSubTypes.Type(value = CSVFormatSpecification::class, name = "CSV"), - JsonSubTypes.Type(value = AvroFormatSpecification::class, name = "AVRO"), - JsonSubTypes.Type(value = ParquetFormatSpecification::class, name = "PARQUET") + JsonSubTypes.Type(value = AvroFormatSpecification::class, name = "Avro"), + JsonSubTypes.Type(value = ParquetFormatSpecification::class, name = "Parquet") ) sealed class ObjectStorageFormatSpecification( @JsonSchemaTitle("Format Type") @JsonProperty("format_type") open val formatType: Type @@ -64,8 +64,8 @@ sealed class ObjectStorageFormatSpecification( enum class Type(@get:JsonValue val typeName: String) { JSONL("JSONL"), CSV("CSV"), - AVRO("AVRO"), - PARQUET("PARQUET") + AVRO("Avro"), + PARQUET("Parquet") } } diff --git a/airbyte-cdk/bulk/toolkits/load-object-storage/src/testFixtures/kotlin/io/airbyte/cdk/load/ObjectStorageDataDumper.kt b/airbyte-cdk/bulk/toolkits/load-object-storage/src/testFixtures/kotlin/io/airbyte/cdk/load/ObjectStorageDataDumper.kt index 
bc5e43c6d847..f03333dc60d5 100644 --- a/airbyte-cdk/bulk/toolkits/load-object-storage/src/testFixtures/kotlin/io/airbyte/cdk/load/ObjectStorageDataDumper.kt +++ b/airbyte-cdk/bulk/toolkits/load-object-storage/src/testFixtures/kotlin/io/airbyte/cdk/load/ObjectStorageDataDumper.kt @@ -10,6 +10,7 @@ import io.airbyte.cdk.load.command.object_storage.CSVFormatConfiguration import io.airbyte.cdk.load.command.object_storage.JsonFormatConfiguration import io.airbyte.cdk.load.command.object_storage.ObjectStorageCompressionConfiguration import io.airbyte.cdk.load.command.object_storage.ObjectStorageFormatConfiguration +import io.airbyte.cdk.load.command.object_storage.ParquetFormatConfiguration import io.airbyte.cdk.load.data.avro.toAirbyteValue import io.airbyte.cdk.load.data.avro.toAvroSchema import io.airbyte.cdk.load.data.toAirbyteValue @@ -19,6 +20,7 @@ import io.airbyte.cdk.load.file.avro.toAvroReader import io.airbyte.cdk.load.file.object_storage.ObjectStorageClient import io.airbyte.cdk.load.file.object_storage.ObjectStoragePathFactory import io.airbyte.cdk.load.file.object_storage.RemoteObject +import io.airbyte.cdk.load.file.parquet.toParquetReader import io.airbyte.cdk.load.test.util.OutputRecord import io.airbyte.cdk.load.test.util.toOutputRecord import io.airbyte.cdk.load.util.deserializeToNode @@ -95,6 +97,15 @@ class ObjectStorageDataDumper( .toList() } } - else -> error("Unsupported format") + is ParquetFormatConfiguration -> { + inputStream + .toParquetReader(stream.schemaWithMeta.toAvroSchema(stream.descriptor)) + .use { reader -> + reader + .recordSequence() + .map { it.toAirbyteValue(stream.schemaWithMeta).toOutputRecord() } + .toList() + } + } } } diff --git a/airbyte-cdk/bulk/toolkits/load-parquet/build.gradle b/airbyte-cdk/bulk/toolkits/load-parquet/build.gradle new file mode 100644 index 000000000000..51fdebd3e384 --- /dev/null +++ b/airbyte-cdk/bulk/toolkits/load-parquet/build.gradle @@ -0,0 +1,17 @@ +dependencies { + implementation 
project(':airbyte-cdk:bulk:core:bulk-cdk-core-base') + implementation project(':airbyte-cdk:bulk:core:bulk-cdk-core-load') + api project(':airbyte-cdk:bulk:toolkits:bulk-cdk-toolkit-load-avro') + + api ('org.apache.hadoop:hadoop-common:3.4.0') { + exclude group: 'org.apache.zookeeper' + exclude group: 'org.apache.hadoop', module: 'hadoop-yarn-common' + } + api ('org.apache.hadoop:hadoop-mapreduce-client-core:3.4.0') { + exclude group: 'org.apache.zookeeper' + exclude group: 'org.apache.hadoop', module: 'hadoop-yarn-common' + } + api 'org.apache.parquet:parquet-avro:1.14.2' + + testFixturesImplementation testFixtures(project(":airbyte-cdk:bulk:core:bulk-cdk-core-load")) +} diff --git a/airbyte-cdk/bulk/toolkits/load-parquet/src/main/kotlin/io/airbyte/cdk/load/file/parquet/ParquetReader.kt b/airbyte-cdk/bulk/toolkits/load-parquet/src/main/kotlin/io/airbyte/cdk/load/file/parquet/ParquetReader.kt new file mode 100644 index 000000000000..384a2753bad2 --- /dev/null +++ b/airbyte-cdk/bulk/toolkits/load-parquet/src/main/kotlin/io/airbyte/cdk/load/file/parquet/ParquetReader.kt @@ -0,0 +1,50 @@ +/* + * Copyright (c) 2024 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.cdk.load.file.parquet + +import java.io.Closeable +import java.io.File +import java.io.InputStream +import kotlin.io.path.outputStream +import org.apache.avro.Schema +import org.apache.avro.generic.GenericRecord +import org.apache.hadoop.fs.Path +import org.apache.parquet.avro.AvroParquetReader +import org.apache.parquet.avro.AvroReadSupport +import org.apache.parquet.hadoop.ParquetReader as ApacheParquetReader + +class ParquetReader( + private val reader: ApacheParquetReader<GenericRecord>, + private val tmpFile: File +) : Closeable { + private fun read(): GenericRecord?
{ + return reader.read() + } + + fun recordSequence(): Sequence<GenericRecord> = generateSequence { read() } + + override fun close() { + reader.close() + tmpFile.delete() + } +} + +fun InputStream.toParquetReader(avroSchema: Schema): ParquetReader { + + val tmpFile = + kotlin.io.path.createTempFile( + prefix = "${avroSchema.namespace}.${avroSchema.name}", + suffix = ".avro" + ) + tmpFile.outputStream().use { outputStream -> this.copyTo(outputStream) } + val reader = + AvroParquetReader.builder( + AvroReadSupport<GenericRecord>(), + Path(tmpFile.toAbsolutePath().toString()) + ) + .build() + + return ParquetReader(reader, tmpFile.toFile()) +} diff --git a/airbyte-cdk/bulk/toolkits/load-parquet/src/main/kotlin/io/airbyte/cdk/load/file/parquet/ParquetWriter.kt b/airbyte-cdk/bulk/toolkits/load-parquet/src/main/kotlin/io/airbyte/cdk/load/file/parquet/ParquetWriter.kt new file mode 100644 index 000000000000..4bd915ed5a70 --- /dev/null +++ b/airbyte-cdk/bulk/toolkits/load-parquet/src/main/kotlin/io/airbyte/cdk/load/file/parquet/ParquetWriter.kt @@ -0,0 +1,55 @@ +/* + * Copyright (c) 2024 Airbyte, Inc., all rights reserved.
+ */ + +package io.airbyte.cdk.load.file.parquet + +import java.io.Closeable +import java.io.OutputStream +import org.apache.avro.Schema +import org.apache.avro.generic.GenericRecord +import org.apache.hadoop.conf.Configuration +import org.apache.parquet.avro.AvroParquetWriter +import org.apache.parquet.hadoop.ParquetWriter as ApacheParquetWriter +import org.apache.parquet.io.OutputFile +import org.apache.parquet.io.PositionOutputStream + +class ParquetWriter(private val writer: ApacheParquetWriter<GenericRecord>) : Closeable { + fun write(record: GenericRecord) = writer.write(record) + override fun close() = writer.close() +} + +fun OutputStream.toParquetWriter(avroSchema: Schema): ParquetWriter { + // Custom OutputFile implementation wrapping the OutputStream + val outputFile = + object : OutputFile { + var position: Long = 0 + override fun create(blockSizeHint: Long) = + object : PositionOutputStream() { + override fun write(b: Int) { + position += 1 + this@toParquetWriter.write(b) + } + override fun write(bytes: ByteArray, off: Int, len: Int) { + position += len + this@toParquetWriter.write(bytes, off, len) + } + override fun flush() = this@toParquetWriter.flush() + override fun close() = this@toParquetWriter.close() + override fun getPos() = position + } + + override fun createOrOverwrite(blockSizeHint: Long) = create(blockSizeHint) + override fun supportsBlockSize() = false + override fun defaultBlockSize() = 0L + } + + // Initialize AvroParquetWriter with the custom OutputFile + val writer = + AvroParquetWriter.builder<GenericRecord>(outputFile) + .withSchema(avroSchema) + .withConf(Configuration()) + .build() + + return ParquetWriter(writer) +} diff --git a/airbyte-integrations/connectors/destination-s3-v2/metadata.yaml b/airbyte-integrations/connectors/destination-s3-v2/metadata.yaml index f221c4db3a43..c80b1d54008a 100644 --- a/airbyte-integrations/connectors/destination-s3-v2/metadata.yaml +++ b/airbyte-integrations/connectors/destination-s3-v2/metadata.yaml @@ -2,7 +2,7 @@ data:
connectorSubtype: file connectorType: destination definitionId: d6116991-e809-4c7c-ae09-c64712df5b66 - dockerImageTag: 0.1.7 + dockerImageTag: 0.1.8 dockerRepository: airbyte/destination-s3-v2 githubIssueLabel: destination-s3-v2 icon: s3.svg @@ -51,4 +51,9 @@ data: secretStore: type: GSM alias: airbyte-connector-testing-secret-store + - name: SECRET_DESTINATION-S3-V2-PARQUET + fileName: s3_dest_v2_parquet_config.json + secretStore: + type: GSM + alias: airbyte-connector-testing-secret-store metadataSpecVersion: "1.0" diff --git a/airbyte-integrations/connectors/destination-s3-v2/src/main/kotlin/S3V2Checker.kt b/airbyte-integrations/connectors/destination-s3-v2/src/main/kotlin/S3V2Checker.kt index 27cf9c152c41..baff49ee5cf3 100644 --- a/airbyte-integrations/connectors/destination-s3-v2/src/main/kotlin/S3V2Checker.kt +++ b/airbyte-integrations/connectors/destination-s3-v2/src/main/kotlin/S3V2Checker.kt @@ -5,16 +5,12 @@ package io.airbyte.integrations.destination.s3_v2 import io.airbyte.cdk.load.check.DestinationChecker -import io.airbyte.cdk.load.command.object_storage.AvroFormatConfiguration -import io.airbyte.cdk.load.command.object_storage.CSVFormatConfiguration -import io.airbyte.cdk.load.command.object_storage.JsonFormatConfiguration import io.airbyte.cdk.load.file.TimeProvider import io.airbyte.cdk.load.file.object_storage.ObjectStoragePathFactory import io.airbyte.cdk.load.file.s3.S3ClientFactory import io.airbyte.cdk.load.file.s3.S3Object import io.airbyte.cdk.load.util.write import io.github.oshai.kotlinlogging.KotlinLogging -import io.micronaut.context.exceptions.ConfigurationException import jakarta.inject.Singleton import java.io.OutputStream import kotlinx.coroutines.flow.toList @@ -27,15 +23,6 @@ class S3V2Checker(private val timeProvider: TimeProvider) : override fun check(config: S3V2Configuration) { runBlocking { - if ( - config.objectStorageFormatConfiguration !is JsonFormatConfiguration && - config.objectStorageFormatConfiguration !is 
CSVFormatConfiguration && - config.objectStorageFormatConfiguration !is AvroFormatConfiguration - ) { - throw ConfigurationException( - "Currently only JSON, CSV, and Avro formats are supported" - ) - } val s3Client = S3ClientFactory.make(config) val pathFactory = ObjectStoragePathFactory.from(config, timeProvider) val path = pathFactory.getStagingDirectory(mockStream()) diff --git a/airbyte-integrations/connectors/destination-s3-v2/src/main/kotlin/S3V2Writer.kt b/airbyte-integrations/connectors/destination-s3-v2/src/main/kotlin/S3V2Writer.kt index 8bcd14ebb1ed..7051698225b7 100644 --- a/airbyte-integrations/connectors/destination-s3-v2/src/main/kotlin/S3V2Writer.kt +++ b/airbyte-integrations/connectors/destination-s3-v2/src/main/kotlin/S3V2Writer.kt @@ -10,6 +10,7 @@ import io.airbyte.cdk.load.command.object_storage.AvroFormatConfiguration import io.airbyte.cdk.load.command.object_storage.CSVFormatConfiguration import io.airbyte.cdk.load.command.object_storage.JsonFormatConfiguration import io.airbyte.cdk.load.command.object_storage.ObjectStorageFormatConfigurationProvider +import io.airbyte.cdk.load.command.object_storage.ParquetFormatConfiguration import io.airbyte.cdk.load.data.DestinationRecordToAirbyteValueWithMeta import io.airbyte.cdk.load.data.avro.toAvroRecord import io.airbyte.cdk.load.data.avro.toAvroSchema @@ -18,6 +19,7 @@ import io.airbyte.cdk.load.data.toCsvRecord import io.airbyte.cdk.load.data.toJson import io.airbyte.cdk.load.file.avro.toAvroWriter import io.airbyte.cdk.load.file.object_storage.ObjectStoragePathFactory +import io.airbyte.cdk.load.file.parquet.toParquetWriter import io.airbyte.cdk.load.file.s3.S3Client import io.airbyte.cdk.load.file.s3.S3Object import io.airbyte.cdk.load.message.Batch @@ -55,7 +57,11 @@ class S3V2Writer( inner class S3V2StreamLoader(override val stream: DestinationStream) : StreamLoader { private val partNumber = AtomicLong(0L) // TODO: Get from destination state private val avroSchema = - if 
(formatConfig.objectStorageFormatConfiguration is AvroFormatConfiguration) { + if ( + formatConfig.objectStorageFormatConfiguration.let { + it is AvroFormatConfiguration || it is ParquetFormatConfiguration + } + ) { stream.schemaWithMeta.toAvroSchema(stream.descriptor) } else { null @@ -98,6 +104,15 @@ class S3V2Writer( } } } + is ParquetFormatConfiguration -> { + outputStream.toParquetWriter(avroSchema!!).use { writer -> + records.forEach { + writer.write( + recordDecorator.decorate(it).toAvroRecord(avroSchema) + ) + } + } + } else -> throw IllegalStateException("Unsupported format") } } diff --git a/airbyte-integrations/connectors/destination-s3-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/s3_v2/S3V2TestUtils.kt b/airbyte-integrations/connectors/destination-s3-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/s3_v2/S3V2TestUtils.kt index 758205215e36..c1d7be8f6dd9 100644 --- a/airbyte-integrations/connectors/destination-s3-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/s3_v2/S3V2TestUtils.kt +++ b/airbyte-integrations/connectors/destination-s3-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/s3_v2/S3V2TestUtils.kt @@ -14,6 +14,7 @@ object S3V2TestUtils { const val CSV_UNCOMPRESSED_CONFIG_PATH = "secrets/s3_dest_v2_csv_config.json" const val CSV_GZIP_CONFIG_PATH = "secrets/s3_dest_v2_csv_gzip_config.json" const val AVRO_UNCOMPRESSED_CONFIG_PATH = "secrets/s3_dest_v2_avro_config.json" + const val PARQUET_UNCOMPRESSED_CONFIG_PATH = "secrets/s3_dest_v2_parquet_config.json" fun getConfig(configPath: String): S3V2Specification = ValidatedJsonUtils.parseOne( S3V2Specification::class.java, diff --git a/airbyte-integrations/connectors/destination-s3-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/s3_v2/S3V2WriteTest.kt b/airbyte-integrations/connectors/destination-s3-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/s3_v2/S3V2WriteTest.kt index 
89188270ef54..c78d8c2c25f8 100644 --- a/airbyte-integrations/connectors/destination-s3-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/s3_v2/S3V2WriteTest.kt +++ b/airbyte-integrations/connectors/destination-s3-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/s3_v2/S3V2WriteTest.kt @@ -39,3 +39,5 @@ class S3V2WriteTestCsvUncompressed : S3V2WriteTest(S3V2TestUtils.CSV_UNCOMPRESSE class S3V2WriteTestCsvGzip : S3V2WriteTest(S3V2TestUtils.CSV_GZIP_CONFIG_PATH) class S3V2WriteTestAvro : S3V2WriteTest(S3V2TestUtils.AVRO_UNCOMPRESSED_CONFIG_PATH) + +class S3V2WriteTestParquet : S3V2WriteTest(S3V2TestUtils.PARQUET_UNCOMPRESSED_CONFIG_PATH) diff --git a/airbyte-integrations/connectors/destination-s3-v2/src/test-integration/resources/expected-spec-cloud.json b/airbyte-integrations/connectors/destination-s3-v2/src/test-integration/resources/expected-spec-cloud.json index f713da3d7210..f2360df4fb0a 100644 --- a/airbyte-integrations/connectors/destination-s3-v2/src/test-integration/resources/expected-spec-cloud.json +++ b/airbyte-integrations/connectors/destination-s3-v2/src/test-integration/resources/expected-spec-cloud.json @@ -129,8 +129,8 @@ "properties" : { "format_type" : { "type" : "string", - "enum" : [ "AVRO" ], - "default" : "AVRO" + "enum" : [ "Avro" ], + "default" : "Avro" } }, "required" : [ "format_type" ] @@ -141,8 +141,8 @@ "properties" : { "format_type" : { "type" : "string", - "enum" : [ "PARQUET" ], - "default" : "PARQUET" + "enum" : [ "Parquet" ], + "default" : "Parquet" } }, "required" : [ "format_type" ] diff --git a/airbyte-integrations/connectors/destination-s3-v2/src/test-integration/resources/expected-spec-oss.json b/airbyte-integrations/connectors/destination-s3-v2/src/test-integration/resources/expected-spec-oss.json index f713da3d7210..f2360df4fb0a 100644 --- a/airbyte-integrations/connectors/destination-s3-v2/src/test-integration/resources/expected-spec-oss.json +++ 
b/airbyte-integrations/connectors/destination-s3-v2/src/test-integration/resources/expected-spec-oss.json @@ -129,8 +129,8 @@ "properties" : { "format_type" : { "type" : "string", - "enum" : [ "AVRO" ], - "default" : "AVRO" + "enum" : [ "Avro" ], + "default" : "Avro" } }, "required" : [ "format_type" ] @@ -141,8 +141,8 @@ "properties" : { "format_type" : { "type" : "string", - "enum" : [ "PARQUET" ], - "default" : "PARQUET" + "enum" : [ "Parquet" ], + "default" : "Parquet" } }, "required" : [ "format_type" ]