Skip to content

Commit

Permalink
Add unit test
Browse files Browse the repository at this point in the history
  • Loading branch information
akashkulk committed Aug 21, 2023
1 parent d5994ff commit bfff10d
Showing 1 changed file with 34 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,40 @@ protected void assertStateMessagesForNewTableSnapshotTest(final List<AirbyteStat
assertNotNull(stateMessageEmittedAfterSecondSyncCompletion.getData());
}

@Test
public void testCompositeIndexInitialLoad() throws Exception {
// Simulate adding a composite index by modifying the catalog.
final ConfiguredAirbyteCatalog configuredCatalog = Jsons.clone(CONFIGURED_CATALOG);
final List<List<String>> primaryKeys = configuredCatalog.getStreams().get(0).getStream().getSourceDefinedPrimaryKey();
primaryKeys.add(List.of("make_id"));

final AutoCloseableIterator<AirbyteMessage> read1 = getSource()
.read(getConfig(), configuredCatalog, null);

final List<AirbyteMessage> actualRecords1 = AutoCloseableIterators.toListAndClose(read1);

final Set<AirbyteRecordMessage> recordMessages1 = extractRecordMessages(actualRecords1);
final List<AirbyteStateMessage> stateMessages1 = extractStateMessages(actualRecords1);
assertExpectedRecords(new HashSet<>(MODEL_RECORDS), recordMessages1);
assertExpectedStateMessages(stateMessages1);

// Re-run the sync with state associated with record w/ id = 15 (second to last record).
// We expect to read 2 records, since in the case of a composite PK we issue a >= query.
// We also expect 3 state records. One associated with the pk state, one to signify end of initial load, and
// the last one indicating the cdc position we have synced until.
final JsonNode state = Jsons.jsonNode(Collections.singletonList(stateMessages1.get(4)));
final AutoCloseableIterator<AirbyteMessage> read2 = getSource()
.read(getConfig(), configuredCatalog, state);

final List<AirbyteMessage> actualRecords2 = AutoCloseableIterators.toListAndClose(read2);
final Set<AirbyteRecordMessage> recordMessages2 = extractRecordMessages(actualRecords2);
final List<AirbyteStateMessage> stateMessages2 = extractStateMessages(actualRecords2);

assertExpectedRecords(new HashSet<>(MODEL_RECORDS.subList(4, 6)), recordMessages2);
assertEquals(3, stateMessages2.size());
assertStateTypes(stateMessages2, 0);
}

@Test
public void testTwoStreamSync() throws Exception {
// Add another stream models_2 and read that one as well.
Expand Down

0 comments on commit bfff10d

Please sign in to comment.