Skip to content

Commit

Permalink
handle pk value from saved state
Browse files Browse the repository at this point in the history
  • Loading branch information
rodireich committed Nov 8, 2024
1 parent c589cb5 commit 1a1f8f6
Show file tree
Hide file tree
Showing 2 changed files with 1 addition and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -226,11 +226,9 @@ class MysqlJdbcPartitionFactory(
} else {
val sv: MysqlJdbcStreamStateValue =
Jsons.treeToValue(opaqueStateValue, MysqlJdbcStreamStateValue::class.java)
println("sv: $sv")

if (stream.configuredSyncMode == ConfiguredSyncMode.FULL_REFRESH) {
val upperBound = findPkUpperBound(stream, pkChosenFromCatalog)
println("pkval: ${sv.pkValue}, upperBound: ${upperBound.asText()}")
if (sv.pkValue == upperBound.asText()) {
return null
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,7 @@ class MysqlSourceOperations :
is And -> conj.flatMap { it.bindings() }
is Or -> disj.flatMap { it.bindings() }
is WhereClauseLeafNode -> {
val type = column.type as LosslessJdbcFieldType<*, *>
val type = /*column.type as LosslessJdbcFieldType<*, *>*/StringFieldType
listOf(SelectQuery.Binding(bindingValue, type))
}
}
Expand Down

0 comments on commit 1a1f8f6

Please sign in to comment.