diff --git a/airbyte-integrations/connectors/source-mysql/metadata.yaml b/airbyte-integrations/connectors/source-mysql/metadata.yaml index 7847eb43c636..aad9844ba6a4 100644 --- a/airbyte-integrations/connectors/source-mysql/metadata.yaml +++ b/airbyte-integrations/connectors/source-mysql/metadata.yaml @@ -9,7 +9,7 @@ data: connectorSubtype: database connectorType: source definitionId: 435bb9a5-7887-4809-aa58-28c27df0d7ad - dockerImageTag: 3.9.0-rc.2 + dockerImageTag: 3.9.0-rc.3 dockerRepository: airbyte/source-mysql documentationUrl: https://docs.airbyte.com/integrations/sources/mysql githubIssueLabel: source-mysql diff --git a/airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/MysqlJdbcPartitionFactory.kt b/airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/MysqlJdbcPartitionFactory.kt index bb6dca28d813..611c8d1a2df2 100644 --- a/airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/MysqlJdbcPartitionFactory.kt +++ b/airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/MysqlJdbcPartitionFactory.kt @@ -226,11 +226,9 @@ class MysqlJdbcPartitionFactory( } else { val sv: MysqlJdbcStreamStateValue = Jsons.treeToValue(opaqueStateValue, MysqlJdbcStreamStateValue::class.java) - println("sv: $sv") if (stream.configuredSyncMode == ConfiguredSyncMode.FULL_REFRESH) { val upperBound = findPkUpperBound(stream, pkChosenFromCatalog) - println("pkval: ${sv.pkValue}, upperBound: ${upperBound.asText()}") if (sv.pkValue == upperBound.asText()) { return null } @@ -249,7 +247,9 @@ class MysqlJdbcPartitionFactory( if (sv.stateType != "cursor_based") { // Loading value from catalog. Note there could be unexpected behaviors if user // updates their schema but did not reset their state. - val pkLowerBound: JsonNode = Jsons.valueToTree(sv.pkValue) + val pkField = pkChosenFromCatalog.first() + val pkLowerBound: JsonNode = stateValueToJsonNode(pkField, sv.pkValue) + val cursorChosenFromCatalog: Field = stream.configuredCursor as? Field ?: throw ConfigErrorException("no cursor") diff --git a/airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/MysqlJdbcStreamStateValue.kt b/airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/MysqlJdbcStreamStateValue.kt index a2de16471ef0..6d2affe10375 100644 --- a/airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/MysqlJdbcStreamStateValue.kt +++ b/airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/MysqlJdbcStreamStateValue.kt @@ -20,7 +20,7 @@ data class MysqlJdbcStreamStateValue( @JsonProperty("stream_namespace") val streamNamespace: String = "", @JsonProperty("cursor_record_count") val cursorRecordCount: Int = 0, @JsonProperty("pk_name") val pkName: String? = null, - @JsonProperty("pk_value") val pkValue: String? = null, + @JsonProperty("pk_val") val pkValue: String? = null, @JsonProperty("incremental_state") val incrementalState: JsonNode? = null, ) { companion object { diff --git a/docs/integrations/sources/mysql.md b/docs/integrations/sources/mysql.md index 0aa03e39795a..4bb563bf0ba7 100644 --- a/docs/integrations/sources/mysql.md +++ b/docs/integrations/sources/mysql.md @@ -226,7 +226,7 @@ Any database or table encoding combination of charset and collation is supported | Version | Date | Pull Request | Subject | |:-----------|:-----------|:-----------------------------------------------------------|:------------------------------------------------------------------------------------------------------------------------------------------------| -| 3.9.0-rc.2 | 2024-11-05 | [48369](https://github.com/airbytehq/airbyte/pull/48369) | Progressive rollout test. | +| 3.9.0-rc.3 | 2024-11-05 | [48369](https://github.com/airbytehq/airbyte/pull/48369) | Progressive rollout test. | | 3.7.3 | 2024-09-17 | [45639](https://github.com/airbytehq/airbyte/pull/45639) | Adopt latest CDK to use the latest apache sshd mina to handle tcpkeepalive requests. | | 3.7.2 | 2024-09-05 | [45181](https://github.com/airbytehq/airbyte/pull/45181) | Fix incorrect categorizing resumable/nonresumable full refresh streams. | | 3.7.1 | 2024-08-27 | [44841](https://github.com/airbytehq/airbyte/pull/44841) | Adopt latest CDK. |