Skip to content

Commit

Permalink
Bulk Load CDK: Avro toolkit and S3V2 usage (#47121)
Browse files Browse the repository at this point in the history
  • Loading branch information
johnny-schmidt authored Oct 21, 2024
1 parent 73a9ce7 commit 2469b73
Show file tree
Hide file tree
Showing 17 changed files with 372 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ data class DestinationStream(
* what actually exists, as many destinations have legacy data from before this schema was
* adopted.
*/
val schemaWithMeta: AirbyteType
val schemaWithMeta: ObjectType
get() =
ObjectType(
linkedMapOf(
Expand Down
8 changes: 8 additions & 0 deletions airbyte-cdk/bulk/toolkits/load-avro/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
// Dependencies of the bulk-load Avro toolkit.
dependencies {
// Core CDK modules this toolkit builds on (internal to this module's implementation).
implementation project(':airbyte-cdk:bulk:core:bulk-cdk-core-base')
implementation project(':airbyte-cdk:bulk:core:bulk-cdk-core-load')

// Declared as `api` because the toolkit's public functions use Avro types
// (e.g. Schema, GenericRecord) in their signatures.
api("org.apache.avro:avro:1.11.0")

// This module's test fixtures reuse the test fixtures of the core load module.
testFixturesImplementation testFixtures(project(":airbyte-cdk:bulk:core:bulk-cdk-core-load"))
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.cdk.load.data.avro

import io.airbyte.cdk.load.command.DestinationStream
import io.airbyte.cdk.load.data.AirbyteType
import io.airbyte.cdk.load.data.ArrayType
import io.airbyte.cdk.load.data.ArrayTypeWithoutSchema
import io.airbyte.cdk.load.data.BooleanType
import io.airbyte.cdk.load.data.DateType
import io.airbyte.cdk.load.data.IntegerType
import io.airbyte.cdk.load.data.NullType
import io.airbyte.cdk.load.data.NumberType
import io.airbyte.cdk.load.data.ObjectType
import io.airbyte.cdk.load.data.ObjectTypeWithEmptySchema
import io.airbyte.cdk.load.data.ObjectTypeWithoutSchema
import io.airbyte.cdk.load.data.StringType
import io.airbyte.cdk.load.data.TimeType
import io.airbyte.cdk.load.data.TimestampType
import io.airbyte.cdk.load.data.UnionType
import io.airbyte.cdk.load.data.UnknownType
import org.apache.avro.Schema
import org.apache.avro.SchemaBuilder

/**
 * Translates an [AirbyteType] tree into the equivalent Avro [Schema].
 *
 * Supported mappings:
 * - [ObjectType] -> record (see [toRecordSchema])
 * - [ArrayType] -> array whose items are converted with `path + "items"`
 * - [BooleanType] -> boolean, [IntegerType] -> long, [NumberType] -> double,
 * [StringType] -> string, [NullType] -> null
 * - [UnionType] -> union of every option, each converted with the same path
 *
 * Every other type is rejected with an [IllegalArgumentException].
 */
class AirbyteTypeToAvroSchema {
    /**
     * @param airbyteSchema the type to convert
     * @param path name components leading to this type; the last element names a record and the
     * preceding ones form its namespace
     * @return the Avro schema for [airbyteSchema]
     * @throws IllegalArgumentException if the type (or a nested type) is not supported
     */
    fun convert(airbyteSchema: AirbyteType, path: List<String>): Schema =
        when (airbyteSchema) {
            is ObjectType -> toRecordSchema(airbyteSchema, path)
            is ArrayType ->
                SchemaBuilder.array().items(convert(airbyteSchema.items.type, path + "items"))
            is ArrayTypeWithoutSchema ->
                throw IllegalArgumentException("Array type without schema is not supported")
            is BooleanType -> SchemaBuilder.builder().booleanType()
            is DateType ->
                throw IllegalArgumentException("String-based date types are not supported")
            is IntegerType -> SchemaBuilder.builder().longType()
            is NullType -> SchemaBuilder.builder().nullType()
            is NumberType -> SchemaBuilder.builder().doubleType()
            is ObjectTypeWithEmptySchema ->
                throw IllegalArgumentException("Object type with empty schema is not supported")
            is ObjectTypeWithoutSchema ->
                throw IllegalArgumentException("Object type without schema is not supported")
            is StringType -> SchemaBuilder.builder().stringType()
            is TimeType ->
                throw IllegalArgumentException("String-based time types are not supported")
            is TimestampType ->
                throw IllegalArgumentException("String-based timestamp types are not supported")
            is UnionType ->
                Schema.createUnion(airbyteSchema.options.map { option -> convert(option, path) })
            is UnknownType -> throw IllegalArgumentException("Unknown type: ${airbyteSchema.what}")
        }

    /**
     * Builds a record schema named after the last element of [path]. The namespace is the
     * remaining elements in reverse order, joined with dots. Each property becomes a field with
     * no default value.
     */
    private fun toRecordSchema(objectType: ObjectType, path: List<String>): Schema {
        val recordName = path.last()
        val recordNamespace = path.dropLast(1).asReversed().joinToString(".")
        var fields = SchemaBuilder.record(recordName).namespace(recordNamespace).fields()
        for ((fieldName, field) in objectType.properties.entries) {
            // NOTE: Nullable fields are not handled here. They are expected to have been
            // rewritten to union[this, null] upstream.
            // TODO: Enforce this
            fields = fields.name(fieldName).type(convert(field.type, path + fieldName)).noDefault()
        }
        return fields.endRecord()
    }
}

/**
 * Converts this object type to an Avro record schema for [stream]. The schema path is the
 * stream's namespace (or `"default"` when the namespace is null) followed by the stream name.
 */
fun ObjectType.toAvroSchema(stream: DestinationStream.Descriptor): Schema =
    AirbyteTypeToAvroSchema().convert(this, listOf(stream.namespace ?: "default", stream.name))
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.cdk.load.data.avro

import io.airbyte.cdk.load.data.AirbyteValue
import io.airbyte.cdk.load.data.ArrayValue
import io.airbyte.cdk.load.data.BooleanValue
import io.airbyte.cdk.load.data.DateValue
import io.airbyte.cdk.load.data.IntegerValue
import io.airbyte.cdk.load.data.NullValue
import io.airbyte.cdk.load.data.NumberValue
import io.airbyte.cdk.load.data.ObjectValue
import io.airbyte.cdk.load.data.StringValue
import io.airbyte.cdk.load.data.TimeValue
import io.airbyte.cdk.load.data.TimestampValue
import io.airbyte.cdk.load.data.UnknownValue
import org.apache.avro.Schema
import org.apache.avro.generic.GenericData
import org.apache.avro.generic.GenericRecord

/**
 * Converts an [AirbyteValue] into the object Avro's generic API expects for a given [Schema].
 *
 * - [ObjectValue] -> [GenericData.Record]
 * - [ArrayValue] -> [GenericData.Array]
 * - [BooleanValue], [IntegerValue], [StringValue] -> the wrapped value
 * - [NumberValue] -> the wrapped value as a `Double`
 * - [NullValue] -> `null`
 *
 * Date, time, timestamp and unknown values are rejected with an [IllegalArgumentException].
 */
class AirbyteValueToAvroRecord {
    /**
     * @param airbyteValue the value to convert
     * @param schema the Avro schema describing [airbyteValue]. For object and array values this
     * may also be a union (e.g. the nullable form `[record, null]`), in which case the single
     * branch of the matching kind is used.
     * @return the Avro representation of [airbyteValue], or `null` for [NullValue]
     * @throws IllegalArgumentException if the value type is unsupported, an object contains a
     * field the schema does not declare, or a union has no unique branch of the required kind
     */
    fun convert(airbyteValue: AirbyteValue, schema: Schema): Any? =
        when (airbyteValue) {
            is ObjectValue -> {
                val recordSchema = resolveBranch(schema, Schema.Type.RECORD)
                val record = GenericData.Record(recordSchema)
                airbyteValue.values.forEach { (name, value) ->
                    // getField returns null for unknown names; fail with context instead of an NPE.
                    val field =
                        recordSchema.getField(name)
                            ?: throw IllegalArgumentException(
                                "Field '$name' is not defined in Avro schema '${recordSchema.fullName}'"
                            )
                    record.put(name, convert(value, field.schema()))
                }
                record
            }
            is ArrayValue -> {
                val arraySchema = resolveBranch(schema, Schema.Type.ARRAY)
                val array = GenericData.Array<Any>(airbyteValue.values.size, arraySchema)
                airbyteValue.values.forEach { value ->
                    array.add(convert(value, arraySchema.elementType))
                }
                array
            }
            is BooleanValue -> airbyteValue.value
            is DateValue ->
                throw IllegalArgumentException("String-based date types are not supported")
            is IntegerValue -> airbyteValue.value
            NullValue -> null
            is NumberValue -> airbyteValue.value.toDouble()
            is StringValue -> airbyteValue.value
            is TimeValue ->
                throw IllegalArgumentException("String-based time types are not supported")
            is TimestampValue ->
                throw IllegalArgumentException("String-based timestamp types are not supported")
            is UnknownValue -> throw IllegalArgumentException("Unknown type is not supported")
        }

    /**
     * Returns [schema] unchanged unless it is a union, in which case the one branch whose type is
     * [expected] is returned. Non-union schemas are passed through untouched so that Avro itself
     * reports any mismatch, exactly as before.
     */
    private fun resolveBranch(schema: Schema, expected: Schema.Type): Schema {
        if (schema.type != Schema.Type.UNION) {
            return schema
        }
        return schema.types.singleOrNull { it.type == expected }
            ?: throw IllegalArgumentException(
                "Expected exactly one $expected branch in union schema: $schema"
            )
    }
}

/** Converts this object value to an Avro [GenericRecord] conforming to [schema]. */
fun ObjectValue.toAvroRecord(schema: Schema): GenericRecord =
    AirbyteValueToAvroRecord().convert(this, schema) as GenericRecord
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.cdk.load.data.avro

import io.airbyte.cdk.load.data.AirbyteType
import io.airbyte.cdk.load.data.AirbyteValue
import io.airbyte.cdk.load.data.ArrayType
import io.airbyte.cdk.load.data.ArrayTypeWithoutSchema
import io.airbyte.cdk.load.data.ArrayValue
import io.airbyte.cdk.load.data.BooleanType
import io.airbyte.cdk.load.data.BooleanValue
import io.airbyte.cdk.load.data.DateType
import io.airbyte.cdk.load.data.IntegerType
import io.airbyte.cdk.load.data.IntegerValue
import io.airbyte.cdk.load.data.NullType
import io.airbyte.cdk.load.data.NullValue
import io.airbyte.cdk.load.data.NumberType
import io.airbyte.cdk.load.data.NumberValue
import io.airbyte.cdk.load.data.ObjectType
import io.airbyte.cdk.load.data.ObjectTypeWithEmptySchema
import io.airbyte.cdk.load.data.ObjectTypeWithoutSchema
import io.airbyte.cdk.load.data.ObjectValue
import io.airbyte.cdk.load.data.StringType
import io.airbyte.cdk.load.data.StringValue
import io.airbyte.cdk.load.data.TimeType
import io.airbyte.cdk.load.data.TimestampType
import io.airbyte.cdk.load.data.UnionType
import io.airbyte.cdk.load.data.UnknownType
import org.apache.avro.generic.GenericArray
import org.apache.avro.generic.GenericRecord
import org.apache.avro.util.Utf8

/**
 * Converts a value read through Avro's generic API back into an [AirbyteValue], guided by the
 * expected [AirbyteType].
 *
 * - [ObjectType] reads each declared property from a [GenericRecord]
 * - [ArrayType] maps every element of a [GenericArray]
 * - [BooleanType], [IntegerType] (`Long`), [NumberType] (`Double`) and [StringType] wrap the
 * corresponding primitive
 * - [NullType] always yields [NullValue]
 * - [UnionType] yields the first option that converts successfully (see [tryConvertUnion])
 *
 * Schemaless arrays/objects, date/time/timestamp and unknown types are rejected with an
 * [UnsupportedOperationException].
 */
class AvroRecordToAirbyteValue {
    /**
     * @param avroValue the raw Avro value (may be `null`)
     * @param schema the Airbyte type [avroValue] is expected to have
     * @return the converted value
     * @throws UnsupportedOperationException if [schema] is a type this converter does not handle
     * @throws IllegalArgumentException if no option of a union matches [avroValue]
     */
    fun convert(avroValue: Any?, schema: AirbyteType): AirbyteValue =
        when (schema) {
            is ObjectType -> {
                val properties = LinkedHashMap<String, AirbyteValue>()
                schema.properties.forEach { (name, field) ->
                    val value = (avroValue as GenericRecord).get(name)
                    properties[name] = convert(value, field.type)
                }
                ObjectValue(properties)
            }
            is ArrayType -> {
                val itemType = schema.items.type
                ArrayValue((avroValue as GenericArray<*>).map { convert(it, itemType) })
            }
            is ArrayTypeWithoutSchema ->
                throw UnsupportedOperationException("ArrayTypeWithoutSchema is not supported")
            is BooleanType -> BooleanValue(avroValue as Boolean)
            is DateType -> throw UnsupportedOperationException("DateType is not supported")
            is IntegerType -> IntegerValue(avroValue as Long)
            is NullType -> NullValue
            is NumberType -> NumberValue((avroValue as Double).toBigDecimal())
            is ObjectTypeWithEmptySchema ->
                throw UnsupportedOperationException("ObjectTypeWithEmptySchema is not supported")
            is ObjectTypeWithoutSchema ->
                throw UnsupportedOperationException("ObjectTypeWithoutSchema is not supported")
            // Avro hands strings back as Utf8 by default, but any CharSequence (e.g. a plain
            // String in a record built in memory) carries the same text.
            is StringType -> StringValue((avroValue as CharSequence).toString())
            is TimeType -> throw UnsupportedOperationException("TimeType is not supported")
            is TimestampType ->
                throw UnsupportedOperationException("TimestampType is not supported")
            is UnionType -> tryConvertUnion(avroValue, schema)
            is UnknownType -> throw UnsupportedOperationException("UnknownType is not supported")
            else -> throw IllegalArgumentException("Unsupported schema type: $schema")
        }

    /**
     * Converts [avroValue] using the first option of [schema] that accepts it.
     *
     * A [NullType] option accepts any value, so it is only allowed to match an actual `null`.
     * Otherwise a union that lists null first (e.g. `[null, string]`) would turn real data into
     * [NullValue].
     */
    private fun tryConvertUnion(avroValue: Any?, schema: UnionType): AirbyteValue {
        if (avroValue == null && schema.options.any { it is NullType }) {
            return NullValue
        }
        for (type in schema.options) {
            if (type is NullType && avroValue != null) {
                continue
            }
            try {
                return convert(avroValue, type)
            } catch (e: Exception) {
                // A failed cast/conversion means this option does not match; try the next one.
                continue
            }
        }
        throw IllegalArgumentException("Could not convert value to any of the union types")
    }
}

/** Converts this Avro record into an [AirbyteValue] shaped according to [schema]. */
fun GenericRecord.toAirbyteValue(schema: AirbyteType): AirbyteValue =
    AvroRecordToAirbyteValue().convert(this, schema)
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.cdk.load.file.avro

import java.io.Closeable
import java.io.InputStream
import kotlin.io.path.outputStream
import org.apache.avro.Schema
import org.apache.avro.file.DataFileReader
import org.apache.avro.generic.GenericDatumReader
import org.apache.avro.generic.GenericRecord

/**
 * Reads [GenericRecord]s from an Avro data file and owns the backing temporary file.
 *
 * @param dataFileReader reader positioned on the Avro data
 * @param tmpFile file backing [dataFileReader]; deleted when this reader is closed
 */
class AvroReader(
    private val dataFileReader: DataFileReader<GenericRecord>,
    private val tmpFile: java.io.File
) : Closeable {
    /** Returns the next record, or `null` once the underlying reader is exhausted. */
    private fun read(): GenericRecord? {
        return if (dataFileReader.hasNext()) {
            dataFileReader.next()
        } else {
            null
        }
    }

    /** Returns a sequence that pulls records from the file until none remain. */
    fun recordSequence(): Sequence<GenericRecord> {
        return generateSequence { read() }
    }

    /** Closes the underlying reader and deletes the temporary file. */
    override fun close() {
        try {
            dataFileReader.close()
        } finally {
            // Delete even when closing the reader throws, so the temp file is never leaked.
            tmpFile.delete()
        }
    }
}

/**
 * Copies this stream into a temporary `.avro` file and opens an [AvroReader] over it.
 *
 * The returned reader owns the temporary file and deletes it on close. If copying the stream or
 * opening the file fails, the temporary file is deleted before the exception propagates. This
 * function does not close the receiving stream.
 *
 * @param avroSchema the schema passed to the datum reader; its namespace and name also form the
 * temp-file prefix
 * @return a reader over the copied data
 */
fun InputStream.toAvroReader(avroSchema: Schema): AvroReader {
    val reader = GenericDatumReader<GenericRecord>(avroSchema)
    val tmpFile =
        kotlin.io.path.createTempFile(
            prefix = "${avroSchema.namespace}.${avroSchema.name}",
            suffix = ".avro"
        )
    val file = tmpFile.toFile()
    try {
        tmpFile.outputStream().use { outputStream -> this.copyTo(outputStream) }
        val dataFileReader = DataFileReader(file, reader)
        return AvroReader(dataFileReader, file)
    } catch (e: Exception) {
        // No AvroReader exists yet to take ownership of the file, so clean it up here.
        file.delete()
        throw e
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.cdk.load.file.avro

import java.io.Closeable
import java.io.OutputStream
import org.apache.avro.Schema
import org.apache.avro.file.DataFileWriter
import org.apache.avro.generic.GenericDatumWriter
import org.apache.avro.generic.GenericRecord

/**
 * Thin [Closeable] wrapper that forwards record writes, flushes and close to a
 * [DataFileWriter].
 *
 * @param dataFileWriter the writer every call is delegated to
 */
class AvroWriter(
    private val dataFileWriter: DataFileWriter<GenericRecord>,
) : Closeable {
    /** Appends [record] to the underlying writer. */
    fun write(record: GenericRecord) = dataFileWriter.append(record)

    /** Flushes the underlying writer. */
    fun flush() = dataFileWriter.flush()

    /** Closes the underlying writer. */
    override fun close() = dataFileWriter.close()
}

/**
 * Creates an [AvroWriter] that writes records of [avroSchema] to this output stream. The
 * underlying [DataFileWriter] is initialised via `create` before the writer is returned.
 */
fun OutputStream.toAvroWriter(avroSchema: Schema): AvroWriter {
    val target = this
    val fileWriter = DataFileWriter(GenericDatumWriter<GenericRecord>(avroSchema))
    fileWriter.create(avroSchema, target)
    return AvroWriter(fileWriter)
}
1 change: 1 addition & 0 deletions airbyte-cdk/bulk/toolkits/load-object-storage/build.gradle
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
dependencies {
implementation project(':airbyte-cdk:bulk:core:bulk-cdk-core-base')
implementation project(':airbyte-cdk:bulk:core:bulk-cdk-core-load')
api project(':airbyte-cdk:bulk:toolkits:bulk-cdk-toolkit-load-avro')

api("org.apache.commons:commons-csv:1.10.0")

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,17 +9,14 @@ import org.apache.commons.csv.CSVFormat
import org.apache.commons.csv.CSVPrinter

class AirbyteTypeToCsvHeader {
fun convert(schema: AirbyteType): Array<String> {
if (schema !is ObjectType) {
throw IllegalArgumentException("Only object types are supported")
}
fun convert(schema: ObjectType): Array<String> {
return schema.properties.map { it.key }.toTypedArray()
}
}

fun AirbyteType.toCsvHeader(): Array<String> {
fun ObjectType.toCsvHeader(): Array<String> {
return AirbyteTypeToCsvHeader().convert(this)
}

fun AirbyteType.toCsvPrinterWithHeader(writer: Writer): CSVPrinter =
CSVFormat.Builder.create().setHeader(*toCsvHeader()).build().print(writer)
fun ObjectType.toCsvPrinterWithHeader(writer: Writer): CSVPrinter =
CSVFormat.Builder.create().setHeader(*toCsvHeader()).setAutoFlush(true).build().print(writer)
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,7 @@ package io.airbyte.cdk.load.data
import io.airbyte.cdk.load.util.serializeToString

class AirbyteValueToCsvRow {
fun convert(value: AirbyteValue): Array<String> {
if (value !is ObjectValue) {
throw IllegalArgumentException("Only object values are supported")
}
fun convert(value: ObjectValue): Array<String> {
return value.values.map { convertInner(it.value) }.toTypedArray()
}

Expand All @@ -26,6 +23,6 @@ class AirbyteValueToCsvRow {
}
}

fun AirbyteValue.toCsvRecord(): Array<String> {
fun ObjectValue.toCsvRecord(): Array<String> {
return AirbyteValueToCsvRow().convert(this)
}
Loading

0 comments on commit 2469b73

Please sign in to comment.