From 2469b7309b8402208fd5a875be783e67ba870791 Mon Sep 17 00:00:00 2001 From: Johnny Schmidt Date: Mon, 21 Oct 2024 13:08:17 -0700 Subject: [PATCH] Bulk Load CDK: Avro toolkit and S3V2 usage (#47121) --- .../cdk/load/command/DestinationStream.kt | 2 +- .../bulk/toolkits/load-avro/build.gradle | 8 ++ .../load/data/avro/AirbyteTypeToAvroSchema.kt | 74 ++++++++++++++++ .../data/avro/AirbyteValueToAvroRecord.kt | 58 +++++++++++++ .../data/avro/AvroRecordToAirbyteValue.kt | 86 +++++++++++++++++++ .../airbyte/cdk/load/file/avro/AvroReader.kt | 48 +++++++++++ .../airbyte/cdk/load/file/avro/AvroWriter.kt | 35 ++++++++ .../toolkits/load-object-storage/build.gradle | 1 + .../cdk/load/data/AirbyteTypeToCsvHeader.kt | 11 +-- .../cdk/load/data/AirbyteValueToCsvRow.kt | 7 +- .../cdk/load/ObjectStorageDataDumper.kt | 28 ++++-- .../cdk/load/file/s3/S3MultipartUpload.kt | 4 - .../destination-s3-v2/metadata.yaml | 7 +- .../src/main/kotlin/S3V2Checker.kt | 8 +- .../src/main/kotlin/S3V2Writer.kt | 19 ++++ .../destination/s3_v2/S3V2TestUtils.kt | 1 + .../destination/s3_v2/S3V2WriteTest.kt | 2 + 17 files changed, 372 insertions(+), 27 deletions(-) create mode 100644 airbyte-cdk/bulk/toolkits/load-avro/build.gradle create mode 100644 airbyte-cdk/bulk/toolkits/load-avro/src/main/kotlin/io/airbyte/cdk/load/data/avro/AirbyteTypeToAvroSchema.kt create mode 100644 airbyte-cdk/bulk/toolkits/load-avro/src/main/kotlin/io/airbyte/cdk/load/data/avro/AirbyteValueToAvroRecord.kt create mode 100644 airbyte-cdk/bulk/toolkits/load-avro/src/main/kotlin/io/airbyte/cdk/load/data/avro/AvroRecordToAirbyteValue.kt create mode 100644 airbyte-cdk/bulk/toolkits/load-avro/src/main/kotlin/io/airbyte/cdk/load/file/avro/AvroReader.kt create mode 100644 airbyte-cdk/bulk/toolkits/load-avro/src/main/kotlin/io/airbyte/cdk/load/file/avro/AvroWriter.kt diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/command/DestinationStream.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/command/DestinationStream.kt index 71587934bbc4..2a1d5a5097eb 100644 --- a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/command/DestinationStream.kt +++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/command/DestinationStream.kt @@ -51,7 +51,7 @@ data class DestinationStream( * what actually exists, as many destinations have legacy data from before this schema was * adopted. */ - val schemaWithMeta: AirbyteType + val schemaWithMeta: ObjectType get() = ObjectType( linkedMapOf( diff --git a/airbyte-cdk/bulk/toolkits/load-avro/build.gradle b/airbyte-cdk/bulk/toolkits/load-avro/build.gradle new file mode 100644 index 000000000000..871fb32381a7 --- /dev/null +++ b/airbyte-cdk/bulk/toolkits/load-avro/build.gradle @@ -0,0 +1,8 @@ +dependencies { + implementation project(':airbyte-cdk:bulk:core:bulk-cdk-core-base') + implementation project(':airbyte-cdk:bulk:core:bulk-cdk-core-load') + + api("org.apache.avro:avro:1.11.0") + + testFixturesImplementation testFixtures(project(":airbyte-cdk:bulk:core:bulk-cdk-core-load")) +} diff --git a/airbyte-cdk/bulk/toolkits/load-avro/src/main/kotlin/io/airbyte/cdk/load/data/avro/AirbyteTypeToAvroSchema.kt b/airbyte-cdk/bulk/toolkits/load-avro/src/main/kotlin/io/airbyte/cdk/load/data/avro/AirbyteTypeToAvroSchema.kt new file mode 100644 index 000000000000..c2b3eddfc6f7 --- /dev/null +++ b/airbyte-cdk/bulk/toolkits/load-avro/src/main/kotlin/io/airbyte/cdk/load/data/avro/AirbyteTypeToAvroSchema.kt @@ -0,0 +1,74 @@ +/* + * Copyright (c) 2024 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.cdk.load.data.avro + +import io.airbyte.cdk.load.command.DestinationStream +import io.airbyte.cdk.load.data.AirbyteType +import io.airbyte.cdk.load.data.ArrayType +import io.airbyte.cdk.load.data.ArrayTypeWithoutSchema +import io.airbyte.cdk.load.data.BooleanType +import io.airbyte.cdk.load.data.DateType +import io.airbyte.cdk.load.data.IntegerType +import io.airbyte.cdk.load.data.NullType +import io.airbyte.cdk.load.data.NumberType +import io.airbyte.cdk.load.data.ObjectType +import io.airbyte.cdk.load.data.ObjectTypeWithEmptySchema +import io.airbyte.cdk.load.data.ObjectTypeWithoutSchema +import io.airbyte.cdk.load.data.StringType +import io.airbyte.cdk.load.data.TimeType +import io.airbyte.cdk.load.data.TimestampType +import io.airbyte.cdk.load.data.UnionType +import io.airbyte.cdk.load.data.UnknownType +import org.apache.avro.Schema +import org.apache.avro.SchemaBuilder + +class AirbyteTypeToAvroSchema { + fun convert(airbyteSchema: AirbyteType, path: List): Schema { + when (airbyteSchema) { + is ObjectType -> { + val name = path.last() + val namespace = path.take(path.size - 1).reversed().joinToString(".") + val builder = SchemaBuilder.record(name).namespace(namespace).fields() + return airbyteSchema.properties.entries + .fold(builder) { acc, (name, field) -> + // NOTE: We will not support nullable at this stage. + // All nullables should have been converted to union[this, null] upstream + // TODO: Enforce this + acc.name(name).type(convert(field.type, path + name)).noDefault() + } + .endRecord() + } + is ArrayType -> { + return SchemaBuilder.array() + .items(convert(airbyteSchema.items.type, path + "items")) + } + is ArrayTypeWithoutSchema -> + throw IllegalArgumentException("Array type without schema is not supported") + is BooleanType -> return SchemaBuilder.builder().booleanType() + is DateType -> + throw IllegalArgumentException("String-based date types are not supported") + is IntegerType -> return SchemaBuilder.builder().longType() + is NullType -> return SchemaBuilder.builder().nullType() + is NumberType -> return SchemaBuilder.builder().doubleType() + is ObjectTypeWithEmptySchema -> + throw IllegalArgumentException("Object type with empty schema is not supported") + is ObjectTypeWithoutSchema -> + throw IllegalArgumentException("Object type without schema is not supported") + is StringType -> return SchemaBuilder.builder().stringType() + is TimeType -> + throw IllegalArgumentException("String-based time types are not supported") + is TimestampType -> + throw IllegalArgumentException("String-based timestamp types are not supported") + is UnionType -> + return Schema.createUnion(airbyteSchema.options.map { convert(it, path) }) + is UnknownType -> throw IllegalArgumentException("Unknown type: ${airbyteSchema.what}") + } + } +} + +fun ObjectType.toAvroSchema(stream: DestinationStream.Descriptor): Schema { + val path = listOf(stream.namespace ?: "default", stream.name) + return AirbyteTypeToAvroSchema().convert(this, path) +} diff --git a/airbyte-cdk/bulk/toolkits/load-avro/src/main/kotlin/io/airbyte/cdk/load/data/avro/AirbyteValueToAvroRecord.kt b/airbyte-cdk/bulk/toolkits/load-avro/src/main/kotlin/io/airbyte/cdk/load/data/avro/AirbyteValueToAvroRecord.kt new file mode 100644 index 000000000000..d1aac913a4e2 --- /dev/null +++ b/airbyte-cdk/bulk/toolkits/load-avro/src/main/kotlin/io/airbyte/cdk/load/data/avro/AirbyteValueToAvroRecord.kt @@ -0,0 +1,58 @@ +/* + * Copyright (c) 2024 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.cdk.load.data.avro + +import io.airbyte.cdk.load.data.AirbyteValue +import io.airbyte.cdk.load.data.ArrayValue +import io.airbyte.cdk.load.data.BooleanValue +import io.airbyte.cdk.load.data.DateValue +import io.airbyte.cdk.load.data.IntegerValue +import io.airbyte.cdk.load.data.NullValue +import io.airbyte.cdk.load.data.NumberValue +import io.airbyte.cdk.load.data.ObjectValue +import io.airbyte.cdk.load.data.StringValue +import io.airbyte.cdk.load.data.TimeValue +import io.airbyte.cdk.load.data.TimestampValue +import io.airbyte.cdk.load.data.UnknownValue +import org.apache.avro.Schema +import org.apache.avro.generic.GenericData +import org.apache.avro.generic.GenericRecord + +class AirbyteValueToAvroRecord { + fun convert(airbyteValue: AirbyteValue, schema: Schema): Any? { + when (airbyteValue) { + is ObjectValue -> { + val record = GenericData.Record(schema) + airbyteValue.values.forEach { (name, value) -> + record.put(name, convert(value, schema.getField(name).schema())) + } + return record + } + is ArrayValue -> { + val array = GenericData.Array(airbyteValue.values.size, schema) + airbyteValue.values.forEach { value -> + array.add(convert(value, schema.elementType)) + } + return array + } + is BooleanValue -> return airbyteValue.value + is DateValue -> + throw IllegalArgumentException("String-based date types are not supported") + is IntegerValue -> return airbyteValue.value + NullValue -> return null + is NumberValue -> return airbyteValue.value.toDouble() + is StringValue -> return airbyteValue.value + is TimeValue -> + throw IllegalArgumentException("String-based time types are not supported") + is TimestampValue -> + throw IllegalArgumentException("String-based timestamp types are not supported") + is UnknownValue -> throw IllegalArgumentException("Unknown type is not supported") + } + } +} + +fun ObjectValue.toAvroRecord(schema: Schema): GenericRecord { + return AirbyteValueToAvroRecord().convert(this, schema) as GenericRecord +} diff --git a/airbyte-cdk/bulk/toolkits/load-avro/src/main/kotlin/io/airbyte/cdk/load/data/avro/AvroRecordToAirbyteValue.kt b/airbyte-cdk/bulk/toolkits/load-avro/src/main/kotlin/io/airbyte/cdk/load/data/avro/AvroRecordToAirbyteValue.kt new file mode 100644 index 000000000000..0cb3496a5221 --- /dev/null +++ b/airbyte-cdk/bulk/toolkits/load-avro/src/main/kotlin/io/airbyte/cdk/load/data/avro/AvroRecordToAirbyteValue.kt @@ -0,0 +1,86 @@ +/* + * Copyright (c) 2024 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.cdk.load.data.avro + +import io.airbyte.cdk.load.data.AirbyteType +import io.airbyte.cdk.load.data.AirbyteValue +import io.airbyte.cdk.load.data.ArrayType +import io.airbyte.cdk.load.data.ArrayTypeWithoutSchema +import io.airbyte.cdk.load.data.ArrayValue +import io.airbyte.cdk.load.data.BooleanType +import io.airbyte.cdk.load.data.BooleanValue +import io.airbyte.cdk.load.data.DateType +import io.airbyte.cdk.load.data.IntegerType +import io.airbyte.cdk.load.data.IntegerValue +import io.airbyte.cdk.load.data.NullType +import io.airbyte.cdk.load.data.NullValue +import io.airbyte.cdk.load.data.NumberType +import io.airbyte.cdk.load.data.NumberValue +import io.airbyte.cdk.load.data.ObjectType +import io.airbyte.cdk.load.data.ObjectTypeWithEmptySchema +import io.airbyte.cdk.load.data.ObjectTypeWithoutSchema +import io.airbyte.cdk.load.data.ObjectValue +import io.airbyte.cdk.load.data.StringType +import io.airbyte.cdk.load.data.StringValue +import io.airbyte.cdk.load.data.TimeType +import io.airbyte.cdk.load.data.TimestampType +import io.airbyte.cdk.load.data.UnionType +import io.airbyte.cdk.load.data.UnknownType +import org.apache.avro.generic.GenericArray +import org.apache.avro.generic.GenericRecord +import org.apache.avro.util.Utf8 + +class AvroRecordToAirbyteValue { + fun convert(avroValue: Any?, schema: AirbyteType): AirbyteValue { + when (schema) { + is ObjectType -> { + val properties = LinkedHashMap() + schema.properties.forEach { (name, field) -> + val value = (avroValue as GenericRecord).get(name) + properties[name] = convert(value, field.type) + } + return ObjectValue(properties) + } + is ArrayType -> { + val items = schema.items + val values = (avroValue as GenericArray<*>).map { convert(it, items.type) } + return ArrayValue(values) + } + is ArrayTypeWithoutSchema -> + throw UnsupportedOperationException("ArrayTypeWithoutSchema is not supported") + is BooleanType -> return BooleanValue(avroValue as Boolean) + is DateType -> throw UnsupportedOperationException("DateType is not supported") + is IntegerType -> return IntegerValue(avroValue as Long) + is NullType -> return NullValue + is NumberType -> return NumberValue((avroValue as Double).toBigDecimal()) + is ObjectTypeWithEmptySchema -> + throw UnsupportedOperationException("ObjectTypeWithEmptySchema is not supported") + is ObjectTypeWithoutSchema -> + throw UnsupportedOperationException("ObjectTypeWithoutSchema is not supported") + is StringType -> return StringValue((avroValue as Utf8).toString()) + is TimeType -> throw UnsupportedOperationException("TimeType is not supported") + is TimestampType -> + throw UnsupportedOperationException("TimestampType is not supported") + is UnionType -> return tryConvertUnion(avroValue, schema) + is UnknownType -> throw UnsupportedOperationException("UnknownType is not supported") + else -> throw IllegalArgumentException("Unsupported schema type: $schema") + } + } + + private fun tryConvertUnion(avroValue: Any?, schema: UnionType): AirbyteValue { + for (type in schema.options) { + try { + return convert(avroValue, type) + } catch (e: Exception) { + continue + } + } + throw IllegalArgumentException("Could not convert value to any of the union types") + } +} + +fun GenericRecord.toAirbyteValue(schema: AirbyteType): AirbyteValue { + return AvroRecordToAirbyteValue().convert(this, schema) +} diff --git a/airbyte-cdk/bulk/toolkits/load-avro/src/main/kotlin/io/airbyte/cdk/load/file/avro/AvroReader.kt b/airbyte-cdk/bulk/toolkits/load-avro/src/main/kotlin/io/airbyte/cdk/load/file/avro/AvroReader.kt new file mode 100644 index 000000000000..22f335b0abb3 --- /dev/null +++ b/airbyte-cdk/bulk/toolkits/load-avro/src/main/kotlin/io/airbyte/cdk/load/file/avro/AvroReader.kt @@ -0,0 +1,48 @@ +/* + * Copyright (c) 2024 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.cdk.load.file.avro + +import java.io.Closeable +import java.io.InputStream +import kotlin.io.path.outputStream +import org.apache.avro.Schema +import org.apache.avro.file.DataFileReader +import org.apache.avro.generic.GenericDatumReader +import org.apache.avro.generic.GenericRecord + +class AvroReader( + private val dataFileReader: DataFileReader, + private val tmpFile: java.io.File +) : Closeable { + private fun read(): GenericRecord? { + return if (dataFileReader.hasNext()) { + dataFileReader.next() + } else { + null + } + } + + fun recordSequence(): Sequence { + return generateSequence { read() } + } + + override fun close() { + dataFileReader.close() + tmpFile.delete() + } +} + +fun InputStream.toAvroReader(avroSchema: Schema): AvroReader { + val reader = GenericDatumReader(avroSchema) + val tmpFile = + kotlin.io.path.createTempFile( + prefix = "${avroSchema.namespace}.${avroSchema.name}", + suffix = ".avro" + ) + tmpFile.outputStream().use { outputStream -> this.copyTo(outputStream) } + val file = tmpFile.toFile() + val dataFileReader = DataFileReader(file, reader) + return AvroReader(dataFileReader, file) +} diff --git a/airbyte-cdk/bulk/toolkits/load-avro/src/main/kotlin/io/airbyte/cdk/load/file/avro/AvroWriter.kt b/airbyte-cdk/bulk/toolkits/load-avro/src/main/kotlin/io/airbyte/cdk/load/file/avro/AvroWriter.kt new file mode 100644 index 000000000000..30b197ef02a7 --- /dev/null +++ b/airbyte-cdk/bulk/toolkits/load-avro/src/main/kotlin/io/airbyte/cdk/load/file/avro/AvroWriter.kt @@ -0,0 +1,35 @@ +/* + * Copyright (c) 2024 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.cdk.load.file.avro + +import java.io.Closeable +import java.io.OutputStream +import org.apache.avro.Schema +import org.apache.avro.file.DataFileWriter +import org.apache.avro.generic.GenericDatumWriter +import org.apache.avro.generic.GenericRecord + +class AvroWriter( + private val dataFileWriter: DataFileWriter, +) : Closeable { + fun write(record: GenericRecord) { + dataFileWriter.append(record) + } + + fun flush() { + dataFileWriter.flush() + } + + override fun close() { + dataFileWriter.close() + } +} + +fun OutputStream.toAvroWriter(avroSchema: Schema): AvroWriter { + val datumWriter = GenericDatumWriter(avroSchema) + val dataFileWriter = DataFileWriter(datumWriter) + dataFileWriter.create(avroSchema, this) + return AvroWriter(dataFileWriter) +} diff --git a/airbyte-cdk/bulk/toolkits/load-object-storage/build.gradle b/airbyte-cdk/bulk/toolkits/load-object-storage/build.gradle index e9a2f683d569..5368090f6ad2 100644 --- a/airbyte-cdk/bulk/toolkits/load-object-storage/build.gradle +++ b/airbyte-cdk/bulk/toolkits/load-object-storage/build.gradle @@ -1,6 +1,7 @@ dependencies { implementation project(':airbyte-cdk:bulk:core:bulk-cdk-core-base') implementation project(':airbyte-cdk:bulk:core:bulk-cdk-core-load') + api project(':airbyte-cdk:bulk:toolkits:bulk-cdk-toolkit-load-avro') api("org.apache.commons:commons-csv:1.10.0") diff --git a/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/data/AirbyteTypeToCsvHeader.kt b/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/data/AirbyteTypeToCsvHeader.kt index 6edb3a6af91c..ff388deeaba8 100644 --- a/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/data/AirbyteTypeToCsvHeader.kt +++ b/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/data/AirbyteTypeToCsvHeader.kt @@ -9,17 +9,14 @@ import org.apache.commons.csv.CSVFormat import org.apache.commons.csv.CSVPrinter class AirbyteTypeToCsvHeader { - fun convert(schema: AirbyteType): Array { - if (schema !is ObjectType) { - throw IllegalArgumentException("Only object types are supported") - } + fun convert(schema: ObjectType): Array { return schema.properties.map { it.key }.toTypedArray() } } -fun AirbyteType.toCsvHeader(): Array { +fun ObjectType.toCsvHeader(): Array { return AirbyteTypeToCsvHeader().convert(this) } -fun AirbyteType.toCsvPrinterWithHeader(writer: Writer): CSVPrinter = - CSVFormat.Builder.create().setHeader(*toCsvHeader()).build().print(writer) +fun ObjectType.toCsvPrinterWithHeader(writer: Writer): CSVPrinter = + CSVFormat.Builder.create().setHeader(*toCsvHeader()).setAutoFlush(true).build().print(writer) diff --git a/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/data/AirbyteValueToCsvRow.kt b/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/data/AirbyteValueToCsvRow.kt index cdac400e4b3b..5da20eef3ada 100644 --- a/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/data/AirbyteValueToCsvRow.kt +++ b/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/data/AirbyteValueToCsvRow.kt @@ -7,10 +7,7 @@ package io.airbyte.cdk.load.data import io.airbyte.cdk.load.util.serializeToString class AirbyteValueToCsvRow { - fun convert(value: AirbyteValue): Array { - if (value !is ObjectValue) { - throw IllegalArgumentException("Only object values are supported") - } + fun convert(value: ObjectValue): Array { return value.values.map { convertInner(it.value) }.toTypedArray() } @@ -26,6 +23,6 @@ class AirbyteValueToCsvRow { } } -fun AirbyteValue.toCsvRecord(): Array { +fun ObjectValue.toCsvRecord(): Array { return AirbyteValueToCsvRow().convert(this) } diff --git a/airbyte-cdk/bulk/toolkits/load-object-storage/src/testFixtures/kotlin/io/airbyte/cdk/load/ObjectStorageDataDumper.kt b/airbyte-cdk/bulk/toolkits/load-object-storage/src/testFixtures/kotlin/io/airbyte/cdk/load/ObjectStorageDataDumper.kt index 6c3ded6d6a33..bc5e43c6d847 100644 --- a/airbyte-cdk/bulk/toolkits/load-object-storage/src/testFixtures/kotlin/io/airbyte/cdk/load/ObjectStorageDataDumper.kt +++ b/airbyte-cdk/bulk/toolkits/load-object-storage/src/testFixtures/kotlin/io/airbyte/cdk/load/ObjectStorageDataDumper.kt @@ -5,20 +5,23 @@ package io.airbyte.cdk.load import io.airbyte.cdk.load.command.DestinationStream +import io.airbyte.cdk.load.command.object_storage.AvroFormatConfiguration import io.airbyte.cdk.load.command.object_storage.CSVFormatConfiguration import io.airbyte.cdk.load.command.object_storage.JsonFormatConfiguration import io.airbyte.cdk.load.command.object_storage.ObjectStorageCompressionConfiguration import io.airbyte.cdk.load.command.object_storage.ObjectStorageFormatConfiguration +import io.airbyte.cdk.load.data.avro.toAirbyteValue +import io.airbyte.cdk.load.data.avro.toAvroSchema import io.airbyte.cdk.load.data.toAirbyteValue import io.airbyte.cdk.load.file.GZIPProcessor import io.airbyte.cdk.load.file.NoopProcessor +import io.airbyte.cdk.load.file.avro.toAvroReader import io.airbyte.cdk.load.file.object_storage.ObjectStorageClient import io.airbyte.cdk.load.file.object_storage.ObjectStoragePathFactory import io.airbyte.cdk.load.file.object_storage.RemoteObject import io.airbyte.cdk.load.test.util.OutputRecord import io.airbyte.cdk.load.test.util.toOutputRecord import io.airbyte.cdk.load.util.deserializeToNode -import java.io.BufferedReader import java.io.InputStream import java.util.zip.GZIPInputStream import kotlinx.coroutines.Dispatchers @@ -44,14 +47,14 @@ class ObjectStorageDataDumper( .list(prefix) .map { listedObject: RemoteObject<*> -> client.get(listedObject.key) { objectData: InputStream -> - val reader = + val decompressed = when (compressionConfig?.compressor) { is GZIPProcessor -> GZIPInputStream(objectData) is NoopProcessor, null -> objectData else -> error("Unsupported compressor") - }.bufferedReader() - readLines(reader) + } + readLines(decompressed) } } .toList() @@ -61,10 +64,11 @@ class ObjectStorageDataDumper( } @Suppress("DEPRECATION") - private fun readLines(reader: BufferedReader): List = + private fun readLines(inputStream: InputStream): List = when (formatConfig) { is JsonFormatConfiguration -> { - reader + inputStream + .bufferedReader() .lineSequence() .map { line -> line @@ -75,12 +79,22 @@ class ObjectStorageDataDumper( .toList() } is CSVFormatConfiguration -> { - CSVParser(reader, CSVFormat.DEFAULT.withHeader()).use { + CSVParser(inputStream.bufferedReader(), CSVFormat.DEFAULT.withHeader()).use { it.records.map { record -> record.toAirbyteValue(stream.schemaWithMeta).toOutputRecord() } } } + is AvroFormatConfiguration -> { + inputStream + .toAvroReader(stream.schemaWithMeta.toAvroSchema(stream.descriptor)) + .use { reader -> + reader + .recordSequence() + .map { it.toAirbyteValue(stream.schemaWithMeta).toOutputRecord() } + .toList() + } + } else -> error("Unsupported format") } } diff --git a/airbyte-cdk/bulk/toolkits/load-s3/src/main/kotlin/io/airbyte/cdk/load/file/s3/S3MultipartUpload.kt b/airbyte-cdk/bulk/toolkits/load-s3/src/main/kotlin/io/airbyte/cdk/load/file/s3/S3MultipartUpload.kt index 2e87550fb94b..8a62b79fa869 100644 --- a/airbyte-cdk/bulk/toolkits/load-s3/src/main/kotlin/io/airbyte/cdk/load/file/s3/S3MultipartUpload.kt +++ b/airbyte-cdk/bulk/toolkits/load-s3/src/main/kotlin/io/airbyte/cdk/load/file/s3/S3MultipartUpload.kt @@ -99,7 +99,6 @@ class S3MultipartUpload( override fun write(b: ByteArray) = runBlocking { workQueue.send { - println("write[${response.uploadId}](${b.size})") wrappingBuffer.write(b) if (underlyingBuffer.size() >= partSize) { uploadPart() @@ -111,7 +110,6 @@ class S3MultipartUpload( private suspend fun uploadPart() { streamProcessor.partFinisher.invoke(wrappingBuffer) val partNumber = uploadedParts.size + 1 - println("uploadPart[${response.uploadId}](${partNumber}, size=${underlyingBuffer.size()})") val request = UploadPartRequest { uploadId = response.uploadId bucket = response.bucket @@ -127,11 +125,9 @@ class S3MultipartUpload( } ) underlyingBuffer.reset() - println("after reset, size=${underlyingBuffer.size()}") } private suspend fun complete() { - println("complete()") if (underlyingBuffer.size() > 0) { uploadPart() } diff --git a/airbyte-integrations/connectors/destination-s3-v2/metadata.yaml b/airbyte-integrations/connectors/destination-s3-v2/metadata.yaml index f11f39bc9baa..f221c4db3a43 100644 --- a/airbyte-integrations/connectors/destination-s3-v2/metadata.yaml +++ b/airbyte-integrations/connectors/destination-s3-v2/metadata.yaml @@ -2,7 +2,7 @@ data: connectorSubtype: file connectorType: destination definitionId: d6116991-e809-4c7c-ae09-c64712df5b66 - dockerImageTag: 0.1.6 + dockerImageTag: 0.1.7 dockerRepository: airbyte/destination-s3-v2 githubIssueLabel: destination-s3-v2 icon: s3.svg @@ -46,4 +46,9 @@ data: secretStore: type: GSM alias: airbyte-connector-testing-secret-store + - name: SECRET_DESTINATION-S3-V2-AVRO + fileName: s3_dest_v2_avro_config.json + secretStore: + type: GSM + alias: airbyte-connector-testing-secret-store metadataSpecVersion: "1.0" diff --git a/airbyte-integrations/connectors/destination-s3-v2/src/main/kotlin/S3V2Checker.kt b/airbyte-integrations/connectors/destination-s3-v2/src/main/kotlin/S3V2Checker.kt index 311d9ae374a4..27cf9c152c41 100644 --- a/airbyte-integrations/connectors/destination-s3-v2/src/main/kotlin/S3V2Checker.kt +++ b/airbyte-integrations/connectors/destination-s3-v2/src/main/kotlin/S3V2Checker.kt @@ -5,6 +5,7 @@ package io.airbyte.integrations.destination.s3_v2 import io.airbyte.cdk.load.check.DestinationChecker +import io.airbyte.cdk.load.command.object_storage.AvroFormatConfiguration import io.airbyte.cdk.load.command.object_storage.CSVFormatConfiguration import io.airbyte.cdk.load.command.object_storage.JsonFormatConfiguration import io.airbyte.cdk.load.file.TimeProvider @@ -28,9 +29,12 @@ class S3V2Checker(private val timeProvider: TimeProvider) : runBlocking { if ( config.objectStorageFormatConfiguration !is JsonFormatConfiguration && - config.objectStorageFormatConfiguration !is CSVFormatConfiguration + config.objectStorageFormatConfiguration !is CSVFormatConfiguration && + config.objectStorageFormatConfiguration !is AvroFormatConfiguration ) { - throw ConfigurationException("Currently only JSON and CSV format is supported") + throw ConfigurationException( + "Currently only JSON, CSV, and Avro formats are supported" + ) } val s3Client = S3ClientFactory.make(config) val pathFactory = ObjectStoragePathFactory.from(config, timeProvider) diff --git a/airbyte-integrations/connectors/destination-s3-v2/src/main/kotlin/S3V2Writer.kt b/airbyte-integrations/connectors/destination-s3-v2/src/main/kotlin/S3V2Writer.kt index df62a84aec69..8bcd14ebb1ed 100644 --- a/airbyte-integrations/connectors/destination-s3-v2/src/main/kotlin/S3V2Writer.kt +++ b/airbyte-integrations/connectors/destination-s3-v2/src/main/kotlin/S3V2Writer.kt @@ -6,13 +6,17 @@ package io.airbyte.integrations.destination.s3_v2 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings import io.airbyte.cdk.load.command.DestinationStream +import io.airbyte.cdk.load.command.object_storage.AvroFormatConfiguration import io.airbyte.cdk.load.command.object_storage.CSVFormatConfiguration import io.airbyte.cdk.load.command.object_storage.JsonFormatConfiguration import io.airbyte.cdk.load.command.object_storage.ObjectStorageFormatConfigurationProvider import io.airbyte.cdk.load.data.DestinationRecordToAirbyteValueWithMeta +import io.airbyte.cdk.load.data.avro.toAvroRecord +import io.airbyte.cdk.load.data.avro.toAvroSchema import io.airbyte.cdk.load.data.toCsvPrinterWithHeader import io.airbyte.cdk.load.data.toCsvRecord import io.airbyte.cdk.load.data.toJson +import io.airbyte.cdk.load.file.avro.toAvroWriter import io.airbyte.cdk.load.file.object_storage.ObjectStoragePathFactory import io.airbyte.cdk.load.file.s3.S3Client import io.airbyte.cdk.load.file.s3.S3Object @@ -50,6 +54,12 @@ class S3V2Writer( @SuppressFBWarnings("NP_NONNULL_PARAM_VIOLATION", justification = "Kotlin async continuation") inner class S3V2StreamLoader(override val stream: DestinationStream) : StreamLoader { private val partNumber = AtomicLong(0L) // TODO: Get from destination state + private val avroSchema = + if (formatConfig.objectStorageFormatConfiguration is AvroFormatConfiguration) { + stream.schemaWithMeta.toAvroSchema(stream.descriptor) + } else { + null + } override suspend fun processRecords( records: Iterator, @@ -79,6 +89,15 @@ class S3V2Writer( } } } + is AvroFormatConfiguration -> { + outputStream.toAvroWriter(avroSchema!!).use { writer -> + records.forEach { + writer.write( + recordDecorator.decorate(it).toAvroRecord(avroSchema) + ) + } + } + } else -> throw IllegalStateException("Unsupported format") } } diff --git a/airbyte-integrations/connectors/destination-s3-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/s3_v2/S3V2TestUtils.kt b/airbyte-integrations/connectors/destination-s3-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/s3_v2/S3V2TestUtils.kt index 0a19dadce18e..758205215e36 100644 --- a/airbyte-integrations/connectors/destination-s3-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/s3_v2/S3V2TestUtils.kt +++ b/airbyte-integrations/connectors/destination-s3-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/s3_v2/S3V2TestUtils.kt @@ -13,6 +13,7 @@ object S3V2TestUtils { const val JSON_GZIP_CONFIG_PATH = "secrets/s3_dest_v2_jsonl_gzip_config.json" const val CSV_UNCOMPRESSED_CONFIG_PATH = "secrets/s3_dest_v2_csv_config.json" const val CSV_GZIP_CONFIG_PATH = "secrets/s3_dest_v2_csv_gzip_config.json" + const val AVRO_UNCOMPRESSED_CONFIG_PATH = "secrets/s3_dest_v2_avro_config.json" fun getConfig(configPath: String): S3V2Specification = ValidatedJsonUtils.parseOne( S3V2Specification::class.java, diff --git a/airbyte-integrations/connectors/destination-s3-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/s3_v2/S3V2WriteTest.kt b/airbyte-integrations/connectors/destination-s3-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/s3_v2/S3V2WriteTest.kt index f99d01ab36c8..89188270ef54 100644 --- a/airbyte-integrations/connectors/destination-s3-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/s3_v2/S3V2WriteTest.kt +++ b/airbyte-integrations/connectors/destination-s3-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/s3_v2/S3V2WriteTest.kt @@ -37,3 +37,5 @@ class S3V2WriteTestJsonGzip : S3V2WriteTest(S3V2TestUtils.JSON_GZIP_CONFIG_PATH) class S3V2WriteTestCsvUncompressed : S3V2WriteTest(S3V2TestUtils.CSV_UNCOMPRESSED_CONFIG_PATH) class S3V2WriteTestCsvGzip : S3V2WriteTest(S3V2TestUtils.CSV_GZIP_CONFIG_PATH) + +class S3V2WriteTestAvro : S3V2WriteTest(S3V2TestUtils.AVRO_UNCOMPRESSED_CONFIG_PATH)