Skip to content

Commit

Permalink
✨ Source Mongodb POC: add read support (#29066)
Browse files Browse the repository at this point in the history
Co-authored-by: Jonathan Pearlin <[email protected]>
Co-authored-by: Jose Pefaur <[email protected]>
  • Loading branch information
3 people authored Aug 21, 2023
1 parent 3676485 commit 0461430
Show file tree
Hide file tree
Showing 15 changed files with 914 additions and 46 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream;
import io.airbyte.protocol.models.v0.ConnectorSpecification;
import io.airbyte.protocol.models.v0.DestinationSyncMode;
import io.airbyte.protocol.models.v0.SyncMode;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
Expand Down Expand Up @@ -175,6 +176,11 @@ protected void verifyCatalog(final AirbyteCatalog catalog) throws Exception {
*/
@Test
public void testFullRefreshRead() throws Exception {
if (!sourceSupportsFullRefresh()) {
LOGGER.info("Test skipped. Source does not support full refresh.");
return;
}

final ConfiguredAirbyteCatalog catalog = withFullRefreshSyncModes(getConfiguredCatalog());
final List<AirbyteMessage> allMessages = runRead(catalog);

Expand All @@ -196,6 +202,11 @@ protected void assertFullRefreshMessages(final List<AirbyteMessage> allMessages)
*/
@Test
public void testIdenticalFullRefreshes() throws Exception {
if (!sourceSupportsFullRefresh()) {
LOGGER.info("Test skipped. Source does not support full refresh.");
return;
}

if (IMAGES_TO_SKIP_IDENTICAL_FULL_REFRESHES.contains(getImageName().split(":")[0])) {
return;
}
Expand Down Expand Up @@ -275,15 +286,20 @@ public void testEmptyStateIncrementalIdenticalToFullRefresh() throws Exception {
return;
}

if (!sourceSupportsFullRefresh()) {
LOGGER.info("Test skipped. Source does not support full refresh.");
return;
}

final ConfiguredAirbyteCatalog configuredCatalog = getConfiguredCatalog();
final ConfiguredAirbyteCatalog fullRefreshCatalog = withFullRefreshSyncModes(configuredCatalog);

final List<AirbyteRecordMessage> fullRefreshRecords = filterRecords(runRead(fullRefreshCatalog));
final List<AirbyteRecordMessage> emptyStateRecords = filterRecords(runRead(configuredCatalog, Jsons.jsonNode(new HashMap<>())));
final String assertionMessage = "Expected a full refresh sync and incremental sync with no input state to produce identical records";
assertFalse(fullRefreshRecords.isEmpty(), assertionMessage);
assertFalse(emptyStateRecords.isEmpty(), assertionMessage);
assertSameRecords(fullRefreshRecords, emptyStateRecords, assertionMessage);
assertFalse(fullRefreshRecords.isEmpty(), "Expected a full refresh sync to produce records");
assertFalse(emptyStateRecords.isEmpty(), "Expected state records to not be empty");
assertSameRecords(fullRefreshRecords, emptyStateRecords,
"Expected a full refresh sync and incremental sync with no input state to produce identical records");
}

/**
Expand Down Expand Up @@ -327,9 +343,17 @@ protected ConfiguredAirbyteCatalog withFullRefreshSyncModes(final ConfiguredAirb
}

private boolean sourceSupportsIncremental() throws Exception {
return sourceSupports(INCREMENTAL);
}

private boolean sourceSupportsFullRefresh() throws Exception {
return sourceSupports(FULL_REFRESH);
}

private boolean sourceSupports(final SyncMode syncMode) throws Exception {
final ConfiguredAirbyteCatalog catalog = getConfiguredCatalog();
for (final ConfiguredAirbyteStream stream : catalog.getStreams()) {
if (stream.getStream().getSupportedSyncModes().contains(INCREMENTAL)) {
if (stream.getStream().getSupportedSyncModes().contains(syncMode)) {
return true;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ acceptance_tests:
spec:
tests:
- spec_path: "integration_tests/expected_spec.json"
backward_compatibility_tests_config:
disable_for_version: "0.0.1"
config_path: "secrets/credentials.json"
timeout_seconds: 60
connection:
Expand All @@ -13,17 +15,24 @@ acceptance_tests:
status: "succeed"
timeout_seconds: 60
discovery:
tests:
- config_path: "secrets/credentials.json"
timeout_seconds: 60
bypass_reason: "The first version of this connector returns null when discovery is called, which causes the test to fail. We can stop bypassing this test once a new version of the connector is released."
#TODO: remove bypass_reason once a version that supports discovery is released + uncomment the lines below
# tests:
# - config_path: "secrets/credentials.json"
# backward_compatibility_tests_config:
# disable_for_version: "0.0.1"
# timeout_seconds: 60
basic_read:
tests:
- config_path: "secrets/credentials.json"
configured_catalog_path: "integration_tests/configured_catalog.json"
timeout_seconds: 120
bypass_reason: "Full refresh syncs are not supported on this connector."
full_refresh:
tests:
- config_path: "secrets/credentials.json"
configured_catalog_path: "integration_tests/configured_catalog.json"
timeout_seconds: 180
bypass_reason: "Full refresh syncs are not supported on this connector."
incremental:
bypass_reason: "Incremental syncs are not yet supported by this connector."
#TODO: remove bypass_reason once a version that supports incremental syncs is released + uncomment the lines below
# tests:
# - config_path: "secrets/credentials.json"
# configured_catalog_path: "integration_tests/configured_catalog.json"
# cursor_paths:
# listingsAndReviews: ["id"]
# timeout_seconds: 180

Original file line number Diff line number Diff line change
Expand Up @@ -65,4 +65,4 @@ tasks.register('generateTestData', JavaExec) {
main 'io.airbyte.integrations.source.mongodb.internal.MongoDbInsertClient'
standardInput = System.in
args arguments
}
}
Loading

0 comments on commit 0461430

Please sign in to comment.