diff --git a/airbyte-integrations/connectors/source-mysql/src/test/java/io/airbyte/integrations/source/mysql/InitialPkLoadEnabledCdcMysqlSourceTest.java b/airbyte-integrations/connectors/source-mysql/src/test/java/io/airbyte/integrations/source/mysql/InitialPkLoadEnabledCdcMysqlSourceTest.java index e4bfaba0372b..12573308c1c1 100644 --- a/airbyte-integrations/connectors/source-mysql/src/test/java/io/airbyte/integrations/source/mysql/InitialPkLoadEnabledCdcMysqlSourceTest.java +++ b/airbyte-integrations/connectors/source-mysql/src/test/java/io/airbyte/integrations/source/mysql/InitialPkLoadEnabledCdcMysqlSourceTest.java @@ -172,6 +172,40 @@ protected void assertStateMessagesForNewTableSnapshotTest(final List> primaryKeys = configuredCatalog.getStreams().get(0).getStream().getSourceDefinedPrimaryKey(); + primaryKeys.add(List.of("make_id")); + + final AutoCloseableIterator read1 = getSource() + .read(getConfig(), configuredCatalog, null); + + final List actualRecords1 = AutoCloseableIterators.toListAndClose(read1); + + final Set recordMessages1 = extractRecordMessages(actualRecords1); + final List stateMessages1 = extractStateMessages(actualRecords1); + assertExpectedRecords(new HashSet<>(MODEL_RECORDS), recordMessages1); + assertExpectedStateMessages(stateMessages1); + + // Re-run the sync with state associated with record w/ id = 15 (second to last record). + // We expect to read 2 records, since in the case of a composite PK we issue a >= query. + // We also expect 3 state records. One associated with the pk state, one to signify end of initial load, and + // the last one indicating the cdc position we have synced until. + final JsonNode state = Jsons.jsonNode(Collections.singletonList(stateMessages1.get(4))); + final AutoCloseableIterator read2 = getSource() + .read(getConfig(), configuredCatalog, state); + + final List actualRecords2 = AutoCloseableIterators.toListAndClose(read2); + final Set recordMessages2 = extractRecordMessages(actualRecords2); + final List stateMessages2 = extractStateMessages(actualRecords2); + + assertExpectedRecords(new HashSet<>(MODEL_RECORDS.subList(4, 6)), recordMessages2); + assertEquals(3, stateMessages2.size()); + assertStateTypes(stateMessages2, 0); + } + @Test public void testTwoStreamSync() throws Exception { // Add another stream models_2 and read that one as well.