Skip to content

Commit

Permalink
handle pk value from saved state (#48423)
Browse files Browse the repository at this point in the history
  • Loading branch information
rodireich authored Nov 8, 2024
1 parent 7bb78d8 commit 63cd395
Show file tree
Hide file tree
Showing 4 changed files with 6 additions and 6 deletions.
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-mysql/metadata.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ data:
connectorSubtype: database
connectorType: source
definitionId: 435bb9a5-7887-4809-aa58-28c27df0d7ad
dockerImageTag: 3.9.0-rc.2
dockerImageTag: 3.9.0-rc.3
dockerRepository: airbyte/source-mysql
documentationUrl: https://docs.airbyte.com/integrations/sources/mysql
githubIssueLabel: source-mysql
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -226,11 +226,9 @@ class MysqlJdbcPartitionFactory(
} else {
val sv: MysqlJdbcStreamStateValue =
Jsons.treeToValue(opaqueStateValue, MysqlJdbcStreamStateValue::class.java)
println("sv: $sv")

if (stream.configuredSyncMode == ConfiguredSyncMode.FULL_REFRESH) {
val upperBound = findPkUpperBound(stream, pkChosenFromCatalog)
println("pkval: ${sv.pkValue}, upperBound: ${upperBound.asText()}")
if (sv.pkValue == upperBound.asText()) {
return null
}
Expand All @@ -249,7 +247,9 @@ class MysqlJdbcPartitionFactory(
if (sv.stateType != "cursor_based") {
// Loading value from catalog. Note there could be unexpected behaviors if user
// updates their schema but did not reset their state.
val pkLowerBound: JsonNode = Jsons.valueToTree(sv.pkValue)
val pkField = pkChosenFromCatalog.first()
val pkLowerBound: JsonNode = stateValueToJsonNode(pkField, sv.pkValue)

val cursorChosenFromCatalog: Field =
stream.configuredCursor as? Field ?: throw ConfigErrorException("no cursor")

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ data class MysqlJdbcStreamStateValue(
@JsonProperty("stream_namespace") val streamNamespace: String = "",
@JsonProperty("cursor_record_count") val cursorRecordCount: Int = 0,
@JsonProperty("pk_name") val pkName: String? = null,
@JsonProperty("pk_value") val pkValue: String? = null,
@JsonProperty("pk_val") val pkValue: String? = null,
@JsonProperty("incremental_state") val incrementalState: JsonNode? = null,
) {
companion object {
Expand Down
2 changes: 1 addition & 1 deletion docs/integrations/sources/mysql.md
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,7 @@ Any database or table encoding combination of charset and collation is supported

| Version | Date | Pull Request | Subject |
|:-----------|:-----------|:-----------------------------------------------------------|:------------------------------------------------------------------------------------------------------------------------------------------------|
| 3.9.0-rc.2 | 2024-11-05 | [48369](https://github.com/airbytehq/airbyte/pull/48369) | Progressive rollout test. |
| 3.9.0-rc.3 | 2024-11-05 | [48369](https://github.com/airbytehq/airbyte/pull/48369) | Progressive rollout test. |
| 3.7.3 | 2024-09-17 | [45639](https://github.com/airbytehq/airbyte/pull/45639) | Adopt latest CDK to use the latest apache sshd mina to handle tcpkeepalive requests. |
| 3.7.2 | 2024-09-05 | [45181](https://github.com/airbytehq/airbyte/pull/45181) | Fix incorrect categorizing resumable/nonresumable full refresh streams. |
| 3.7.1 | 2024-08-27 | [44841](https://github.com/airbytehq/airbyte/pull/44841) | Adopt latest CDK. |
Expand Down

0 comments on commit 63cd395

Please sign in to comment.