Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bulk Load CDK: Root-Level Flattening & S3V2 Usage #48377

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@ package io.airbyte.cdk.load.data

import io.airbyte.cdk.load.message.DestinationRecord

class AirbyteTypeToAirbyteTypeWithMeta {
fun convert(schema: AirbyteType): ObjectType =
ObjectType(
class AirbyteTypeToAirbyteTypeWithMeta(private val flatten: Boolean) {
fun convert(schema: AirbyteType): ObjectType {
val properties =
linkedMapOf(
DestinationRecord.Meta.COLUMN_NAME_AB_RAW_ID to
FieldType(StringType, nullable = false),
Expand Down Expand Up @@ -55,10 +55,17 @@ class AirbyteTypeToAirbyteTypeWithMeta {
)
),
DestinationRecord.Meta.COLUMN_NAME_AB_GENERATION_ID to
FieldType(IntegerType, nullable = false),
DestinationRecord.Meta.COLUMN_NAME_DATA to FieldType(schema, nullable = false),
FieldType(IntegerType, nullable = false)
)
)
if (flatten) {
(schema as ObjectType).properties.forEach { (name, field) -> properties[name] = field }
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe we should throw if a property clashes with an airbyte field? (honestly not sure, I've never figured out what the expected behavior is for people chaining source -> airbyte -> destination -> airbyte -> destination)

} else {
properties[DestinationRecord.Meta.COLUMN_NAME_DATA] =
FieldType(schema, nullable = false)
}
return ObjectType(properties)
}
}

fun AirbyteType.withAirbyteMeta(): ObjectType = AirbyteTypeToAirbyteTypeWithMeta().convert(this)
fun AirbyteType.withAirbyteMeta(flatten: Boolean = false): ObjectType =
AirbyteTypeToAirbyteTypeWithMeta(flatten).convert(this)
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ interface AirbyteValueMapper {
/** An optimized identity mapper that just passes through. */
class AirbyteValueNoopMapper : AirbyteValueMapper {
override val collectedChanges: List<DestinationRecord.Change> = emptyList()

override fun map(
value: AirbyteValue,
schema: AirbyteType,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,12 @@ import io.airbyte.cdk.load.message.DestinationRecord
import io.airbyte.cdk.load.message.DestinationRecord.Meta
import java.util.*

class DestinationRecordToAirbyteValueWithMeta(val stream: DestinationStream) {
class DestinationRecordToAirbyteValueWithMeta(
val stream: DestinationStream,
private val flatten: Boolean
) {
fun convert(data: AirbyteValue, emittedAtMs: Long, meta: Meta?): ObjectValue {
return ObjectValue(
val properties =
linkedMapOf(
Meta.COLUMN_NAME_AB_RAW_ID to StringValue(UUID.randomUUID().toString()),
Meta.COLUMN_NAME_AB_EXTRACTED_AT to IntegerValue(emittedAtMs),
Expand All @@ -35,16 +38,23 @@ class DestinationRecordToAirbyteValueWithMeta(val stream: DestinationStream) {
)
),
Meta.COLUMN_NAME_AB_GENERATION_ID to IntegerValue(stream.generationId),
Meta.COLUMN_NAME_DATA to data
)
)
if (flatten) {
properties.putAll((data as ObjectValue).values)
} else {
properties[Meta.COLUMN_NAME_DATA] = data
}
return ObjectValue(properties)
}
}

fun Pair<AirbyteValue, List<DestinationRecord.Change>>.withAirbyteMeta(
stream: DestinationStream,
emittedAtMs: Long
) = DestinationRecordToAirbyteValueWithMeta(stream).convert(first, emittedAtMs, Meta(second))
emittedAtMs: Long,
flatten: Boolean = false
) =
DestinationRecordToAirbyteValueWithMeta(stream, flatten)
.convert(first, emittedAtMs, Meta(second))

fun DestinationRecord.dataWithAirbyteMeta(stream: DestinationStream) =
DestinationRecordToAirbyteValueWithMeta(stream).convert(data, emittedAtMs, meta)
fun DestinationRecord.dataWithAirbyteMeta(stream: DestinationStream, flatten: Boolean = false) =
DestinationRecordToAirbyteValueWithMeta(stream, flatten).convert(data, emittedAtMs, meta)
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,13 @@ data class DestinationRecord(
const val COLUMN_NAME_AB_META: String = "_airbyte_meta"
const val COLUMN_NAME_AB_GENERATION_ID: String = "_airbyte_generation_id"
const val COLUMN_NAME_DATA: String = "_airbyte_data"
val COLUMN_NAMES =
setOf(
COLUMN_NAME_AB_RAW_ID,
COLUMN_NAME_AB_EXTRACTED_AT,
COLUMN_NAME_AB_META,
COLUMN_NAME_AB_GENERATION_ID,
)
}

fun asProtocolObject(): AirbyteRecordMessageMeta =
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.cdk.load.data

import io.airbyte.cdk.load.message.DestinationRecord
import java.util.LinkedHashMap
import org.junit.jupiter.api.Assertions

class AirbyteTypeToAirbyteTypeWithMetaTest {
private val expectedMeta =
linkedMapOf(
DestinationRecord.Meta.COLUMN_NAME_AB_RAW_ID to FieldType(StringType, nullable = false),
DestinationRecord.Meta.COLUMN_NAME_AB_EXTRACTED_AT to
FieldType(IntegerType, nullable = false),
DestinationRecord.Meta.COLUMN_NAME_AB_META to
FieldType(
ObjectType(
linkedMapOf(
"sync_id" to FieldType(IntegerType, nullable = false),
"changes" to
FieldType(
ArrayType(
FieldType(
ObjectType(
linkedMapOf(
"field" to
FieldType(StringType, nullable = false),
"change" to
FieldType(StringType, nullable = false),
"reason" to
FieldType(StringType, nullable = false)
)
),
nullable = false
)
),
nullable = false
)
)
),
nullable = false
),
DestinationRecord.Meta.COLUMN_NAME_AB_GENERATION_ID to
FieldType(IntegerType, nullable = false)
)

fun testWithoutFlattening() {
val schema =
ObjectType(
linkedMapOf(
"name" to FieldType(StringType, nullable = false),
"age" to FieldType(IntegerType, nullable = false),
"is_cool" to FieldType(BooleanType, nullable = false)
)
)
val withMeta = schema.withAirbyteMeta(flatten = false)
val expected = LinkedHashMap(expectedMeta)
expected[DestinationRecord.Meta.COLUMN_NAME_DATA] = FieldType(schema, nullable = false)
Assertions.assertEquals(expected, withMeta)
}

fun testWithFlattening() {
val schema =
ObjectType(
linkedMapOf(
"name" to FieldType(StringType, nullable = false),
"age" to FieldType(IntegerType, nullable = false),
"is_cool" to FieldType(BooleanType, nullable = false)
)
)
val withMeta = schema.withAirbyteMeta(flatten = true)
val expected = LinkedHashMap(expectedMeta)
schema.properties.forEach { (name, field) -> expected[name] = field }
Assertions.assertEquals(expected, withMeta)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.cdk.load.data

import io.airbyte.cdk.load.command.MockDestinationCatalogFactory
import io.airbyte.cdk.load.message.DestinationRecord
import org.junit.jupiter.api.Assertions
import org.junit.jupiter.api.Test

class DestinationRecordToAirbyteValueWithMetaTest {
val stream = MockDestinationCatalogFactory.stream1
val emittedAtMs = 123456L
val syncId = stream.syncId
val generationId = stream.generationId
val expectedMeta =
linkedMapOf(
// Don't do raw_id, we'll evict it and validate that it's a uuid
DestinationRecord.Meta.COLUMN_NAME_AB_EXTRACTED_AT to IntegerValue(emittedAtMs),
DestinationRecord.Meta.COLUMN_NAME_AB_META to
ObjectValue(
linkedMapOf(
"sync_id" to IntegerValue(syncId),
"changes" to ArrayValue(emptyList())
)
),
DestinationRecord.Meta.COLUMN_NAME_AB_GENERATION_ID to IntegerValue(generationId)
)

@Test
fun testWithoutFlattening() {
val data =
ObjectValue(
linkedMapOf(
"name" to StringValue("John"),
"age" to IntegerValue(30),
"is_cool" to BooleanValue(true)
)
)
val expected = LinkedHashMap(expectedMeta)
expected[DestinationRecord.Meta.COLUMN_NAME_DATA] = data
val mockRecord =
DestinationRecord(
stream.descriptor,
data,
emittedAtMs,
DestinationRecord.Meta(),
"dummy"
)
val withMeta = mockRecord.dataWithAirbyteMeta(stream, flatten = false)
val uuid =
withMeta.values.remove(DestinationRecord.Meta.COLUMN_NAME_AB_RAW_ID) as StringValue
Assertions.assertTrue(
uuid.value.matches(
Regex("[0-9a-f]{8}-[0-9a-f]{4}-4[0-9a-f]{3}-[89ab][0-9a-f]{3}-[0-9a-f]{12}")
)
)
Assertions.assertEquals(expected, withMeta.values)
}

@Test
fun testWithFlattening() {
val data =
ObjectValue(
linkedMapOf(
"name" to StringValue("John"),
"age" to IntegerValue(30),
"is_cool" to BooleanValue(true)
)
)
val expected = LinkedHashMap(expectedMeta)
data.values.forEach { (name, value) -> expected[name] = value }
val mockRecord =
DestinationRecord(
stream.descriptor,
data,
emittedAtMs,
DestinationRecord.Meta(),
"dummy"
)
val withMeta = mockRecord.dataWithAirbyteMeta(stream, flatten = true)
withMeta.values.remove(DestinationRecord.Meta.COLUMN_NAME_AB_RAW_ID)
Assertions.assertEquals(expected, withMeta.values)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import io.airbyte.cdk.load.message.DestinationRecord
import io.airbyte.protocol.models.v0.AirbyteRecordMessageMetaChange
import java.time.Instant
import java.util.*
import kotlin.collections.LinkedHashMap

class AirbyteValueWithMetaToOutputRecord {
fun convert(value: ObjectValue): OutputRecord {
Expand Down Expand Up @@ -60,6 +61,19 @@ class AirbyteValueWithMetaToOutputRecord {
}
}

fun AirbyteValue.maybeUnflatten(wasFlattened: Boolean): ObjectValue {
this as ObjectValue
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: remove this? (unless it's for the sake of doing a smart cast, in which case add a comment? or maybe define this as fun ObjectValue.maybeUnflatten)

if (!wasFlattened) {
return this
}
val (meta, data) =
this.values.toList().partition { DestinationRecord.Meta.COLUMN_NAMES.contains(it.first) }
val properties = LinkedHashMap(meta.toMap())
val dataObject = ObjectValue(LinkedHashMap(data.toMap()))
properties[DestinationRecord.Meta.COLUMN_NAME_DATA] = dataObject
return ObjectValue(properties)
}

fun AirbyteValue.toOutputRecord(): OutputRecord {
return AirbyteValueWithMetaToOutputRecord().convert(this as ObjectValue)
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ import org.apache.avro.generic.GenericRecord
import org.apache.avro.util.Utf8

class AvroRecordToAirbyteValue {
fun convert(avroValue: Any?, schema: AirbyteType, depth: Int): AirbyteValue {
fun convert(avroValue: Any?, schema: AirbyteType, top: Boolean = false): AirbyteValue {
if (avroValue == null) {
return NullValue
}
Expand All @@ -47,16 +47,15 @@ class AvroRecordToAirbyteValue {
val properties = LinkedHashMap<String, AirbyteValue>()
schema.properties.forEach { (name, field) ->
val value = (avroValue as GenericRecord).get(name)
if ((value != null) || depth < 2) {
properties[name] = convert(value, field.type, depth + 1)
if ((value != null) || top) {
properties[name] = convert(value, field.type)
}
}
return ObjectValue(properties)
}
is ArrayType -> {
val items = schema.items
val values =
(avroValue as GenericArray<*>).map { convert(it, items.type, depth + 1) }
val values = (avroValue as GenericArray<*>).map { convert(it, items.type) }
return ArrayValue(values)
}
is ArrayTypeWithoutSchema ->
Expand Down Expand Up @@ -89,16 +88,16 @@ class AvroRecordToAirbyteValue {
is TimestampTypeWithoutTimezone,
is TimestampTypeWithTimezone ->
return TimestampValue(Instant.ofEpochMilli(avroValue as Long).toString())
is UnionType -> return tryConvertUnion(avroValue, schema, depth)
is UnionType -> return tryConvertUnion(avroValue, schema)
is UnknownType -> throw UnsupportedOperationException("UnknownType is not supported")
else -> throw IllegalArgumentException("Unsupported schema type: $schema")
}
}

private fun tryConvertUnion(avroValue: Any?, schema: UnionType, depth: Int): AirbyteValue {
private fun tryConvertUnion(avroValue: Any?, schema: UnionType): AirbyteValue {
for (type in schema.options) {
try {
return convert(avroValue, type, depth + 1)
return convert(avroValue, type)
} catch (e: Exception) {
continue
}
Expand All @@ -108,5 +107,5 @@ class AvroRecordToAirbyteValue {
}

fun GenericRecord.toAirbyteValue(schema: AirbyteType): AirbyteValue {
return AvroRecordToAirbyteValue().convert(this, schema, 0)
return AvroRecordToAirbyteValue().convert(this, schema, true)
}
Loading
Loading