Skip to content

Commit

Permalink
Bulk load CDK: Fix longs in additionalProperties (#48429)
Browse files Browse the repository at this point in the history
  • Loading branch information
edgao authored Nov 8, 2024
1 parent 63cd395 commit 57f076f
Show file tree
Hide file tree
Showing 5 changed files with 80 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,7 @@ sealed interface CheckpointMessage : DestinationMessage {

val sourceStats: Stats?
val destinationStats: Stats?
val stateMessageId: Long?
val additionalProperties: Map<String, Any>

fun withDestinationStats(stats: Stats): CheckpointMessage

Expand All @@ -279,17 +279,15 @@ sealed interface CheckpointMessage : DestinationMessage {
message.destinationStats =
AirbyteStateStats().withRecordCount(destinationStats!!.recordCount.toDouble())
}
if (stateMessageId != null) {
message.withAdditionalProperty("id", stateMessageId)
}
additionalProperties.forEach { (key, value) -> message.withAdditionalProperty(key, value) }
}
}

data class StreamCheckpoint(
val checkpoint: Checkpoint,
override val sourceStats: Stats?,
override val destinationStats: Stats? = null,
override val stateMessageId: Long? = null,
override val additionalProperties: Map<String, Any> = emptyMap(),
) : CheckpointMessage {
/** Convenience constructor, intended for use in tests. */
constructor(
Expand All @@ -298,15 +296,14 @@ data class StreamCheckpoint(
blob: String,
sourceRecordCount: Long,
destinationRecordCount: Long? = null,
stateMessageId: Long? = null,
) : this(
Checkpoint(
DestinationStream.Descriptor(streamNamespace, streamName),
state = Jsons.deserialize(blob)
),
Stats(sourceRecordCount),
destinationRecordCount?.let { Stats(it) },
stateMessageId,
emptyMap(),
)

override fun withDestinationStats(stats: Stats) = copy(destinationStats = stats)
Expand All @@ -326,17 +323,16 @@ data class GlobalCheckpoint(
override val sourceStats: Stats?,
override val destinationStats: Stats? = null,
val checkpoints: List<Checkpoint> = emptyList(),
override val stateMessageId: Long? = null,
override val additionalProperties: Map<String, Any>,
) : CheckpointMessage {
/** Convenience constructor, primarily intended for use in tests. */
constructor(
blob: String,
sourceRecordCount: Long,
stateMessageId: Long? = null,
) : this(
state = Jsons.deserialize(blob),
Stats(sourceRecordCount),
stateMessageId = stateMessageId,
additionalProperties = emptyMap(),
)
override fun withDestinationStats(stats: Stats) = copy(destinationStats = stats)

Expand Down Expand Up @@ -371,6 +367,21 @@ class DestinationMessageFactory(
@Value("airbyte.file-transfer.enabled") private val fileTransferEnabled: Boolean,
) {
fun fromAirbyteMessage(message: AirbyteMessage, serialized: String): DestinationMessage {
/**
 * Converts an untyped integral [value] into a [Long].
 *
 * @param value the raw value to convert; may be `null`.
 * @param name label for the value, used only in the error message.
 * @return `null` when [value] is `null`, otherwise the value as a [Long].
 * @throws IllegalArgumentException if [value] is neither an [Int] nor a [Long].
 */
fun toLong(value: Any?, name: String): Long? {
    if (value == null) {
        return null
    }
    return when (value) {
        // A boxed java.lang.Integer cannot be cast to java.lang.Long,
        // so each integral type is matched and widened explicitly.
        is Long -> value
        is Int -> value.toLong()
        else ->
            throw IllegalArgumentException(
                "Unexpected value for $name: $value (${value::class.qualifiedName})"
            )
    }
}

return when (message.type) {
AirbyteMessage.Type.RECORD -> {
val stream =
Expand All @@ -387,11 +398,19 @@ class DestinationMessageFactory(
DestinationFile.AirbyteRecordMessageFile(
fileUrl =
message.record.additionalProperties["file_url"] as String?,
bytes = message.record.additionalProperties["bytes"] as Long?,
bytes =
toLong(
message.record.additionalProperties["bytes"],
"message.record.bytes"
),
fileRelativePath =
message.record.additionalProperties["file_relative_path"]
as String?,
modified = message.record.additionalProperties["modified"] as Long?,
modified =
toLong(
message.record.additionalProperties["modified"],
"message.record.modified"
),
sourceFileUrl =
message.record.additionalProperties["source_file_url"]
as String?
Expand Down Expand Up @@ -461,7 +480,6 @@ class DestinationMessageFactory(
}
}
AirbyteMessage.Type.STATE -> {
val stateMessageId = message.state.additionalProperties["id"] as Long?
when (message.state.type) {
AirbyteStateMessage.AirbyteStateType.STREAM ->
StreamCheckpoint(
Expand All @@ -470,7 +488,7 @@ class DestinationMessageFactory(
message.state.sourceStats?.recordCount?.let {
Stats(recordCount = it.toLong())
},
stateMessageId = stateMessageId,
additionalProperties = message.state.additionalProperties,
)
AirbyteStateMessage.AirbyteStateType.GLOBAL ->
GlobalCheckpoint(
Expand All @@ -483,7 +501,7 @@ class DestinationMessageFactory(
message.state.global.streamStates.map {
fromAirbyteStreamState(it)
},
stateMessageId = stateMessageId,
additionalProperties = message.state.additionalProperties,
)
else -> // TODO: Do we still need to handle LEGACY?
Undefined
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,19 +43,35 @@ class DestinationMessageTest {
isFileTransferEnabled
)

/**
 * Runs [message] through [factory] after a JSON round trip.
 *
 * The message is serialized, and that JSON is deserialized back into an [AirbyteMessage]
 * before being handed to the factory together with the serialized string.
 *
 * The round trip matters for values stored in additionalProperties. With
 * `additionalProperties.put("foo", 12L)` on the in-memory object,
 * `additionalProperties["foo"]` returns `Long?`, whereas the JSON form `{"foo": 12}`
 * deserializes back out as `Int?`. The protocol models are (by definition)
 * round-trippable through JSON, so re-parsing is safe.
 */
private fun convert(
    factory: DestinationMessageFactory,
    message: AirbyteMessage
): DestinationMessage {
    val serialized = Jsons.serialize(message)
    // Force the protocol model through its serialized representation and back.
    val reparsed = Jsons.deserialize(serialized, AirbyteMessage::class.java)
    return factory.fromAirbyteMessage(reparsed, serialized)
}

@ParameterizedTest
@MethodSource("roundTrippableMessages")
fun testRoundTripRecord(message: AirbyteMessage) {
val roundTripped =
factory(false).fromAirbyteMessage(message, Jsons.serialize(message)).asProtocolMessage()
val roundTripped = convert(factory(false), message).asProtocolMessage()
Assertions.assertEquals(message, roundTripped)
}

@ParameterizedTest
@MethodSource("roundTrippableFileMessages")
fun testRoundTripFile(message: AirbyteMessage) {
val roundTripped =
factory(true).fromAirbyteMessage(message, Jsons.serialize(message)).asProtocolMessage()
val roundTripped = convert(factory(true), message).asProtocolMessage()
Assertions.assertEquals(message, roundTripped)
}

Expand All @@ -76,18 +92,23 @@ class DestinationMessageTest {
)
// Note: only source stats, no destination stats
.withSourceStats(AirbyteStateStats().withRecordCount(2.0))
.withAdditionalProperty("id", 1234L)
.withAdditionalProperty("id", 1234)
)

val parsedMessage =
factory(false).fromAirbyteMessage(inputMessage, Jsons.serialize(inputMessage))
as StreamCheckpoint
val parsedMessage = convert(factory(false), inputMessage) as StreamCheckpoint

Assertions.assertEquals(
inputMessage.also {
it.state.destinationStats = AirbyteStateStats().withRecordCount(3.0)
},
parsedMessage.withDestinationStats(CheckpointMessage.Stats(3)).asProtocolMessage(),
// we represent the state message ID as a long, but jackson sees that 1234 can be Int,
// and Int(1234) != Long(1234). (and additionalProperties is just a Map<String, Any?>)
// So we just compare the serialized protocol messages.
Jsons.serialize(
inputMessage.also {
it.state.destinationStats = AirbyteStateStats().withRecordCount(3.0)
}
),
Jsons.serialize(
parsedMessage.withDestinationStats(CheckpointMessage.Stats(3)).asProtocolMessage()
),
)
}

Expand All @@ -112,21 +133,20 @@ class DestinationMessageTest {
)
// Note: only source stats, no destination stats
.withSourceStats(AirbyteStateStats().withRecordCount(2.0))
.withAdditionalProperty("id", 1234L)
.withAdditionalProperty("id", 1234)
)

val parsedMessage =
factory(false)
.fromAirbyteMessage(
inputMessage,
Jsons.serialize(inputMessage),
) as GlobalCheckpoint
val parsedMessage = convert(factory(false), inputMessage) as GlobalCheckpoint

Assertions.assertEquals(
inputMessage.also {
it.state.destinationStats = AirbyteStateStats().withRecordCount(3.0)
},
parsedMessage.withDestinationStats(CheckpointMessage.Stats(3)).asProtocolMessage(),
Jsons.serialize(
inputMessage.also {
it.state.destinationStats = AirbyteStateStats().withRecordCount(3.0)
}
),
Jsons.serialize(
parsedMessage.withDestinationStats(CheckpointMessage.Stats(3)).asProtocolMessage()
),
)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,8 @@ class InputConsumerTaskTest {
return GlobalCheckpoint(
state = JsonNodeFactory.instance.objectNode(),
sourceStats = CheckpointMessage.Stats(recordCount),
checkpoints = emptyList()
checkpoints = emptyList(),
additionalProperties = emptyMap(),
)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,16 @@ data:
connectorSubtype: file
connectorType: destination
definitionId: a7bcc9d8-13b3-4e49-b80d-d020b90045e3
dockerImageTag: 0.7.9
dockerImageTag: 0.7.10
dockerRepository: airbyte/destination-dev-null
githubIssueLabel: destination-dev-null
icon: airbyte.svg
license: MIT
name: End-to-End Testing (/dev/null)
registryOverrides:
cloud:
dockerImageTag: 0.7.7
enabled: true
oss:
dockerImageTag: 0.7.7
enabled: true
releaseStage: alpha
documentationUrl: https://docs.airbyte.com/integrations/destinations/dev-null
Expand Down
1 change: 1 addition & 0 deletions docs/integrations/destinations/dev-null.md
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ The OSS and Cloud variants have the same version number starting from version `0

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:---------------------------------------------------------|:---------------------------------------------------------------------------------------------|
| 0.7.10 | 2024-11-08 | [48429](https://github.com/airbytehq/airbyte/pull/48429) | Bugfix: correctly handle state ID field |
| 0.7.9 | 2024-11-07 | [48417](https://github.com/airbytehq/airbyte/pull/48417) | Only pass through the state ID field, not all additional properties |
| 0.7.8 | 2024-11-07 | [48416](https://github.com/airbytehq/airbyte/pull/48416) | Bugfix: global state correctly sends additional properties |
| 0.7.7 | 2024-10-17 | [46692](https://github.com/airbytehq/airbyte/pull/46692) | Internal code changes |
Expand Down

0 comments on commit 57f076f

Please sign in to comment.