diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/check/DestinationChecker.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/check/DestinationChecker.kt index de44f79fd389..e46ef32658d3 100644 --- a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/check/DestinationChecker.kt +++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/check/DestinationChecker.kt @@ -7,7 +7,7 @@ package io.airbyte.cdk.load.check import io.airbyte.cdk.load.command.Append import io.airbyte.cdk.load.command.DestinationConfiguration import io.airbyte.cdk.load.command.DestinationStream -import io.airbyte.cdk.load.data.NullType +import io.airbyte.cdk.load.data.ObjectTypeWithoutSchema /** * A check operation that is run before the destination is used. @@ -25,7 +25,7 @@ interface DestinationChecker { DestinationStream( descriptor = DestinationStream.Descriptor("testing", "test"), importType = Append, - schema = NullType, + schema = ObjectTypeWithoutSchema, generationId = 1, minimumGenerationId = 0, syncId = 1, diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/command/DestinationStream.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/command/DestinationStream.kt index bd14ada6491e..04044cbc6e41 100644 --- a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/command/DestinationStream.kt +++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/command/DestinationStream.kt @@ -5,14 +5,8 @@ package io.airbyte.cdk.load.command import io.airbyte.cdk.load.data.AirbyteType -import io.airbyte.cdk.load.data.ArrayType -import io.airbyte.cdk.load.data.FieldType -import io.airbyte.cdk.load.data.IntegerType -import io.airbyte.cdk.load.data.ObjectType -import io.airbyte.cdk.load.data.StringType import io.airbyte.cdk.load.data.json.AirbyteTypeToJsonSchema import io.airbyte.cdk.load.data.json.JsonSchemaToAirbyteType -import io.airbyte.cdk.load.message.DestinationRecord import io.airbyte.protocol.models.v0.AirbyteStream import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream import io.airbyte.protocol.models.v0.DestinationSyncMode @@ -51,59 +45,6 @@ data class DestinationStream( * what actually exists, as many destinations have legacy data from before this schema was * adopted. */ - val schemaWithMeta: ObjectType - get() = - ObjectType( - linkedMapOf( - DestinationRecord.Meta.COLUMN_NAME_AB_RAW_ID to - FieldType(StringType, nullable = false), - DestinationRecord.Meta.COLUMN_NAME_AB_EXTRACTED_AT to - FieldType(IntegerType, nullable = false), - DestinationRecord.Meta.COLUMN_NAME_AB_META to - FieldType( - nullable = false, - type = - ObjectType( - linkedMapOf( - "sync_id" to FieldType(IntegerType, nullable = false), - "changes" to - FieldType( - nullable = false, - type = - ArrayType( - FieldType( - nullable = false, - type = - ObjectType( - linkedMapOf( - "field" to - FieldType( - StringType, - nullable = false - ), - "change" to - FieldType( - StringType, - nullable = false - ), - "reason" to - FieldType( - StringType, - nullable = false - ), - ) - ) - ) - ) - ) - ) - ) - ), - DestinationRecord.Meta.COLUMN_NAME_AB_GENERATION_ID to - FieldType(IntegerType, nullable = false), - DestinationRecord.Meta.COLUMN_NAME_DATA to FieldType(schema, nullable = false), - ) - ) /** * This is not fully round-trippable. Destinations don't care about most of the stuff in an diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/data/AirbyteSchemaIdentityMapper.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/data/AirbyteSchemaIdentityMapper.kt index a36c286a03a5..8e6936639d42 100644 --- a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/data/AirbyteSchemaIdentityMapper.kt +++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/data/AirbyteSchemaIdentityMapper.kt @@ -4,10 +4,13 @@ package io.airbyte.cdk.load.data -interface AirbyteSchemaIdentityMapper { - fun map(schema: AirbyteType): AirbyteType = +interface AirbyteSchemaMapper { + fun map(schema: AirbyteType): AirbyteType +} + +interface AirbyteSchemaIdentityMapper : AirbyteSchemaMapper { + override fun map(schema: AirbyteType): AirbyteType = when (schema) { - is NullType -> mapNull(schema) is StringType -> mapString(schema) is BooleanType -> mapBoolean(schema) is IntegerType -> mapInteger(schema) @@ -26,7 +29,6 @@ interface AirbyteSchemaIdentityMapper { is UnknownType -> mapUnknown(schema) } - fun mapNull(schema: NullType): AirbyteType = schema fun mapString(schema: StringType): AirbyteType = schema fun mapBoolean(schema: BooleanType): AirbyteType = schema fun mapInteger(schema: IntegerType): AirbyteType = schema diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/data/AirbyteType.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/data/AirbyteType.kt index 5093f0597077..f0356dd8cb51 100644 --- a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/data/AirbyteType.kt +++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/data/AirbyteType.kt @@ -6,8 +6,6 @@ package io.airbyte.cdk.load.data sealed interface AirbyteType -data object NullType : AirbyteType - data object StringType : AirbyteType data object BooleanType : AirbyteType diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/data/AirbyteTypeToAirbyteTypeWithMeta.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/data/AirbyteTypeToAirbyteTypeWithMeta.kt new file mode 100644 index 000000000000..d7d399e85ca6 --- /dev/null +++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/data/AirbyteTypeToAirbyteTypeWithMeta.kt @@ -0,0 +1,64 @@ +/* + * Copyright (c) 2024 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.cdk.load.data + +import io.airbyte.cdk.load.message.DestinationRecord + +class AirbyteTypeToAirbyteTypeWithMeta { + fun convert(schema: AirbyteType): ObjectType = + ObjectType( + linkedMapOf( + DestinationRecord.Meta.COLUMN_NAME_AB_RAW_ID to + FieldType(StringType, nullable = false), + DestinationRecord.Meta.COLUMN_NAME_AB_EXTRACTED_AT to + FieldType(IntegerType, nullable = false), + DestinationRecord.Meta.COLUMN_NAME_AB_META to + FieldType( + nullable = false, + type = + ObjectType( + linkedMapOf( + "sync_id" to FieldType(IntegerType, nullable = false), + "changes" to + FieldType( + nullable = false, + type = + ArrayType( + FieldType( + nullable = false, + type = + ObjectType( + linkedMapOf( + "field" to + FieldType( + StringType, + nullable = false + ), + "change" to + FieldType( + StringType, + nullable = false + ), + "reason" to + FieldType( + StringType, + nullable = false + ), + ) + ) + ) + ) + ) + ) + ) + ), + DestinationRecord.Meta.COLUMN_NAME_AB_GENERATION_ID to + FieldType(IntegerType, nullable = false), + DestinationRecord.Meta.COLUMN_NAME_DATA to FieldType(schema, nullable = false), + ) + ) +} + +fun AirbyteType.withAirbyteMeta(): ObjectType = AirbyteTypeToAirbyteTypeWithMeta().convert(this) diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/data/AirbyteValueIdentityMapper.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/data/AirbyteValueIdentityMapper.kt index eaab8330f276..f72d908425fd 100644 --- a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/data/AirbyteValueIdentityMapper.kt +++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/data/AirbyteValueIdentityMapper.kt @@ -45,7 +45,6 @@ open class AirbyteValueIdentityMapper( mapTimestampWithTimezone(value as TimestampValue, path) is TimestampTypeWithoutTimezone -> mapTimestampWithoutTimezone(value as TimestampValue, path) - is NullType -> mapNull(path) is UnknownType -> { collectFailure(path) mapNull(path) diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/data/DestinationRecordToAirbyteValueWithMeta.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/data/DestinationRecordToAirbyteValueWithMeta.kt index 84392b1503c3..8343c158ce09 100644 --- a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/data/DestinationRecordToAirbyteValueWithMeta.kt +++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/data/DestinationRecordToAirbyteValueWithMeta.kt @@ -4,29 +4,24 @@ package io.airbyte.cdk.load.data -import io.airbyte.cdk.load.command.DestinationCatalog +import io.airbyte.cdk.load.command.DestinationStream import io.airbyte.cdk.load.message.DestinationRecord import io.airbyte.cdk.load.message.DestinationRecord.Meta -import io.micronaut.context.annotation.Secondary -import jakarta.inject.Singleton import java.util.* -@Singleton -@Secondary -class DestinationRecordToAirbyteValueWithMeta(private val catalog: DestinationCatalog) { - fun decorate(record: DestinationRecord): ObjectValue { - val streamActual = catalog.getStream(record.stream.name, record.stream.namespace) +class DestinationRecordToAirbyteValueWithMeta(val stream: DestinationStream) { + fun convert(data: AirbyteValue, emittedAtMs: Long, meta: DestinationRecord.Meta?): ObjectValue { return ObjectValue( linkedMapOf( Meta.COLUMN_NAME_AB_RAW_ID to StringValue(UUID.randomUUID().toString()), - Meta.COLUMN_NAME_AB_EXTRACTED_AT to IntegerValue(record.emittedAtMs), + Meta.COLUMN_NAME_AB_EXTRACTED_AT to IntegerValue(emittedAtMs), Meta.COLUMN_NAME_AB_META to ObjectValue( linkedMapOf( - "sync_id" to IntegerValue(streamActual.syncId), + "sync_id" to IntegerValue(stream.syncId), "changes" to ArrayValue( - record.meta?.changes?.map { + meta?.changes?.map { ObjectValue( linkedMapOf( "field" to StringValue(it.field), @@ -39,9 +34,12 @@ class DestinationRecordToAirbyteValueWithMeta(private val catalog: DestinationCa ) ) ), - Meta.COLUMN_NAME_AB_GENERATION_ID to IntegerValue(streamActual.generationId), - Meta.COLUMN_NAME_DATA to record.data + Meta.COLUMN_NAME_AB_GENERATION_ID to IntegerValue(stream.generationId), + Meta.COLUMN_NAME_DATA to data ) ) } } + +fun DestinationRecord.dataWithAirbyteMeta(stream: DestinationStream) = + DestinationRecordToAirbyteValueWithMeta(stream).convert(data, emittedAtMs, meta) diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/data/NullableToUnionNull.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/data/NullableToUnionNull.kt deleted file mode 100644 index 8b1ea3daa35a..000000000000 --- a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/data/NullableToUnionNull.kt +++ /dev/null @@ -1,14 +0,0 @@ -/* - * Copyright (c) 2024 Airbyte, Inc., all rights reserved. - */ - -package io.airbyte.cdk.load.data - -class NullableToUnionNull : AirbyteSchemaIdentityMapper { - override fun mapField(field: FieldType): FieldType { - if (field.nullable) { - return FieldType(UnionType(listOf(field.type, NullType)), nullable = false) - } - return field - } -} diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/data/UnionTypeToDisjointRecord.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/data/UnionTypeToDisjointRecord.kt index 7217f8f32097..c6af8baad53f 100644 --- a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/data/UnionTypeToDisjointRecord.kt +++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/data/UnionTypeToDisjointRecord.kt @@ -8,30 +8,24 @@ import io.airbyte.cdk.load.message.DestinationRecord class UnionTypeToDisjointRecord : AirbyteSchemaIdentityMapper { override fun mapUnion(schema: UnionType): AirbyteType { - val (nullOptions, nonNullOptions) = schema.options.partition { it is NullType } - if (nonNullOptions.size < 2) { + if (schema.options.size < 2) { return schema } /* Create a schema of { "type": "string", "": , etc... } */ val properties = linkedMapOf("type" to FieldType(StringType, nullable = false)) - nonNullOptions.forEach { + schema.options.forEach { val name = typeName(it) if (name in properties) { throw IllegalArgumentException("Union of types with same name: $name") } properties[typeName(it)] = FieldType(it, nullable = true) } - val obj = ObjectType(properties) - if (nullOptions.isEmpty()) { - return obj - } - return UnionType(nullOptions + obj) + return ObjectType(properties) } companion object { fun typeName(type: AirbyteType): String = when (type) { - is NullType -> "null" is StringType -> "string" is BooleanType -> "boolean" is IntegerType -> "integer" @@ -58,8 +52,7 @@ class UnionValueToDisjointRecord(meta: DestinationRecord.Meta) : AirbyteValueIde schema: UnionType, path: List ): AirbyteValue { - val nNonNullOptions = schema.options.filter { it !is NullType }.size - if (nNonNullOptions < 2) { + if (schema.options.size < 2) { return value } @@ -82,7 +75,6 @@ class UnionValueToDisjointRecord(meta: DestinationRecord.Meta) : AirbyteValueIde is BooleanType -> value is BooleanValue is DateType -> value is DateValue is IntegerType -> value is IntegerValue - is NullType -> value is NullValue is NumberType -> value is NumberValue is ObjectType, is ObjectTypeWithoutSchema, diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/data/json/AirbyteTypeToJsonSchema.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/data/json/AirbyteTypeToJsonSchema.kt index c1f290a6f894..a55eb19183b6 100644 --- a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/data/json/AirbyteTypeToJsonSchema.kt +++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/data/json/AirbyteTypeToJsonSchema.kt @@ -16,7 +16,6 @@ class AirbyteTypeToJsonSchema { fun convert(airbyteType: AirbyteType): JsonNode { return when (airbyteType) { - is NullType -> ofType("null") is StringType -> ofType("string") is BooleanType -> ofType("boolean") is IntegerType -> ofType("integer") @@ -25,13 +24,13 @@ class AirbyteTypeToJsonSchema { JsonNodeFactory.instance .objectNode() .put("type", "array") - .set("items", fromFieldType(airbyteType.items)) + .set("items", convert(airbyteType.items.type)) is ArrayTypeWithoutSchema -> ofType("array") is ObjectType -> { val objNode = ofType("object") val properties = objNode.putObject("properties") airbyteType.properties.forEach { (name, field) -> - properties.replace(name, fromFieldType(field)) + properties.replace(name, convert(field.type)) } objNode } @@ -68,14 +67,4 @@ class AirbyteTypeToJsonSchema { is UnknownType -> JsonNodeFactory.instance.objectNode() } } - - private fun fromFieldType(field: FieldType): JsonNode { - if (field.nullable) { - if (field.type is UnionType) { - return convert(UnionType(options = field.type.options + NullType)) - } - return convert(UnionType(options = listOf(field.type, NullType))) - } - return convert(field.type) - } } diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/data/json/JsonSchemaToAirbyteType.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/data/json/JsonSchemaToAirbyteType.kt index 7653c70e467c..163cff0358dc 100644 --- a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/data/json/JsonSchemaToAirbyteType.kt +++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/data/json/JsonSchemaToAirbyteType.kt @@ -10,7 +10,9 @@ import com.fasterxml.jackson.databind.node.ObjectNode import io.airbyte.cdk.load.data.* class JsonSchemaToAirbyteType { - fun convert(schema: JsonNode): AirbyteType { + fun convert(schema: JsonNode): AirbyteType = convertInner(schema)!! + + private fun convertInner(schema: JsonNode): AirbyteType? { // try { if (schema.isObject && schema.has("type")) { // Normal json object with {"type": ..., ...} @@ -24,33 +26,33 @@ class JsonSchemaToAirbyteType { "number" -> fromNumber(schema) "array" -> fromArray(schema) "object" -> fromObject(schema) - "null" -> NullType + "null" -> null else -> throw IllegalArgumentException( "Unknown type: ${ - schema.get("type").asText() - }" + schema.get("type").asText() + }" ) } } else if (schemaType.isArray) { // {"type": [...], ...} unionFromCombinedTypes(schemaType.toList(), schema) } else { - UnknownType("unspported type for 'type' field: $schemaType") + UnknownType("unsupported type for 'type' field: $schemaType") } } else if (schema.isObject) { // {"oneOf": [...], ...} or {"anyOf": [...], ...} or {"allOf": [...], ...} val options = schema.get("oneOf") ?: schema.get("anyOf") ?: schema.get("allOf") return if (options != null) { - UnionType(options.map { convert(it as ObjectNode) }) + optionsToUnionOrSingleType(options.mapNotNull { convertInner(it as ObjectNode) }) } else { // Default to object if no type and not a union type - convert((schema as ObjectNode).put("type", "object")) + convertInner((schema as ObjectNode).put("type", "object")) } } else if (schema.isTextual) { // "" val typeSchema = JsonNodeFactory.instance.objectNode().put("type", schema.asText()) - return convert(typeSchema) + return convertInner(typeSchema) } else { return UnknownType("Unknown schema type: $schema") } @@ -78,8 +80,8 @@ class JsonSchemaToAirbyteType { else -> throw IllegalArgumentException( "Unknown string format: ${ - schema.get("format").asText() - }" + schema.get("format").asText() + }" ) } @@ -96,8 +98,9 @@ class JsonSchemaToAirbyteType { if (items.isEmpty) { return ArrayTypeWithoutSchema } - val itemOptions = UnionType(items.map { convert(it) }) - return ArrayType(fieldFromUnion(itemOptions)) + val itemOptions = UnionType(items.mapNotNull { convertInner(it) }) + val itemType = optionsToUnionOrSingleType(itemOptions.options) + return ArrayType(FieldType(itemType, true)) } return ArrayType(fieldFromSchema(items as ObjectNode)) } @@ -107,60 +110,48 @@ class JsonSchemaToAirbyteType { if (properties.isEmpty) { return ObjectTypeWithEmptySchema } - val requiredFields = schema.get("required")?.map { it.asText() } ?: emptyList() - return objectFromProperties(properties as ObjectNode, requiredFields) + val propertiesMapped = + properties + .fields() + .asSequence() + .map { (name, node) -> name to fieldFromSchema(node as ObjectNode) } + .toMap(LinkedHashMap()) + return ObjectType(propertiesMapped) } private fun fieldFromSchema( fieldSchema: ObjectNode, - onRequiredList: Boolean = false ): FieldType { - val markedRequired = fieldSchema.get("required")?.asBoolean() ?: false - val nullable = !(onRequiredList || markedRequired) - val airbyteType = convert(fieldSchema) - if (airbyteType is UnionType) { - return fieldFromUnion(airbyteType, nullable) - } else { - return FieldType(airbyteType, nullable) - } - } - - private fun fieldFromUnion(unionType: UnionType, nullable: Boolean = false): FieldType { - if (unionType.options.contains(NullType)) { - val filtered = unionType.options.filter { it != NullType } - return FieldType(UnionType(filtered), nullable = true) - } - return FieldType(unionType, nullable = nullable) - } - - private fun objectFromProperties(schema: ObjectNode, requiredFields: List): ObjectType { - val properties = - schema - .fields() - .asSequence() - .map { (name, node) -> - name to fieldFromSchema(node as ObjectNode, requiredFields.contains(name)) - } - .toMap(LinkedHashMap()) - return ObjectType(properties) + val airbyteType = + convertInner(fieldSchema) ?: UnknownType("Illegal null type as field type") + return FieldType(airbyteType, nullable = true) } private fun unionFromCombinedTypes( options: List, parentSchema: ObjectNode - ): UnionType { + ): AirbyteType { // Denormalize the properties across each type (the converter only checks what matters // per type). val unionOptions = - options.map { + options.mapNotNull { if (it.isTextual) { val schema = parentSchema.deepCopy() schema.put("type", it.textValue()) - convert(schema) + convertInner(schema) } else { - convert(it) + convertInner(it) } } - return UnionType(unionOptions) + return optionsToUnionOrSingleType(unionOptions) } + + private fun optionsToUnionOrSingleType(options: List): AirbyteType = + if (options.isEmpty()) { + UnionType(listOf(UnknownType("No valid options in union"))) + } else if (options.size == 1) { + options.first() + } else { + UnionType(options) + } } diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/data/json/JsonToAirbyteValue.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/data/json/JsonToAirbyteValue.kt index aada67af68ee..96557f74ef78 100644 --- a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/data/json/JsonToAirbyteValue.kt +++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/data/json/JsonToAirbyteValue.kt @@ -30,7 +30,6 @@ class JsonToAirbyteValue { is BooleanType -> toBoolean(json) is DateType -> DateValue(json.asText()) is IntegerType -> toInteger(json) - is NullType -> toNull(json) is NumberType -> toNumber(json) is ObjectType -> toObject(json, schema) is ObjectTypeWithoutSchema, @@ -185,7 +184,6 @@ class JsonToAirbyteValue { is BooleanType -> json.isBoolean is DateType -> json.isTextual is IntegerType -> json.isIntegralNumber - is NullType -> json.isNull is NumberType -> json.isNumber is ObjectType, is ObjectTypeWithoutSchema, diff --git a/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/data/AirbyteSchemaIdentityMapperTest.kt b/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/data/AirbyteSchemaIdentityMapperTest.kt index f91d2f9680a6..3338f3e7edcb 100644 --- a/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/data/AirbyteSchemaIdentityMapperTest.kt +++ b/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/data/AirbyteSchemaIdentityMapperTest.kt @@ -19,9 +19,8 @@ class AirbyteSchemaIdentityMapperTest { .with(IntegerType) .with(BooleanType) .with(NumberType) - .with(NullType) .with(ArrayType(FieldType(StringType, true))) - .with(UnionType(listOf(StringType, IntegerType, NullType))) + .with(UnionType(listOf(StringType, IntegerType))) .withRecord() .with(TimeTypeWithTimezone) .with(TimeTypeWithoutTimezone) @@ -33,7 +32,6 @@ class AirbyteSchemaIdentityMapperTest { .with(ArrayTypeWithoutSchema) .endRecord() .endRecord() - .with(NullType) .build() val mapper = object : AirbyteSchemaIdentityMapper {} diff --git a/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/data/AirbyteValueIdentityMapperTest.kt b/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/data/AirbyteValueIdentityMapperTest.kt index 6adcad2dadd4..f162f8bfc630 100644 --- a/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/data/AirbyteValueIdentityMapperTest.kt +++ b/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/data/AirbyteValueIdentityMapperTest.kt @@ -35,7 +35,6 @@ class AirbyteValueIdentityMapperTest { ArrayTypeWithoutSchema ) .withRecord() - .with(NullValue, NullType) .endRecord() .endRecord() .build() diff --git a/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/data/NullableToUnionNullTest.kt b/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/data/NullableToUnionNullTest.kt deleted file mode 100644 index 7832c3628232..000000000000 --- a/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/data/NullableToUnionNullTest.kt +++ /dev/null @@ -1,49 +0,0 @@ -/* - * Copyright (c) 2024 Airbyte, Inc., all rights reserved. - */ - -package io.airbyte.cdk.load.data - -import io.airbyte.cdk.load.test.util.Root -import io.airbyte.cdk.load.test.util.SchemaRecordBuilder -import org.junit.jupiter.api.Assertions -import org.junit.jupiter.api.Test - -class NullableToUnionNullTest { - @Test - fun testBasicBehavior() { - val (inputSchema, expectedOutput) = - SchemaRecordBuilder() - .with(FieldType(StringType, nullable = false)) - .with( - FieldType(IntegerType, nullable = true), - FieldType(UnionType(listOf(IntegerType, NullType)), nullable = false) - ) - .build() - Assertions.assertEquals(NullableToUnionNull().map(inputSchema), expectedOutput) - } - - @Test - fun testWackyBehavior() { - val (inputSchema, expectedOutput) = - SchemaRecordBuilder() - .with(FieldType(UnionType(listOf(StringType, IntegerType)), nullable = false)) - .with( - FieldType(UnionType(listOf(StringType, IntegerType)), nullable = true), - FieldType( - UnionType(listOf(UnionType(listOf(StringType, IntegerType)), NullType)), - nullable = false - ) - ) - .with(FieldType(UnionType(listOf(StringType, NullType)), nullable = false)) - .with( - FieldType(UnionType(listOf(StringType, NullType)), nullable = true), - FieldType( - UnionType(listOf(UnionType(listOf(StringType, NullType)), NullType)), - nullable = false - ) - ) - .build() - Assertions.assertEquals(NullableToUnionNull().map(inputSchema), expectedOutput) - } -} diff --git a/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/data/UnionTypeToDisjointRecordTest.kt b/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/data/UnionTypeToDisjointRecordTest.kt index dee9cc9e95a8..4e601b50f5cb 100644 --- a/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/data/UnionTypeToDisjointRecordTest.kt +++ b/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/data/UnionTypeToDisjointRecordTest.kt @@ -12,7 +12,7 @@ import org.junit.jupiter.api.Test class UnionTypeToDisjointRecordTest { @Test fun testBasicSchemaBehavior() { - val disjoinRecord = + val disjointRecord = ObjectType( linkedMapOf( "type" to FieldType(StringType, nullable = false), @@ -23,15 +23,7 @@ class UnionTypeToDisjointRecordTest { val (inputSchema, expectedOutput) = SchemaRecordBuilder() .with(UnionType(listOf(StringType))) // union of 1 => ignore - .with(UnionType(listOf(StringType, NullType))) // union of 1 w/ null => ignore - .with( - UnionType(listOf(StringType, IntegerType)), - expected = disjoinRecord - ) // union of 2 => disjoint - .with( - UnionType(listOf(StringType, IntegerType, NullType)), - expected = UnionType(listOf(NullType, disjoinRecord)) - ) // union of 2 w/ null => disjoint + .with(UnionType(listOf(StringType, IntegerType)), expected = disjointRecord) .build() val output = UnionTypeToDisjointRecord().map(inputSchema) Assertions.assertEquals(expectedOutput, output) diff --git a/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/data/json/AirbyteSchemaTypeToJsonSchemaTest.kt b/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/data/json/AirbyteSchemaTypeToJsonSchemaTest.kt index b4139df4af5e..6f4f1bc8fc91 100644 --- a/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/data/json/AirbyteSchemaTypeToJsonSchemaTest.kt +++ b/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/data/json/AirbyteSchemaTypeToJsonSchemaTest.kt @@ -4,145 +4,172 @@ package io.airbyte.cdk.load.data.json -import com.fasterxml.jackson.databind.JsonNode -import com.fasterxml.jackson.databind.node.JsonNodeFactory -import com.fasterxml.jackson.databind.node.ObjectNode +import io.airbyte.cdk.load.data.ArrayType +import io.airbyte.cdk.load.data.BooleanType +import io.airbyte.cdk.load.data.DateType +import io.airbyte.cdk.load.data.FieldType +import io.airbyte.cdk.load.data.IntegerType +import io.airbyte.cdk.load.data.NumberType +import io.airbyte.cdk.load.data.ObjectType +import io.airbyte.cdk.load.data.StringType +import io.airbyte.cdk.load.data.TimeTypeWithTimezone +import io.airbyte.cdk.load.data.TimeTypeWithoutTimezone +import io.airbyte.cdk.load.data.TimestampTypeWithTimezone +import io.airbyte.cdk.load.data.TimestampTypeWithoutTimezone +import io.airbyte.cdk.load.data.UnionType +import io.airbyte.cdk.load.util.deserializeToNode +import io.airbyte.cdk.load.util.serializeToString import org.junit.jupiter.api.Assertions import org.junit.jupiter.api.Test class AirbyteSchemaTypeToJsonSchemaTest { - @Test - fun testRoundTrip() { - val schema = JsonNodeFactory.instance.objectNode() - val props = schema.putObject("properties") - props.putObject("name").put("type", "string").put("required", true) - props.putObject("age").put("type", "integer") - props.putObject("is_cool").put("type", "boolean") - props.putObject("height").put("type", "number") - props.putObject("friends").put("type", "array").putObject("items").put("type", "string") - val subProps = props.putObject("address").put("type", "object").putObject("properties") - subProps.putObject("street").put("type", "string") - subProps.putObject("city").put("type", "string") - props.putObject("null_field").put("type", "null") - val union = props.putObject("nullable_union").putArray("oneOf") - union.add(JsonNodeFactory.instance.objectNode().put("type", "string")) - union.add(JsonNodeFactory.instance.objectNode().put("type", "integer")) - union.add(JsonNodeFactory.instance.objectNode().put("type", "null")) - - val union2 = props.putObject("nonnullable_union") - val union2opts = union2.putArray("oneOf") - union2opts.add(JsonNodeFactory.instance.objectNode().put("type", "string")) - union2opts.add(JsonNodeFactory.instance.objectNode().put("type", "integer")) - union2.put("required", true) - - props.putObject("combined_null").putArray("type").add("string").add("null") - - val combinedDenormalized = props.putObject("combined_denormalized") - combinedDenormalized.putArray("type").add("string").add("object") - combinedDenormalized.putObject("properties").putObject("name").put("type", "string") - - props - .putObject("union_array") - .put("type", "array") - .putArray("items") - .add("string") - .add("integer") - - props.putObject("date").put("type", "string").put("format", "date") - props.putObject("time").put("type", "string").put("format", "time") - props.putObject("timestamp").put("type", "string").put("format", "date-time") - props - .putObject("time_without_timezone") - .put("type", "string") - .put("format", "time") - .put("airbyte_type", "time_without_timezone") - props - .putObject("timestamp_without_timezone") - .put("type", "string") - .put("format", "date-time") - .put("airbyte_type", "timestamp_without_timezone") - - val converted = JsonSchemaToAirbyteType().convert(schema) - val unconverted = AirbyteTypeToJsonSchema().convert(converted) - - val propsOut = unconverted.get("properties") - Assertions.assertEquals(ofType("string", false), propsOut.get("name")) - Assertions.assertEquals(ofType("integer", true), propsOut.get("age")) - Assertions.assertEquals(ofType("boolean", true), propsOut.get("is_cool")) - Assertions.assertEquals(ofType("number", true), propsOut.get("height")) - - val friends = JsonNodeFactory.instance.objectNode() - friends.put("type", "array").replace("items", ofType("string", true)) - Assertions.assertEquals(ofNullable(friends), propsOut.get("friends")) - - val address = JsonNodeFactory.instance.objectNode() - val addressProps = address.put("type", "object").putObject("properties") - addressProps.replace("street", ofType("string", true)) - addressProps.replace("city", ofType("string", true)) - Assertions.assertEquals(ofNullable(address), propsOut.get("address")) - - Assertions.assertEquals(ofType("null", true), propsOut.get("null_field")) - - val nullableUnion = JsonNodeFactory.instance.objectNode() - nullableUnion - .putArray("oneOf") - .add(ofType("string", false)) - .add(ofType("integer", false)) - .add(ofType("null", false)) - Assertions.assertEquals(nullableUnion, propsOut.get("nullable_union")) - - val nonnullableUnion = JsonNodeFactory.instance.objectNode() - nonnullableUnion - .putArray("oneOf") - .add(ofType("string", false)) - .add(ofType("integer", false)) - Assertions.assertEquals(nonnullableUnion, propsOut.get("nonnullable_union")) - - Assertions.assertEquals(ofType("string", true), propsOut.get("combined_null")) - - val combinedDenormed = JsonNodeFactory.instance.objectNode() - val cdObj = ofType("object", false) - cdObj.putObject("properties").replace("name", ofType("string", true)) - combinedDenormed - .putArray("oneOf") - .add(ofType("string", false)) - .add(cdObj) - .add(ofType("null", false)) - Assertions.assertEquals(combinedDenormed, propsOut.get("combined_denormalized")) - - val unionArrayOut = JsonNodeFactory.instance.objectNode() - unionArrayOut - .put("type", "array") - .putObject("items") - .putArray("oneOf") - .add(ofType("string", false)) - .add(ofType("integer", false)) - Assertions.assertEquals(ofNullable(unionArrayOut), propsOut.get("union_array")) - - val timeTypeFieldNames = - listOf("time", "timestamp", "time_without_timezone", "timestamp_without_timezone") - timeTypeFieldNames.forEach { fieldName -> - val expected = props.get(fieldName) as ObjectNode - if (listOf("date-time", "time").contains(expected.get("format").asText())) { - val formatName = expected.get("format").asText().replace("date-time", "timestamp") - if (!expected.has("airbyte_type")) { - expected.put("airbyte_type", "${formatName}_with_timezone") + // A json of every supported type + private val airbyteType = + ObjectType( + mapOf( + "name" to StringType, + "age" to IntegerType, + "is_cool" to BooleanType, + "height" to NumberType, + "alt_integer" to IntegerType, + "friends" to ArrayType(FieldType(StringType, true)), + "mixed_array" to + ArrayType( + FieldType(UnionType(listOf(StringType, IntegerType)), nullable = true) + ), + "address" to + ObjectType( + linkedMapOf( + "street" to FieldType(StringType, true), + "city" to FieldType(StringType, true) + ) + ), + "nonnullable_union" to UnionType(listOf(StringType)), + "combined_denormalized" to + ObjectType(linkedMapOf("name" to FieldType(StringType, true))), + "union_array" to + ArrayType(FieldType(UnionType(listOf(StringType, IntegerType)), true)), + "date" to DateType, + "time" to TimeTypeWithTimezone, + "time_without_timezone" to TimeTypeWithoutTimezone, + "timestamp" to TimestampTypeWithTimezone, + "timestamp_without_timezone" to TimestampTypeWithoutTimezone + ) + .map { it.key to FieldType(it.value, nullable = true) } + .let { linkedMapOf(*it.toTypedArray()) } + ) + + // the json equivalent + private val json = + """ + { + "type": "object", + "properties": { + "name": { + "type": "string" + }, + "age": { + "type": "integer" + }, + "is_cool": { + "type": "boolean" + }, + "height": { + "type": "number" + }, + "alt_integer": { + "type": "integer" + }, + "friends": { + "type": "array", + "items": { + "type": "string" + } + }, + "mixed_array": { + "type": "array", + "items": { + "oneOf": [ + { + "type": "string" + }, + { + "type": "integer" + } + ] + } + }, + "address": { + "type": "object", + "properties": { + "street": { + "type": "string" + }, + "city": { + "type": "string" } + } + }, + "nonnullable_union": { + "oneOf": [ + { + "type": "string" + } + ] + }, + "combined_denormalized": { + "type": "object", + "properties": { + "name": { + "type": "string" + } + } + }, + "union_array": { + "type": "array", + "items": { + "oneOf": [ + { + "type": "string" + }, + { + "type": "integer" + } + ] + } + }, + "date": { + "type": "string", + "format": "date" + }, + "time": { + "type": "string", + "format": "time", + "airbyte_type": "time_with_timezone" + }, + "time_without_timezone": { + "type": "string", + "format": "time", + "airbyte_type": "time_without_timezone" + }, + "timestamp": { + "type": "string", + "format": "date-time", + "airbyte_type": "timestamp_with_timezone" + }, + "timestamp_without_timezone": { + "type": "string", + "format": "date-time", + "airbyte_type": "timestamp_without_timezone" } - Assertions.assertEquals(ofNullable(expected), propsOut.get(fieldName)) - } - } - - private fun ofType(type: String, nullable: Boolean = true): ObjectNode = - if (nullable) { - ofNullable(ofType(type, false)) - } else { - JsonNodeFactory.instance.objectNode().put("type", type) + } } + """.trimIndent() - private fun ofNullable(typeNode: JsonNode): ObjectNode { - val schema = JsonNodeFactory.instance.objectNode() - schema.putArray("oneOf").add(typeNode).add(ofType("null", false)) - return schema + @Test + fun testToJsonSchema() { + val expected = json.deserializeToNode().serializeToString() + val actual = AirbyteTypeToJsonSchema().convert(airbyteType).serializeToString() + Assertions.assertEquals(expected, actual) } } diff --git a/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/data/json/AirbyteValueToJsonTest.kt b/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/data/json/AirbyteValueToJsonTest.kt index 4d2bede2c25a..76ca3ad817cf 100644 --- a/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/data/json/AirbyteValueToJsonTest.kt +++ b/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/data/json/AirbyteValueToJsonTest.kt @@ -13,8 +13,6 @@ import io.airbyte.cdk.load.data.DateValue import io.airbyte.cdk.load.data.FieldType import io.airbyte.cdk.load.data.IntegerType import io.airbyte.cdk.load.data.IntegerValue -import io.airbyte.cdk.load.data.NullType -import io.airbyte.cdk.load.data.NullValue import io.airbyte.cdk.load.data.NumberType import io.airbyte.cdk.load.data.NumberValue import io.airbyte.cdk.load.data.ObjectType @@ -49,10 +47,7 @@ class AirbyteValueToJsonTest { "city" to StringValue("San Francisco") ) ), - "null_field" to NullValue, - "nullable_union" to IntegerValue(42), - "nonnullable_union" to StringValue("hello"), - "combined_null" to StringValue("hello"), + "union" to StringValue("hello"), "combined_denormalized" to ObjectValue(linkedMapOf("name" to StringValue("hello"))), "union_array" to ArrayValue(listOf(StringValue("hello"), IntegerValue(42))), @@ -81,12 +76,7 @@ class AirbyteValueToJsonTest { ), false ), - "null_field" to FieldType(NullType, false), - "nullable_union" to - FieldType(UnionType(listOf(StringType, IntegerType, NullType)), false), - "nonnullable_union" to - FieldType(UnionType(listOf(StringType, IntegerType)), true), - "combined_null" to FieldType(UnionType(listOf(StringType, NullType)), false), + "union" to FieldType(UnionType(listOf(StringType, IntegerType)), true), "combined_denormalized" to FieldType( ObjectType(linkedMapOf("name" to FieldType(StringType, true))), diff --git a/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/data/json/JsonSchemaToAirbyteSchemaTypeTest.kt b/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/data/json/JsonSchemaToAirbyteSchemaTypeTest.kt index 6264dea2dca4..5a291518d38a 100644 --- a/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/data/json/JsonSchemaToAirbyteSchemaTypeTest.kt +++ b/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/data/json/JsonSchemaToAirbyteSchemaTypeTest.kt @@ -12,7 +12,6 @@ import io.airbyte.cdk.load.data.BooleanType import io.airbyte.cdk.load.data.DateType import io.airbyte.cdk.load.data.FieldType import io.airbyte.cdk.load.data.IntegerType -import io.airbyte.cdk.load.data.NullType import io.airbyte.cdk.load.data.NumberType import io.airbyte.cdk.load.data.ObjectType import io.airbyte.cdk.load.data.ObjectTypeWithEmptySchema @@ -32,13 +31,6 @@ class JsonSchemaToAirbyteSchemaTypeTest { return JsonNodeFactory.instance.objectNode().put("type", type) } - @Test - fun testNull() { - val nullType = ofType("null") - val airbyteType = JsonSchemaToAirbyteType().convert(nullType) - Assertions.assertTrue(airbyteType is NullType) - } - @Test fun testString() { val stringType = ofType("string") diff --git a/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/data/json/JsonToAirbyteValueTest.kt b/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/data/json/JsonToAirbyteValueTest.kt index 515099026352..b5d309481f11 100644 --- a/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/data/json/JsonToAirbyteValueTest.kt +++ b/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/data/json/JsonToAirbyteValueTest.kt @@ -15,8 +15,6 @@ import io.airbyte.cdk.load.data.DateValue import io.airbyte.cdk.load.data.FieldType import io.airbyte.cdk.load.data.IntegerType import io.airbyte.cdk.load.data.IntegerValue -import io.airbyte.cdk.load.data.NullType -import io.airbyte.cdk.load.data.NullValue import io.airbyte.cdk.load.data.NumberType import io.airbyte.cdk.load.data.NumberValue import io.airbyte.cdk.load.data.ObjectType @@ -36,11 +34,6 @@ import org.junit.jupiter.api.Assertions import org.junit.jupiter.api.Test class JsonToAirbyteValueTest { - @Test - fun testNull() { - val value = JsonToAirbyteValue().convert(JsonNodeFactory.instance.nullNode(), NullType) - Assertions.assertTrue(value is NullValue) - } @Test fun testString() { diff --git a/airbyte-cdk/bulk/toolkits/load-avro/src/main/kotlin/io/airbyte/cdk/load/data/avro/AirbyteTypeToAvroSchema.kt b/airbyte-cdk/bulk/toolkits/load-avro/src/main/kotlin/io/airbyte/cdk/load/data/avro/AirbyteTypeToAvroSchema.kt index a4c8797db572..8cedca5ffb42 100644 --- a/airbyte-cdk/bulk/toolkits/load-avro/src/main/kotlin/io/airbyte/cdk/load/data/avro/AirbyteTypeToAvroSchema.kt +++ b/airbyte-cdk/bulk/toolkits/load-avro/src/main/kotlin/io/airbyte/cdk/load/data/avro/AirbyteTypeToAvroSchema.kt @@ -11,7 +11,6 @@ import io.airbyte.cdk.load.data.ArrayTypeWithoutSchema import io.airbyte.cdk.load.data.BooleanType import io.airbyte.cdk.load.data.DateType import io.airbyte.cdk.load.data.IntegerType -import io.airbyte.cdk.load.data.NullType import io.airbyte.cdk.load.data.NumberType import io.airbyte.cdk.load.data.ObjectType import io.airbyte.cdk.load.data.ObjectTypeWithEmptySchema @@ -54,7 +53,6 @@ class AirbyteTypeToAvroSchema { LogicalTypes.date().addToSchema(schema) } is IntegerType -> SchemaBuilder.builder().longType() - is NullType -> SchemaBuilder.builder().nullType() is NumberType -> SchemaBuilder.builder().doubleType() is ObjectTypeWithEmptySchema -> throw IllegalArgumentException("Object type with empty schema is not supported") diff --git a/airbyte-cdk/bulk/toolkits/load-avro/src/testFixtures/kotlin/io/airbyte/cdk/load/data/avro/AvroRecordToAirbyteValue.kt b/airbyte-cdk/bulk/toolkits/load-avro/src/testFixtures/kotlin/io/airbyte/cdk/load/data/avro/AvroRecordToAirbyteValue.kt index 0cd0ba1f680d..1ba25b45e78e 100644 --- a/airbyte-cdk/bulk/toolkits/load-avro/src/testFixtures/kotlin/io/airbyte/cdk/load/data/avro/AvroRecordToAirbyteValue.kt +++ b/airbyte-cdk/bulk/toolkits/load-avro/src/testFixtures/kotlin/io/airbyte/cdk/load/data/avro/AvroRecordToAirbyteValue.kt @@ -14,8 +14,6 @@ import io.airbyte.cdk.load.data.BooleanValue import io.airbyte.cdk.load.data.DateType import io.airbyte.cdk.load.data.IntegerType import io.airbyte.cdk.load.data.IntegerValue -import io.airbyte.cdk.load.data.NullType -import io.airbyte.cdk.load.data.NullValue import io.airbyte.cdk.load.data.NumberType import io.airbyte.cdk.load.data.NumberValue import io.airbyte.cdk.load.data.ObjectType @@ -55,7 +53,6 @@ class AvroRecordToAirbyteValue { is BooleanType -> return BooleanValue(avroValue as Boolean) is DateType -> throw UnsupportedOperationException("DateType is not supported") is IntegerType -> return IntegerValue(avroValue as Long) - is NullType -> return NullValue is NumberType -> return NumberValue((avroValue as Double).toBigDecimal()) is ObjectTypeWithEmptySchema -> throw UnsupportedOperationException("ObjectTypeWithEmptySchema is not supported") diff --git a/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/file/object_storage/ObjectStorageFormattingWriter.kt b/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/file/object_storage/ObjectStorageFormattingWriter.kt index bc70149ebc3c..e1aec13d8798 100644 --- a/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/file/object_storage/ObjectStorageFormattingWriter.kt +++ b/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/file/object_storage/ObjectStorageFormattingWriter.kt @@ -10,11 +10,12 @@ import io.airbyte.cdk.load.command.object_storage.CSVFormatConfiguration import io.airbyte.cdk.load.command.object_storage.JsonFormatConfiguration import io.airbyte.cdk.load.command.object_storage.ObjectStorageFormatConfigurationProvider import io.airbyte.cdk.load.command.object_storage.ParquetFormatConfiguration -import io.airbyte.cdk.load.data.DestinationRecordToAirbyteValueWithMeta import io.airbyte.cdk.load.data.avro.toAvroRecord import io.airbyte.cdk.load.data.avro.toAvroSchema import io.airbyte.cdk.load.data.csv.toCsvRecord +import io.airbyte.cdk.load.data.dataWithAirbyteMeta import io.airbyte.cdk.load.data.json.toJson +import io.airbyte.cdk.load.data.withAirbyteMeta import io.airbyte.cdk.load.file.avro.toAvroWriter import io.airbyte.cdk.load.file.csv.toCsvPrinterWithHeader import io.airbyte.cdk.load.file.parquet.toParquetWriter @@ -33,7 +34,6 @@ interface ObjectStorageFormattingWriter : Closeable { @Singleton @Secondary class ObjectStorageFormattingWriterFactory( - private val recordDecorator: DestinationRecordToAirbyteValueWithMeta, private val formatConfigProvider: ObjectStorageFormatConfigurationProvider, ) { fun create( @@ -41,14 +41,13 @@ class ObjectStorageFormattingWriterFactory( outputStream: OutputStream ): ObjectStorageFormattingWriter { return when (formatConfigProvider.objectStorageFormatConfiguration) { - is JsonFormatConfiguration -> JsonFormattingWriter(outputStream, recordDecorator) + is JsonFormatConfiguration -> JsonFormattingWriter(stream, outputStream) is AvroFormatConfiguration -> AvroFormattingWriter( stream, outputStream, formatConfigProvider.objectStorageFormatConfiguration as AvroFormatConfiguration, - recordDecorator ) is ParquetFormatConfiguration -> ParquetFormattingWriter( @@ -56,19 +55,18 @@ class ObjectStorageFormattingWriterFactory( outputStream, formatConfigProvider.objectStorageFormatConfiguration as ParquetFormatConfiguration, - recordDecorator ) - is CSVFormatConfiguration -> CSVFormattingWriter(stream, outputStream, recordDecorator) + is CSVFormatConfiguration -> CSVFormattingWriter(stream, outputStream) } } } class JsonFormattingWriter( + private val stream: DestinationStream, private val outputStream: OutputStream, - private val recordDecorator: DestinationRecordToAirbyteValueWithMeta ) : ObjectStorageFormattingWriter { override fun accept(record: DestinationRecord) { - outputStream.write(recordDecorator.decorate(record).toJson().serializeToString()) + outputStream.write(record.dataWithAirbyteMeta(stream).toJson().serializeToString()) outputStream.write("\n") } @@ -78,13 +76,12 @@ class JsonFormattingWriter( } class CSVFormattingWriter( - stream: DestinationStream, + private val stream: DestinationStream, outputStream: OutputStream, - private val recordDecorator: DestinationRecordToAirbyteValueWithMeta ) : ObjectStorageFormattingWriter { - private val printer = stream.schemaWithMeta.toCsvPrinterWithHeader(outputStream) + private val printer = stream.schema.withAirbyteMeta().toCsvPrinterWithHeader(outputStream) override fun accept(record: DestinationRecord) { - printer.printRecord(*recordDecorator.decorate(record).toCsvRecord()) + printer.printRecord(*record.dataWithAirbyteMeta(stream).toCsvRecord()) } override fun close() { printer.close() @@ -92,16 +89,15 @@ class CSVFormattingWriter( } class AvroFormattingWriter( - stream: DestinationStream, + private val stream: DestinationStream, outputStream: OutputStream, formatConfig: AvroFormatConfiguration, - private val recordDecorator: DestinationRecordToAirbyteValueWithMeta ) : ObjectStorageFormattingWriter { - private val avroSchema = stream.schemaWithMeta.toAvroSchema(stream.descriptor) + private val avroSchema = stream.schema.withAirbyteMeta().toAvroSchema(stream.descriptor) private val writer = outputStream.toAvroWriter(avroSchema, formatConfig.avroCompressionConfiguration) override fun accept(record: DestinationRecord) { - writer.write(recordDecorator.decorate(record).toAvroRecord(avroSchema)) + writer.write(record.dataWithAirbyteMeta(stream).toAvroRecord(avroSchema)) } override fun close() { @@ -110,16 +106,15 @@ class AvroFormattingWriter( } class ParquetFormattingWriter( - stream: DestinationStream, + private val stream: DestinationStream, outputStream: OutputStream, formatConfig: ParquetFormatConfiguration, - private val recordDecorator: DestinationRecordToAirbyteValueWithMeta ) : ObjectStorageFormattingWriter { - private val avroSchema = stream.schemaWithMeta.toAvroSchema(stream.descriptor) + private val avroSchema = stream.schema.withAirbyteMeta().toAvroSchema(stream.descriptor) private val writer = outputStream.toParquetWriter(avroSchema, formatConfig.parquetWriterConfiguration) override fun accept(record: DestinationRecord) { - writer.write(recordDecorator.decorate(record).toAvroRecord(avroSchema)) + writer.write(record.dataWithAirbyteMeta(stream).toAvroRecord(avroSchema)) } override fun close() { diff --git a/airbyte-cdk/bulk/toolkits/load-object-storage/src/testFixtures/kotlin/io/airbyte/cdk/load/ObjectStorageDataDumper.kt b/airbyte-cdk/bulk/toolkits/load-object-storage/src/testFixtures/kotlin/io/airbyte/cdk/load/ObjectStorageDataDumper.kt index 00210b252097..f683be9efc3e 100644 --- a/airbyte-cdk/bulk/toolkits/load-object-storage/src/testFixtures/kotlin/io/airbyte/cdk/load/ObjectStorageDataDumper.kt +++ b/airbyte-cdk/bulk/toolkits/load-object-storage/src/testFixtures/kotlin/io/airbyte/cdk/load/ObjectStorageDataDumper.kt @@ -15,6 +15,7 @@ import io.airbyte.cdk.load.data.avro.toAirbyteValue import io.airbyte.cdk.load.data.avro.toAvroSchema import io.airbyte.cdk.load.data.csv.toAirbyteValue import io.airbyte.cdk.load.data.json.toAirbyteValue +import io.airbyte.cdk.load.data.withAirbyteMeta import io.airbyte.cdk.load.file.GZIPProcessor import io.airbyte.cdk.load.file.NoopProcessor import io.airbyte.cdk.load.file.avro.toAvroReader @@ -76,7 +77,7 @@ class ObjectStorageDataDumper( .map { line -> line .deserializeToNode() - .toAirbyteValue(stream.schemaWithMeta) + .toAirbyteValue(stream.schema.withAirbyteMeta()) .toOutputRecord() } .toList() @@ -84,27 +85,33 @@ class ObjectStorageDataDumper( is CSVFormatConfiguration -> { CSVParser(inputStream.bufferedReader(), CSVFormat.DEFAULT.withHeader()).use { it.records.map { record -> - record.toAirbyteValue(stream.schemaWithMeta).toOutputRecord() + record.toAirbyteValue(stream.schema.withAirbyteMeta()).toOutputRecord() } } } is AvroFormatConfiguration -> { inputStream - .toAvroReader(stream.schemaWithMeta.toAvroSchema(stream.descriptor)) + .toAvroReader(stream.schema.withAirbyteMeta().toAvroSchema(stream.descriptor)) .use { reader -> reader .recordSequence() - .map { it.toAirbyteValue(stream.schemaWithMeta).toOutputRecord() } + .map { + it.toAirbyteValue(stream.schema.withAirbyteMeta()).toOutputRecord() + } .toList() } } is ParquetFormatConfiguration -> { inputStream - .toParquetReader(stream.schemaWithMeta.toAvroSchema(stream.descriptor)) + .toParquetReader( + stream.schema.withAirbyteMeta().toAvroSchema(stream.descriptor) + ) .use { reader -> reader .recordSequence() - .map { it.toAirbyteValue(stream.schemaWithMeta).toOutputRecord() } + .map { + it.toAirbyteValue(stream.schema.withAirbyteMeta()).toOutputRecord() + } .toList() } }