Skip to content

Commit

Permalink
Bulk Load CDK: Preliminary Refactor before adding mappers (#48350)
Browse files Browse the repository at this point in the history
  • Loading branch information
johnny-schmidt authored Nov 6, 2024
1 parent 6294144 commit e6d3cde
Show file tree
Hide file tree
Showing 24 changed files with 320 additions and 423 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ package io.airbyte.cdk.load.check
import io.airbyte.cdk.load.command.Append
import io.airbyte.cdk.load.command.DestinationConfiguration
import io.airbyte.cdk.load.command.DestinationStream
import io.airbyte.cdk.load.data.NullType
import io.airbyte.cdk.load.data.ObjectTypeWithoutSchema

/**
* A check operation that is run before the destination is used.
Expand All @@ -25,7 +25,7 @@ interface DestinationChecker<C : DestinationConfiguration> {
DestinationStream(
descriptor = DestinationStream.Descriptor("testing", "test"),
importType = Append,
schema = NullType,
schema = ObjectTypeWithoutSchema,
generationId = 1,
minimumGenerationId = 0,
syncId = 1,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,8 @@
package io.airbyte.cdk.load.command

import io.airbyte.cdk.load.data.AirbyteType
import io.airbyte.cdk.load.data.ArrayType
import io.airbyte.cdk.load.data.FieldType
import io.airbyte.cdk.load.data.IntegerType
import io.airbyte.cdk.load.data.ObjectType
import io.airbyte.cdk.load.data.StringType
import io.airbyte.cdk.load.data.json.AirbyteTypeToJsonSchema
import io.airbyte.cdk.load.data.json.JsonSchemaToAirbyteType
import io.airbyte.cdk.load.message.DestinationRecord
import io.airbyte.protocol.models.v0.AirbyteStream
import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream
import io.airbyte.protocol.models.v0.DestinationSyncMode
Expand Down Expand Up @@ -51,59 +45,6 @@ data class DestinationStream(
* what actually exists, as many destinations have legacy data from before this schema was
* adopted.
*/
val schemaWithMeta: ObjectType
get() =
ObjectType(
linkedMapOf(
DestinationRecord.Meta.COLUMN_NAME_AB_RAW_ID to
FieldType(StringType, nullable = false),
DestinationRecord.Meta.COLUMN_NAME_AB_EXTRACTED_AT to
FieldType(IntegerType, nullable = false),
DestinationRecord.Meta.COLUMN_NAME_AB_META to
FieldType(
nullable = false,
type =
ObjectType(
linkedMapOf(
"sync_id" to FieldType(IntegerType, nullable = false),
"changes" to
FieldType(
nullable = false,
type =
ArrayType(
FieldType(
nullable = false,
type =
ObjectType(
linkedMapOf(
"field" to
FieldType(
StringType,
nullable = false
),
"change" to
FieldType(
StringType,
nullable = false
),
"reason" to
FieldType(
StringType,
nullable = false
),
)
)
)
)
)
)
)
),
DestinationRecord.Meta.COLUMN_NAME_AB_GENERATION_ID to
FieldType(IntegerType, nullable = false),
DestinationRecord.Meta.COLUMN_NAME_DATA to FieldType(schema, nullable = false),
)
)

/**
* This is not fully round-trippable. Destinations don't care about most of the stuff in an
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,13 @@

package io.airbyte.cdk.load.data

interface AirbyteSchemaIdentityMapper {
fun map(schema: AirbyteType): AirbyteType =
interface AirbyteSchemaMapper {
fun map(schema: AirbyteType): AirbyteType
}

interface AirbyteSchemaIdentityMapper : AirbyteSchemaMapper {
override fun map(schema: AirbyteType): AirbyteType =
when (schema) {
is NullType -> mapNull(schema)
is StringType -> mapString(schema)
is BooleanType -> mapBoolean(schema)
is IntegerType -> mapInteger(schema)
Expand All @@ -26,7 +29,6 @@ interface AirbyteSchemaIdentityMapper {
is UnknownType -> mapUnknown(schema)
}

fun mapNull(schema: NullType): AirbyteType = schema
fun mapString(schema: StringType): AirbyteType = schema
fun mapBoolean(schema: BooleanType): AirbyteType = schema
fun mapInteger(schema: IntegerType): AirbyteType = schema
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,6 @@ package io.airbyte.cdk.load.data

sealed interface AirbyteType

data object NullType : AirbyteType

data object StringType : AirbyteType

data object BooleanType : AirbyteType
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.cdk.load.data

import io.airbyte.cdk.load.message.DestinationRecord

class AirbyteTypeToAirbyteTypeWithMeta {
fun convert(schema: AirbyteType): ObjectType =
ObjectType(
linkedMapOf(
DestinationRecord.Meta.COLUMN_NAME_AB_RAW_ID to
FieldType(StringType, nullable = false),
DestinationRecord.Meta.COLUMN_NAME_AB_EXTRACTED_AT to
FieldType(IntegerType, nullable = false),
DestinationRecord.Meta.COLUMN_NAME_AB_META to
FieldType(
nullable = false,
type =
ObjectType(
linkedMapOf(
"sync_id" to FieldType(IntegerType, nullable = false),
"changes" to
FieldType(
nullable = false,
type =
ArrayType(
FieldType(
nullable = false,
type =
ObjectType(
linkedMapOf(
"field" to
FieldType(
StringType,
nullable = false
),
"change" to
FieldType(
StringType,
nullable = false
),
"reason" to
FieldType(
StringType,
nullable = false
),
)
)
)
)
)
)
)
),
DestinationRecord.Meta.COLUMN_NAME_AB_GENERATION_ID to
FieldType(IntegerType, nullable = false),
DestinationRecord.Meta.COLUMN_NAME_DATA to FieldType(schema, nullable = false),
)
)
}

fun AirbyteType.withAirbyteMeta(): ObjectType = AirbyteTypeToAirbyteTypeWithMeta().convert(this)
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ open class AirbyteValueIdentityMapper(
mapTimestampWithTimezone(value as TimestampValue, path)
is TimestampTypeWithoutTimezone ->
mapTimestampWithoutTimezone(value as TimestampValue, path)
is NullType -> mapNull(path)
is UnknownType -> {
collectFailure(path)
mapNull(path)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,29 +4,24 @@

package io.airbyte.cdk.load.data

import io.airbyte.cdk.load.command.DestinationCatalog
import io.airbyte.cdk.load.command.DestinationStream
import io.airbyte.cdk.load.message.DestinationRecord
import io.airbyte.cdk.load.message.DestinationRecord.Meta
import io.micronaut.context.annotation.Secondary
import jakarta.inject.Singleton
import java.util.*

@Singleton
@Secondary
class DestinationRecordToAirbyteValueWithMeta(private val catalog: DestinationCatalog) {
fun decorate(record: DestinationRecord): ObjectValue {
val streamActual = catalog.getStream(record.stream.name, record.stream.namespace)
class DestinationRecordToAirbyteValueWithMeta(val stream: DestinationStream) {
fun convert(data: AirbyteValue, emittedAtMs: Long, meta: DestinationRecord.Meta?): ObjectValue {
return ObjectValue(
linkedMapOf(
Meta.COLUMN_NAME_AB_RAW_ID to StringValue(UUID.randomUUID().toString()),
Meta.COLUMN_NAME_AB_EXTRACTED_AT to IntegerValue(record.emittedAtMs),
Meta.COLUMN_NAME_AB_EXTRACTED_AT to IntegerValue(emittedAtMs),
Meta.COLUMN_NAME_AB_META to
ObjectValue(
linkedMapOf(
"sync_id" to IntegerValue(streamActual.syncId),
"sync_id" to IntegerValue(stream.syncId),
"changes" to
ArrayValue(
record.meta?.changes?.map {
meta?.changes?.map {
ObjectValue(
linkedMapOf(
"field" to StringValue(it.field),
Expand All @@ -39,9 +34,12 @@ class DestinationRecordToAirbyteValueWithMeta(private val catalog: DestinationCa
)
)
),
Meta.COLUMN_NAME_AB_GENERATION_ID to IntegerValue(streamActual.generationId),
Meta.COLUMN_NAME_DATA to record.data
Meta.COLUMN_NAME_AB_GENERATION_ID to IntegerValue(stream.generationId),
Meta.COLUMN_NAME_DATA to data
)
)
}
}

fun DestinationRecord.dataWithAirbyteMeta(stream: DestinationStream) =
DestinationRecordToAirbyteValueWithMeta(stream).convert(data, emittedAtMs, meta)

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -8,30 +8,24 @@ import io.airbyte.cdk.load.message.DestinationRecord

class UnionTypeToDisjointRecord : AirbyteSchemaIdentityMapper {
override fun mapUnion(schema: UnionType): AirbyteType {
val (nullOptions, nonNullOptions) = schema.options.partition { it is NullType }
if (nonNullOptions.size < 2) {
if (schema.options.size < 2) {
return schema
}
/* Create a schema of { "type": "string", "<typename(option1)>": <type(option1)>, etc... } */
val properties = linkedMapOf("type" to FieldType(StringType, nullable = false))
nonNullOptions.forEach {
schema.options.forEach {
val name = typeName(it)
if (name in properties) {
throw IllegalArgumentException("Union of types with same name: $name")
}
properties[typeName(it)] = FieldType(it, nullable = true)
}
val obj = ObjectType(properties)
if (nullOptions.isEmpty()) {
return obj
}
return UnionType(nullOptions + obj)
return ObjectType(properties)
}

companion object {
fun typeName(type: AirbyteType): String =
when (type) {
is NullType -> "null"
is StringType -> "string"
is BooleanType -> "boolean"
is IntegerType -> "integer"
Expand All @@ -58,8 +52,7 @@ class UnionValueToDisjointRecord(meta: DestinationRecord.Meta) : AirbyteValueIde
schema: UnionType,
path: List<String>
): AirbyteValue {
val nNonNullOptions = schema.options.filter { it !is NullType }.size
if (nNonNullOptions < 2) {
if (schema.options.size < 2) {
return value
}

Expand All @@ -82,7 +75,6 @@ class UnionValueToDisjointRecord(meta: DestinationRecord.Meta) : AirbyteValueIde
is BooleanType -> value is BooleanValue
is DateType -> value is DateValue
is IntegerType -> value is IntegerValue
is NullType -> value is NullValue
is NumberType -> value is NumberValue
is ObjectType,
is ObjectTypeWithoutSchema,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ class AirbyteTypeToJsonSchema {

fun convert(airbyteType: AirbyteType): JsonNode {
return when (airbyteType) {
is NullType -> ofType("null")
is StringType -> ofType("string")
is BooleanType -> ofType("boolean")
is IntegerType -> ofType("integer")
Expand All @@ -25,13 +24,13 @@ class AirbyteTypeToJsonSchema {
JsonNodeFactory.instance
.objectNode()
.put("type", "array")
.set("items", fromFieldType(airbyteType.items))
.set("items", convert(airbyteType.items.type))
is ArrayTypeWithoutSchema -> ofType("array")
is ObjectType -> {
val objNode = ofType("object")
val properties = objNode.putObject("properties")
airbyteType.properties.forEach { (name, field) ->
properties.replace(name, fromFieldType(field))
properties.replace(name, convert(field.type))
}
objNode
}
Expand Down Expand Up @@ -68,14 +67,4 @@ class AirbyteTypeToJsonSchema {
is UnknownType -> JsonNodeFactory.instance.objectNode()
}
}

private fun fromFieldType(field: FieldType): JsonNode {
if (field.nullable) {
if (field.type is UnionType) {
return convert(UnionType(options = field.type.options + NullType))
}
return convert(UnionType(options = listOf(field.type, NullType)))
}
return convert(field.type)
}
}
Loading

0 comments on commit e6d3cde

Please sign in to comment.