diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/main/resources/version.properties b/airbyte-cdk/java/airbyte-cdk/core/src/main/resources/version.properties index 503969f8df66..8c59062dc4d7 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/main/resources/version.properties +++ b/airbyte-cdk/java/airbyte-cdk/core/src/main/resources/version.properties @@ -1 +1 @@ -version=0.37.1 +version=0.37.2 diff --git a/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/debezium/internals/AirbyteFileOffsetBackingStore.kt b/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/debezium/internals/AirbyteFileOffsetBackingStore.kt index 19f09bc21721..1830720d6623 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/debezium/internals/AirbyteFileOffsetBackingStore.kt +++ b/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/debezium/internals/AirbyteFileOffsetBackingStore.kt @@ -47,7 +47,8 @@ class AirbyteFileOffsetBackingStore( Jsons.`object`(cdcState, MutableMap::class.java) as Map else emptyMap() - val updatedMap = updateStateForDebezium2_1(mapAsString) + var updatedMap = updateStateForDebezium2_1(mapAsString) + updatedMap = updateStateForDebezium2_6(updatedMap) val mappedAsStrings: Map = updatedMap.entries.associate { @@ -71,7 +72,7 @@ class AirbyteFileOffsetBackingStore( return mapAsString } - LOGGER.info { "Mutating sate to make it Debezium 2.1 compatible" } + LOGGER.info { "Mutating state to make it Debezium 2.1 compatible" } val newKey = if (dbName.isPresent) SQL_SERVER_STATE_MUTATION.apply(key.substring(i, i1 + 1), dbName.get()) @@ -82,6 +83,28 @@ class AirbyteFileOffsetBackingStore( return updatedMap } + // Previously: + // {"["ci-test-database",{"rs":"atlas-pexnnq-shard-0","server_id":"ci-test-database"}]":"{"sec":1715722523,"ord":2,"transaction_id":null,"resume_token":"826643D91B000000022B0429296E1404"}"} + // Now: + // {["ci-test-database",{"server_id":"ci-test-database"}]={"sec":0,"ord":-1,"resume_token":"826643FA09000000022B0429296E1404"}} + private fun updateStateForDebezium2_6(mapAsString: Map): Map { + val updatedMap: MutableMap = LinkedHashMap() + if (mapAsString.size > 0) { + val key = mapAsString.keys.stream().toList()[0] + + if (!key.contains("\"rs\":")) { + // The state is Debezium 2.6 compatible. No need to change anything. + return mapAsString + } + + LOGGER.info { "Mutating state to make it Debezium 2.6 compatible" } + val newKey = mongoShardMutation(key) + val value = mapAsString.getValue(key) + updatedMap[newKey] = value + } + return updatedMap + } + /** * See FileOffsetBackingStore#load - logic is mostly borrowed from here. duplicated because this * method is not public. Reduced the try catch block to only the read operation from original @@ -164,6 +187,22 @@ class AirbyteFileOffsetBackingStore( "\"" + key.substring(key.length - 2)) } + private fun mongoShardMutation(input: String): String { + val jsonObjectStart = input.indexOf("{", input.indexOf("[")) + val jsonObjectEnd = input.lastIndexOf("}") + + // Extract the JSON object as a substring + val jsonObjectString = input.substring(jsonObjectStart, jsonObjectEnd + 1) + + // Remove the "rs" key-value pair using a regex + val modifiedJsonObjectString = + jsonObjectString.replace(Regex("""("rs":\s*".+?",\s*)"""), "") + + // Replace the old JSON object with the modified one in the input string + val finalString = input.replace(jsonObjectString, modifiedJsonObjectString) + + return finalString + } private fun byteBufferToString(byteBuffer: ByteBuffer?): String { Preconditions.checkNotNull(byteBuffer) diff --git a/airbyte-integrations/connectors/source-mongodb-v2/build.gradle b/airbyte-integrations/connectors/source-mongodb-v2/build.gradle index f027ee29f6ca..9daad40b20d1 100644 --- a/airbyte-integrations/connectors/source-mongodb-v2/build.gradle +++ b/airbyte-integrations/connectors/source-mongodb-v2/build.gradle @@ -3,7 +3,7 @@ plugins { } airbyteJavaConnector { - cdkVersionRequired = '0.36.3' + cdkVersionRequired = '0.37.2' features = ['db-sources', 'datastore-mongo'] useLocalCdk = false } @@ -38,8 +38,8 @@ java { } dependencies { - implementation 'io.debezium:debezium-embedded:2.5.1.Final' - implementation 'io.debezium:debezium-connector-mongodb:2.5.1.Final' + implementation 'io.debezium:debezium-embedded:2.6.2.Final' + implementation 'io.debezium:debezium-connector-mongodb:2.6.2.Final' testImplementation 'org.testcontainers:mongodb:1.19.0' @@ -53,8 +53,8 @@ dependencies { dataGeneratorImplementation 'org.jetbrains.kotlinx:kotlinx-cli-jvm:0.3.5' dataGeneratorImplementation 'org.mongodb:mongodb-driver-sync:4.10.2' - debeziumTestImplementation 'io.debezium:debezium-embedded:2.5.1.Final' - debeziumTestImplementation 'io.debezium:debezium-connector-mongodb:2.5.1.Final' + debeziumTestImplementation 'io.debezium:debezium-embedded:2.6.0.Final' + debeziumTestImplementation 'io.debezium:debezium-connector-mongodb:2.6.0.Final' debeziumTestImplementation 'org.jetbrains.kotlinx:kotlinx-cli-jvm:0.3.5' debeziumTestImplementation 'com.github.spotbugs:spotbugs-annotations:4.7.3' } diff --git a/airbyte-integrations/connectors/source-mongodb-v2/metadata.yaml b/airbyte-integrations/connectors/source-mongodb-v2/metadata.yaml index 2c5ee78d218f..c737206f9f9e 100644 --- a/airbyte-integrations/connectors/source-mongodb-v2/metadata.yaml +++ b/airbyte-integrations/connectors/source-mongodb-v2/metadata.yaml @@ -8,7 +8,7 @@ data: connectorSubtype: database connectorType: source definitionId: b2e713cd-cc36-4c0a-b5bd-b47cb8a0561e - dockerImageTag: 1.3.15 + dockerImageTag: 1.4.0 dockerRepository: airbyte/source-mongodb-v2 documentationUrl: https://docs.airbyte.com/integrations/sources/mongodb-v2 githubIssueLabel: source-mongodb-v2 diff --git a/airbyte-integrations/connectors/source-mongodb-v2/src/main/java/io/airbyte/integrations/source/mongodb/cdc/MongoDbCdcInitializer.java b/airbyte-integrations/connectors/source-mongodb-v2/src/main/java/io/airbyte/integrations/source/mongodb/cdc/MongoDbCdcInitializer.java index a165a5183d38..1032ab733550 100644 --- a/airbyte-integrations/connectors/source-mongodb-v2/src/main/java/io/airbyte/integrations/source/mongodb/cdc/MongoDbCdcInitializer.java +++ b/airbyte-integrations/connectors/source-mongodb-v2/src/main/java/io/airbyte/integrations/source/mongodb/cdc/MongoDbCdcInitializer.java @@ -69,6 +69,7 @@ public MongoDbCdcInitializer() { * @param mongoClient The {@link MongoClient} used to interact with the target MongoDB server. * @param cdcMetadataInjector The {@link MongoDbCdcConnectorMetadataInjector} used to add metadata * to generated records. + * @param streams The configured Airbyte catalog of streams for the source. * @param stateManager The {@link MongoDbStateManager} that provides state information used for * iterator selection. * @param emittedAt The timestamp of the sync. @@ -98,7 +99,7 @@ public List> createCdcIterators( final BsonDocument initialResumeToken = MongoDbResumeTokenHelper.getMostRecentResumeToken(mongoClient, databaseName, incrementalOnlyStreamsCatalog); final JsonNode initialDebeziumState = - mongoDbDebeziumStateUtil.constructInitialDebeziumState(initialResumeToken, mongoClient, databaseName); + mongoDbDebeziumStateUtil.constructInitialDebeziumState(initialResumeToken, databaseName); final MongoDbCdcState cdcState = (stateManager.getCdcState() == null || stateManager.getCdcState().state() == null || stateManager.getCdcState().state().isNull()) ? new MongoDbCdcState(initialDebeziumState, isEnforceSchema) @@ -107,8 +108,7 @@ public List> createCdcIterators( Jsons.clone(defaultDebeziumProperties), incrementalOnlyStreamsCatalog, cdcState.state(), - config.getDatabaseConfig(), - mongoClient); + config.getDatabaseConfig()); // We should always be able to extract offset out of state if it's not null if (cdcState.state() != null && optSavedOffset.isEmpty()) { diff --git a/airbyte-integrations/connectors/source-mongodb-v2/src/main/java/io/airbyte/integrations/source/mongodb/cdc/MongoDbCustomLoader.java b/airbyte-integrations/connectors/source-mongodb-v2/src/main/java/io/airbyte/integrations/source/mongodb/cdc/MongoDbCustomLoader.java deleted file mode 100644 index ec537d60b8e4..000000000000 --- a/airbyte-integrations/connectors/source-mongodb-v2/src/main/java/io/airbyte/integrations/source/mongodb/cdc/MongoDbCustomLoader.java +++ /dev/null @@ -1,42 +0,0 @@ -/* - * Copyright (c) 2023 Airbyte, Inc., all rights reserved. - */ - -package io.airbyte.integrations.source.mongodb.cdc; - -import io.airbyte.commons.json.Jsons; -import io.debezium.connector.mongodb.MongoDbConnectorConfig; -import io.debezium.connector.mongodb.MongoDbOffsetContext; -import io.debezium.connector.mongodb.MongoDbOffsetContext.Loader; -import io.debezium.connector.mongodb.ReplicaSets; -import java.util.Collections; -import java.util.Map; - -/** - * Custom Debezium offset loader for MongoDB. - *

- *

- * N.B. In order to extract the offset from the {@link MongoDbCustomLoader}, you must first get the - * {@link io.debezium.connector.mongodb.ReplicaSetOffsetContext} from the - * {@link MongoDbOffsetContext} for the replica set for which the offset is requested. From that - * context, you can then request the actual Debezium offset. - */ -public class MongoDbCustomLoader extends Loader { - - private Map, Map> offsets; - - public MongoDbCustomLoader(final MongoDbConnectorConfig connectorConfig, final ReplicaSets replicaSets) { - super(connectorConfig, replicaSets); - } - - @Override - public MongoDbOffsetContext loadOffsets(final Map, Map> offsets) { - this.offsets = Jsons.clone(offsets); - return super.loadOffsets(offsets); - } - - public Map, Map> getRawOffset() { - return Collections.unmodifiableMap(offsets); - } - -} diff --git a/airbyte-integrations/connectors/source-mongodb-v2/src/main/java/io/airbyte/integrations/source/mongodb/cdc/MongoDbDebeziumConstants.java b/airbyte-integrations/connectors/source-mongodb-v2/src/main/java/io/airbyte/integrations/source/mongodb/cdc/MongoDbDebeziumConstants.java index 170c6ae78552..1e07a6bedfba 100644 --- a/airbyte-integrations/connectors/source-mongodb-v2/src/main/java/io/airbyte/integrations/source/mongodb/cdc/MongoDbDebeziumConstants.java +++ b/airbyte-integrations/connectors/source-mongodb-v2/src/main/java/io/airbyte/integrations/source/mongodb/cdc/MongoDbDebeziumConstants.java @@ -49,8 +49,8 @@ public static class Configuration { */ public static class OffsetState { - public static final String KEY_REPLICA_SET = SourceInfo.REPLICA_SET_NAME; - public static final String KEY_SERVER_ID = SourceInfo.SERVER_ID_KEY; + // public static final String KEY_REPLICA_SET = SourceInfo.REPLICA_SET_NAME; + public static final String KEY_SERVER_ID = "server_id"; public static final String VALUE_INCREMENT = SourceInfo.ORDER; public static final String VALUE_RESUME_TOKEN = "resume_token"; public static final String VALUE_SECONDS = SourceInfo.TIMESTAMP; diff --git a/airbyte-integrations/connectors/source-mongodb-v2/src/main/java/io/airbyte/integrations/source/mongodb/cdc/MongoDbDebeziumPropertiesManager.java b/airbyte-integrations/connectors/source-mongodb-v2/src/main/java/io/airbyte/integrations/source/mongodb/cdc/MongoDbDebeziumPropertiesManager.java index 6050cbe66b53..5d07a928646b 100644 --- a/airbyte-integrations/connectors/source-mongodb-v2/src/main/java/io/airbyte/integrations/source/mongodb/cdc/MongoDbDebeziumPropertiesManager.java +++ b/airbyte-integrations/connectors/source-mongodb-v2/src/main/java/io/airbyte/integrations/source/mongodb/cdc/MongoDbDebeziumPropertiesManager.java @@ -40,7 +40,7 @@ public class MongoDbDebeziumPropertiesManager extends DebeziumPropertiesManager static final String DOUBLE_QUOTES_PATTERN = "\""; static final String MONGODB_AUTHSOURCE_KEY = "mongodb.authsource"; static final String MONGODB_CONNECTION_MODE_KEY = "mongodb.connection.mode"; - static final String MONGODB_CONNECTION_MODE_VALUE = "replica_set"; + static final String MONGODB_CONNECTION_MODE_VALUE = "sharded"; static final String MONGODB_CONNECTION_STRING_KEY = "mongodb.connection.string"; static final String MONGODB_PASSWORD_KEY = "mongodb.password"; static final String MONGODB_SSL_ENABLED_KEY = "mongodb.ssl.enabled"; diff --git a/airbyte-integrations/connectors/source-mongodb-v2/src/main/java/io/airbyte/integrations/source/mongodb/cdc/MongoDbDebeziumStateUtil.java b/airbyte-integrations/connectors/source-mongodb-v2/src/main/java/io/airbyte/integrations/source/mongodb/cdc/MongoDbDebeziumStateUtil.java index 862d8b9dc482..adb895bda02e 100644 --- a/airbyte-integrations/connectors/source-mongodb-v2/src/main/java/io/airbyte/integrations/source/mongodb/cdc/MongoDbDebeziumStateUtil.java +++ b/airbyte-integrations/connectors/source-mongodb-v2/src/main/java/io/airbyte/integrations/source/mongodb/cdc/MongoDbDebeziumStateUtil.java @@ -4,6 +4,8 @@ package io.airbyte.integrations.source.mongodb.cdc; +import static io.airbyte.integrations.source.mongodb.cdc.MongoDbDebeziumConstants.OffsetState.KEY_SERVER_ID; + import com.fasterxml.jackson.databind.JsonNode; import com.mongodb.MongoChangeStreamException; import com.mongodb.MongoCommandException; @@ -17,14 +19,12 @@ import io.airbyte.commons.json.Jsons; import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog; import io.debezium.config.Configuration; +import io.debezium.connector.common.OffsetReader; import io.debezium.connector.mongodb.MongoDbConnectorConfig; import io.debezium.connector.mongodb.MongoDbOffsetContext; -import io.debezium.connector.mongodb.MongoDbTaskContext; -import io.debezium.connector.mongodb.MongoUtil; -import io.debezium.connector.mongodb.ReplicaSetDiscovery; -import io.debezium.connector.mongodb.ReplicaSets; +import io.debezium.connector.mongodb.MongoDbPartition; import io.debezium.connector.mongodb.ResumeTokens; -import java.util.Collection; +import io.debezium.pipeline.spi.Partition; import java.util.Collections; import java.util.LinkedHashMap; import java.util.LinkedList; @@ -33,6 +33,7 @@ import java.util.Objects; import java.util.Optional; import java.util.Properties; +import java.util.Set; import org.apache.kafka.connect.storage.FileOffsetBackingStore; import org.apache.kafka.connect.storage.OffsetStorageReaderImpl; import org.bson.BsonDocument; @@ -53,16 +54,14 @@ public class MongoDbDebeziumStateUtil implements DebeziumStateUtil { * Constructs the initial Debezium offset state that will be used by the incremental CDC snapshot * after an initial snapshot sync. * - * @param mongoClient The {@link MongoClient} used to query the MongoDB server. * @param serverId The ID of the target server. * @return The initial Debezium offset state storage document as a {@link JsonNode}. * @throws IllegalStateException if unable to determine the replica set. */ - public JsonNode constructInitialDebeziumState(final BsonDocument resumeToken, final MongoClient mongoClient, final String serverId) { - final String replicaSet = getReplicaSetName(mongoClient); + public JsonNode constructInitialDebeziumState(final BsonDocument resumeToken, final String serverId) { LOGGER.info("Initial resume token '{}' constructed, corresponding to timestamp (seconds after epoch) {}", ResumeTokens.getData(resumeToken).asString().getValue(), ResumeTokens.getTimestamp(resumeToken).getTime()); - final JsonNode state = formatState(serverId, replicaSet, ((BsonString) ResumeTokens.getData(resumeToken)).getValue()); + final JsonNode state = formatState(serverId, ((BsonString) ResumeTokens.getData(resumeToken)).getValue()); LOGGER.info("Initial Debezium state constructed: {}", state); return state; } @@ -71,36 +70,22 @@ public JsonNode constructInitialDebeziumState(final BsonDocument resumeToken, fi * Formats the Debezium initial state into a format suitable for storage in the offset data file. * * @param serverId The ID target MongoDB database. - * @param replicaSet The name of the target MongoDB replica set. * @param resumeTokenData The MongoDB resume token that represents the offset state. * @return The offset state as a {@link JsonNode}. */ - public static JsonNode formatState(final String serverId, final String replicaSet, final String resumeTokenData) { + public static JsonNode formatState(final String serverId, final String resumeTokenData) { final BsonTimestamp timestamp = ResumeTokens.getTimestamp(ResumeTokens.fromData(resumeTokenData)); - final List key = generateOffsetKey(serverId, replicaSet); + final List key = generateOffsetKey(serverId); final Map value = new LinkedHashMap<>(); value.put(MongoDbDebeziumConstants.OffsetState.VALUE_SECONDS, timestamp.getTime()); value.put(MongoDbDebeziumConstants.OffsetState.VALUE_INCREMENT, timestamp.getInc()); - value.put(MongoDbDebeziumConstants.OffsetState.VALUE_TRANSACTION_ID, null); value.put(MongoDbDebeziumConstants.OffsetState.VALUE_RESUME_TOKEN, resumeTokenData); return Jsons.jsonNode(Map.of(Jsons.serialize(key), Jsons.serialize(value))); } - /** - * Retrieves the replica set name for the current connection. - * - * @param mongoClient The {@link MongoClient} used to retrieve the replica set name. - * @return The replica set name. - * @throws IllegalStateException if unable to determine the replica set. - */ - public static String getReplicaSetName(final MongoClient mongoClient) { - final Optional replicaSetName = MongoUtil.replicaSetName(mongoClient.getClusterDescription()); - return replicaSetName.orElseThrow(() -> new IllegalStateException("Unable to determine replica set.")); - } - /** * Test whether the retrieved saved offset resume token value is valid. A valid resume token is one * that can be used to resume a change event stream in MongoDB. @@ -158,13 +143,13 @@ public boolean isValidResumeToken(final BsonDocument savedOffset, public Optional savedOffset(final Properties baseProperties, final ConfiguredAirbyteCatalog catalog, final JsonNode cdcState, - final JsonNode config, - final MongoClient mongoClient) { + final JsonNode config) { LOGGER.debug("Initializing file offset backing store with state '{}'...", cdcState); final var offsetManager = AirbyteFileOffsetBackingStore.initializeState(cdcState, Optional.empty()); final DebeziumPropertiesManager debeziumPropertiesManager = new MongoDbDebeziumPropertiesManager(baseProperties, config, catalog); final Properties debeziumProperties = debeziumPropertiesManager.getDebeziumProperties(offsetManager); - return parseSavedOffset(debeziumProperties, mongoClient); + LOGGER.info("properties: " + debeziumProperties); + return parseSavedOffset(debeziumProperties); } /** @@ -175,7 +160,7 @@ public Optional savedOffset(final Properties baseProperties, * state * @return Returns the resume token that Airbyte has acknowledged in the source database server. */ - private Optional parseSavedOffset(final Properties properties, final MongoClient mongoClient) { + private Optional parseSavedOffset(final Properties properties) { FileOffsetBackingStore fileOffsetBackingStore = null; OffsetStorageReaderImpl offsetStorageReader = null; @@ -184,31 +169,33 @@ private Optional parseSavedOffset(final Properties properties, fin offsetStorageReader = getOffsetStorageReader(fileOffsetBackingStore, properties); final Configuration config = Configuration.from(properties); - final MongoDbTaskContext taskContext = new MongoDbTaskContext(config); final MongoDbConnectorConfig mongoDbConnectorConfig = new MongoDbConnectorConfig(config); - final ReplicaSets replicaSets = new ReplicaSetDiscovery(taskContext).getReplicaSets(mongoClient); - - LOGGER.debug("Parsing saved offset state for replica set '{}' and server ID '{}'...", replicaSets.all().get(0), properties.getProperty("name")); - - final MongoDbOffsetContext.Loader loader = new MongoDbCustomLoader(mongoDbConnectorConfig, replicaSets); - final Collection> partitions = loader.getPartitions(); - final Map, Map> offsets = offsetStorageReader.offsets(partitions); - - if (offsets != null && offsets.values().stream().anyMatch(Objects::nonNull)) { - final MongoDbOffsetContext offsetContext = loader.loadOffsets(offsets); - final Map offset = offsetContext.getReplicaSetOffsetContext(replicaSets.all().get(0)).getOffset(); - final Object resumeTokenData = offset.get(MongoDbDebeziumConstants.OffsetState.VALUE_RESUME_TOKEN); - if (resumeTokenData != null) { - final BsonDocument resumeToken = ResumeTokens.fromData(resumeTokenData.toString()); - return Optional.of(resumeToken); - } else { - LOGGER.warn("Offset data does not contain a resume token: {}", offset); - return Optional.empty(); - } + + final MongoDbOffsetContext.Loader loader = new MongoDbOffsetContext.Loader(mongoDbConnectorConfig); + + final Partition mongoDbPartition = new MongoDbPartition(properties.getProperty(CONNECTOR_NAME_PROPERTY)); + + final Set partitions = + Collections.singleton(mongoDbPartition); + final OffsetReader offsetReader = new OffsetReader<>(offsetStorageReader, loader); + final Map offsets = offsetReader.offsets(partitions); + + if (offsets == null || offsets.values().stream().noneMatch(Objects::nonNull)) { + return Optional.empty(); + } + + final MongoDbOffsetContext context = offsets.get(mongoDbPartition); + final var offset = context.getOffset(); + + final Object resumeTokenData = offset.get(MongoDbDebeziumConstants.OffsetState.VALUE_RESUME_TOKEN); + + if (resumeTokenData != null) { + final BsonDocument resumeToken = ResumeTokens.fromData(resumeTokenData.toString()); + return Optional.of(resumeToken); } else { - LOGGER.warn("Loaded offset data is null or empty: {}", offsets); return Optional.empty(); } + } finally { LOGGER.info("Closing offsetStorageReader and fileOffsetBackingStore"); if (offsetStorageReader != null) { @@ -221,7 +208,7 @@ private Optional parseSavedOffset(final Properties properties, fin } } - private static List generateOffsetKey(final String serverId, final String replicaSet) { + private static List generateOffsetKey(final String serverId) { /* * N.B. The order of the keys in the sourceInfoMap and key list matters! DO NOT CHANGE the order * unless you have verified that Debezium has changed its order of the key it builds when retrieving @@ -230,8 +217,7 @@ private static List generateOffsetKey(final String serverId, final Strin */ final Map sourceInfoMap = new LinkedHashMap<>(); final String normalizedServerId = MongoDbDebeziumPropertiesManager.normalizeName(serverId); - sourceInfoMap.put(MongoDbDebeziumConstants.OffsetState.KEY_REPLICA_SET, replicaSet); - sourceInfoMap.put(MongoDbDebeziumConstants.OffsetState.KEY_SERVER_ID, normalizedServerId); + sourceInfoMap.put(KEY_SERVER_ID, normalizedServerId); final List key = new LinkedList<>(); key.add(normalizedServerId); diff --git a/airbyte-integrations/connectors/source-mongodb-v2/src/test-integration/java/io/airbyte/integrations/source/mongodb/MongoDbSourceAcceptanceTest.java b/airbyte-integrations/connectors/source-mongodb-v2/src/test-integration/java/io/airbyte/integrations/source/mongodb/MongoDbSourceAcceptanceTest.java index b81349284ba8..0cc541133675 100644 --- a/airbyte-integrations/connectors/source-mongodb-v2/src/test-integration/java/io/airbyte/integrations/source/mongodb/MongoDbSourceAcceptanceTest.java +++ b/airbyte-integrations/connectors/source-mongodb-v2/src/test-integration/java/io/airbyte/integrations/source/mongodb/MongoDbSourceAcceptanceTest.java @@ -505,9 +505,8 @@ void testSyncShouldHandlePurgedLogsGracefully() throws Exception { // Modify the state to point to a non-existing resume token value final AirbyteStateMessage stateMessage = Iterables.getLast(stateMessages); - final String replicaSetName = MongoDbDebeziumStateUtil.getReplicaSetName(mongoClient); final MongoDbCdcState cdcState = new MongoDbCdcState( - MongoDbDebeziumStateUtil.formatState(databaseName, replicaSetName, INVALID_RESUME_TOKEN)); + MongoDbDebeziumStateUtil.formatState(databaseName, INVALID_RESUME_TOKEN)); stateMessage.getGlobal().setSharedState(Jsons.jsonNode(cdcState)); final JsonNode state = Jsons.jsonNode(List.of(stateMessage)); @@ -557,15 +556,14 @@ void testIsSameOffset() { new MongoDbCdcTargetPosition(MongoDbResumeTokenHelper.getMostRecentResumeToken(mongoClient, databaseName, getConfiguredCatalog())); final BsonDocument resumeToken = MongoDbResumeTokenHelper.getMostRecentResumeToken(mongoClient, databaseName, getConfiguredCatalog()); final String resumeTokenString = resumeToken.get("_data").asString().getValue(); - final String replicaSet = MongoDbDebeziumStateUtil.getReplicaSetName(mongoClient); final Map emptyOffsetA = Map.of(); final Map emptyOffsetB = Map.of(); final Map offsetA = Jsons.object(MongoDbDebeziumStateUtil.formatState(databaseName, - replicaSet, resumeTokenString), new TypeReference<>() {}); + resumeTokenString), new TypeReference<>() {}); final Map offsetB = Jsons.object(MongoDbDebeziumStateUtil.formatState(databaseName, - replicaSet, resumeTokenString), new TypeReference<>() {}); + resumeTokenString), new TypeReference<>() {}); final Map offsetBDifferent = Jsons.object(MongoDbDebeziumStateUtil.formatState(databaseName, - replicaSet, INVALID_RESUME_TOKEN), new TypeReference<>() {}); + INVALID_RESUME_TOKEN), new TypeReference<>() {}); assertFalse(targetPosition.isSameOffset(null, offsetB)); assertFalse(targetPosition.isSameOffset(emptyOffsetA, offsetB)); diff --git a/airbyte-integrations/connectors/source-mongodb-v2/src/test/java/io/airbyte/integrations/source/mongodb/cdc/MongoDbCdcInitializerTest.java b/airbyte-integrations/connectors/source-mongodb-v2/src/test/java/io/airbyte/integrations/source/mongodb/cdc/MongoDbCdcInitializerTest.java index 80b6fe27858e..f6518ee49f3a 100644 --- a/airbyte-integrations/connectors/source-mongodb-v2/src/test/java/io/airbyte/integrations/source/mongodb/cdc/MongoDbCdcInitializerTest.java +++ b/airbyte-integrations/connectors/source-mongodb-v2/src/test/java/io/airbyte/integrations/source/mongodb/cdc/MongoDbCdcInitializerTest.java @@ -267,7 +267,7 @@ JsonNode createConfig(String cdcCursorFailBehaviour) { void testUnableToExtractOffsetFromStateException() { final MongoDbStateManager stateManager = MongoDbStateManager.createStateManager(createInitialDebeziumState(InitialSnapshotStatus.COMPLETE), CONFIG); - doReturn(Optional.empty()).when(mongoDbDebeziumStateUtil).savedOffset(any(), any(), any(), any(), any()); + doReturn(Optional.empty()).when(mongoDbDebeziumStateUtil).savedOffset(any(), any(), any(), any()); assertThrows(RuntimeException.class, () -> cdcInitializer.createCdcIterators(mongoClient, cdcConnectorMetadataInjector, CONFIGURED_CATALOG_STREAMS, stateManager, EMITTED_AT, CONFIG)); @@ -307,7 +307,7 @@ void testUnsupportedIdTypeThrowsException() { private static JsonNode createInitialDebeziumState(final InitialSnapshotStatus initialSnapshotStatus) { final StreamDescriptor streamDescriptor = new StreamDescriptor().withNamespace(STREAM_NAMESPACE).withName(STREAM_NAME); - final MongoDbCdcState cdcState = new MongoDbCdcState(MongoDbDebeziumStateUtil.formatState(DATABASE, REPLICA_SET, RESUME_TOKEN1)); + final MongoDbCdcState cdcState = new MongoDbCdcState(MongoDbDebeziumStateUtil.formatState(DATABASE, RESUME_TOKEN1)); final MongoDbStreamState mongoDbStreamState = new MongoDbStreamState(ID, initialSnapshotStatus, IdType.OBJECT_ID); final JsonNode sharedState = Jsons.jsonNode(cdcState); final JsonNode streamState = Jsons.jsonNode(mongoDbStreamState); diff --git a/airbyte-integrations/connectors/source-mongodb-v2/src/test/java/io/airbyte/integrations/source/mongodb/cdc/MongoDbCdcSavedInfoFetcherTest.java b/airbyte-integrations/connectors/source-mongodb-v2/src/test/java/io/airbyte/integrations/source/mongodb/cdc/MongoDbCdcSavedInfoFetcherTest.java index 935051f3b229..2051718eda4f 100644 --- a/airbyte-integrations/connectors/source-mongodb-v2/src/test/java/io/airbyte/integrations/source/mongodb/cdc/MongoDbCdcSavedInfoFetcherTest.java +++ b/airbyte-integrations/connectors/source-mongodb-v2/src/test/java/io/airbyte/integrations/source/mongodb/cdc/MongoDbCdcSavedInfoFetcherTest.java @@ -13,12 +13,11 @@ class MongoDbCdcSavedInfoFetcherTest { private static final String DATABASE = "test-database"; - private static final String REPLICA_SET = "test-replica-set"; private static final String RESUME_TOKEN = "8264BEB9F3000000012B0229296E04"; @Test void testRetrieveSavedOffsetState() { - final JsonNode offset = MongoDbDebeziumStateUtil.formatState(DATABASE, REPLICA_SET, RESUME_TOKEN); + final JsonNode offset = MongoDbDebeziumStateUtil.formatState(DATABASE, RESUME_TOKEN); final MongoDbCdcState offsetState = new MongoDbCdcState(offset); final MongoDbCdcSavedInfoFetcher cdcSavedInfoFetcher = new MongoDbCdcSavedInfoFetcher(offsetState); assertEquals(offsetState.state(), cdcSavedInfoFetcher.getSavedOffset()); @@ -26,7 +25,7 @@ void testRetrieveSavedOffsetState() { @Test void testRetrieveSchemaHistory() { - final JsonNode offset = MongoDbDebeziumStateUtil.formatState(DATABASE, REPLICA_SET, RESUME_TOKEN); + final JsonNode offset = MongoDbDebeziumStateUtil.formatState(DATABASE, RESUME_TOKEN); final MongoDbCdcState offsetState = new MongoDbCdcState(offset); final MongoDbCdcSavedInfoFetcher cdcSavedInfoFetcher = new MongoDbCdcSavedInfoFetcher(offsetState); assertThrows(RuntimeException.class, () -> cdcSavedInfoFetcher.getSavedSchemaHistory()); diff --git a/airbyte-integrations/connectors/source-mongodb-v2/src/test/java/io/airbyte/integrations/source/mongodb/cdc/MongoDbCdcStateHandlerTest.java b/airbyte-integrations/connectors/source-mongodb-v2/src/test/java/io/airbyte/integrations/source/mongodb/cdc/MongoDbCdcStateHandlerTest.java index a40e69ddd7a5..91472e6c64d6 100644 --- a/airbyte-integrations/connectors/source-mongodb-v2/src/test/java/io/airbyte/integrations/source/mongodb/cdc/MongoDbCdcStateHandlerTest.java +++ b/airbyte-integrations/connectors/source-mongodb-v2/src/test/java/io/airbyte/integrations/source/mongodb/cdc/MongoDbCdcStateHandlerTest.java @@ -22,7 +22,6 @@ class MongoDbCdcStateHandlerTest { private static final String DATABASE = "test-database"; - private static final String REPLICA_SET = "test-replica-set"; private static final String RESUME_TOKEN = "8264BEB9F3000000012B0229296E04"; final MongoDbSourceConfig CONFIG = new MongoDbSourceConfig(io.airbyte.commons.json.Jsons.jsonNode( @@ -42,7 +41,7 @@ void setup() { @Test void testSavingState() { final Map offset = - Jsons.object(MongoDbDebeziumStateUtil.formatState(DATABASE, REPLICA_SET, RESUME_TOKEN), new TypeReference<>() {}); + Jsons.object(MongoDbDebeziumStateUtil.formatState(DATABASE, RESUME_TOKEN), new TypeReference<>() {}); final AirbyteMessage airbyteMessage = mongoDbCdcStateHandler.saveState(offset, null); assertNotNull(airbyteMessage); assertEquals(AirbyteMessage.Type.STATE, airbyteMessage.getType()); diff --git a/airbyte-integrations/connectors/source-mongodb-v2/src/test/java/io/airbyte/integrations/source/mongodb/cdc/MongoDbCdcTargetPositionTest.java b/airbyte-integrations/connectors/source-mongodb-v2/src/test/java/io/airbyte/integrations/source/mongodb/cdc/MongoDbCdcTargetPositionTest.java index 20cf0b1ef9cb..0e6ace990f31 100644 --- a/airbyte-integrations/connectors/source-mongodb-v2/src/test/java/io/airbyte/integrations/source/mongodb/cdc/MongoDbCdcTargetPositionTest.java +++ b/airbyte-integrations/connectors/source-mongodb-v2/src/test/java/io/airbyte/integrations/source/mongodb/cdc/MongoDbCdcTargetPositionTest.java @@ -236,7 +236,7 @@ void testIsEventAheadOfOffset() throws IOException { final ChangeEventWithMetadata changeEventWithMetadata = new ChangeEventWithMetadata(changeEvent); final Map offset = - Jsons.object(MongoDbDebeziumStateUtil.formatState(null, null, RESUME_TOKEN), new TypeReference<>() {}); + Jsons.object(MongoDbDebeziumStateUtil.formatState(null, RESUME_TOKEN), new TypeReference<>() {}); final MongoDbCdcTargetPosition targetPosition = new MongoDbCdcTargetPosition(MongoDbResumeTokenHelper.getMostRecentResumeToken(mongoClient, DATABASE, CATALOG)); @@ -259,11 +259,11 @@ void testIsSameOffset() { when(mongoDatabase.watch(PIPELINE, BsonDocument.class)).thenReturn(changeStreamIterable); final Map offsetA = - Jsons.object(MongoDbDebeziumStateUtil.formatState(null, null, RESUME_TOKEN), new TypeReference<>() {}); + Jsons.object(MongoDbDebeziumStateUtil.formatState(null, RESUME_TOKEN), new TypeReference<>() {}); final Map offsetB = - Jsons.object(MongoDbDebeziumStateUtil.formatState(null, null, RESUME_TOKEN), new TypeReference<>() {}); + Jsons.object(MongoDbDebeziumStateUtil.formatState(null, RESUME_TOKEN), new TypeReference<>() {}); final Map offsetC = - Jsons.object(MongoDbDebeziumStateUtil.formatState(null, null, OTHER_RESUME_TOKEN), new TypeReference<>() {}); + Jsons.object(MongoDbDebeziumStateUtil.formatState(null, OTHER_RESUME_TOKEN), new TypeReference<>() {}); final MongoDbCdcTargetPosition targetPosition = new MongoDbCdcTargetPosition(MongoDbResumeTokenHelper.getMostRecentResumeToken(mongoClient, DATABASE, CATALOG)); diff --git a/airbyte-integrations/connectors/source-mongodb-v2/src/test/java/io/airbyte/integrations/source/mongodb/cdc/MongoDbCustomLoaderTest.java b/airbyte-integrations/connectors/source-mongodb-v2/src/test/java/io/airbyte/integrations/source/mongodb/cdc/MongoDbCustomLoaderTest.java deleted file mode 100644 index 016c675f9735..000000000000 --- a/airbyte-integrations/connectors/source-mongodb-v2/src/test/java/io/airbyte/integrations/source/mongodb/cdc/MongoDbCustomLoaderTest.java +++ /dev/null @@ -1,56 +0,0 @@ -/* - * Copyright (c) 2023 Airbyte, Inc., all rights reserved. - */ - -package io.airbyte.integrations.source.mongodb.cdc; - -import static io.airbyte.integrations.source.mongodb.cdc.MongoDbDebeziumConstants.ChangeEvent.SOURCE_ORDER; -import static io.airbyte.integrations.source.mongodb.cdc.MongoDbDebeziumConstants.ChangeEvent.SOURCE_RESUME_TOKEN; -import static io.airbyte.integrations.source.mongodb.cdc.MongoDbDebeziumConstants.ChangeEvent.SOURCE_SECONDS; -import static io.airbyte.integrations.source.mongodb.cdc.MongoDbDebeziumConstants.OffsetState.KEY_REPLICA_SET; -import static io.airbyte.integrations.source.mongodb.cdc.MongoDbDebeziumConstants.OffsetState.VALUE_TRANSACTION_ID; -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertNotNull; -import static org.mockito.Mockito.mock; - -import com.mongodb.ConnectionString; -import io.debezium.connector.mongodb.MongoDbConnectorConfig; -import io.debezium.connector.mongodb.MongoDbOffsetContext; -import io.debezium.connector.mongodb.ReplicaSets; -import io.debezium.connector.mongodb.ResumeTokens; -import io.debezium.connector.mongodb.connection.ReplicaSet; -import java.util.HashMap; -import java.util.Map; -import org.bson.BsonDocument; -import org.bson.BsonTimestamp; -import org.junit.jupiter.api.Test; - -class MongoDbCustomLoaderTest { - - private static final String RESUME_TOKEN = "8264BEB9F3000000012B0229296E04"; - - @Test - void testLoadOffsets() { - final String replicaSet = "replica-set"; - final BsonDocument resumeToken = ResumeTokens.fromData(RESUME_TOKEN); - final BsonTimestamp timestamp = ResumeTokens.getTimestamp(resumeToken); - final Map key = Map.of(KEY_REPLICA_SET, replicaSet); - final Map value = new HashMap<>(); - value.put(SOURCE_SECONDS, timestamp.getTime()); - value.put(SOURCE_ORDER, timestamp.getInc()); - value.put(SOURCE_RESUME_TOKEN, RESUME_TOKEN); - value.put(VALUE_TRANSACTION_ID, null); - final Map, Map> offsets = Map.of(key, value); - final MongoDbConnectorConfig mongoDbConnectorConfig = mock(MongoDbConnectorConfig.class); - final ReplicaSets replicaSets = ReplicaSets.of( - new ReplicaSet(new ConnectionString("mongodb://localhost:1234/?replicaSet=" + replicaSet))); - final MongoDbCustomLoader loader = new MongoDbCustomLoader(mongoDbConnectorConfig, replicaSets); - - final MongoDbOffsetContext context = loader.loadOffsets(offsets); - final Map offset = context.getReplicaSetOffsetContext(replicaSets.all().get(0)).getOffset(); - - assertNotNull(offset); - assertEquals(value, offset); - } - -} diff --git a/airbyte-integrations/connectors/source-mongodb-v2/src/test/java/io/airbyte/integrations/source/mongodb/cdc/MongoDbDebeziumStateUtilTest.java b/airbyte-integrations/connectors/source-mongodb-v2/src/test/java/io/airbyte/integrations/source/mongodb/cdc/MongoDbDebeziumStateUtilTest.java index 4288856087e0..8a26d6e3761f 100644 --- a/airbyte-integrations/connectors/source-mongodb-v2/src/test/java/io/airbyte/integrations/source/mongodb/cdc/MongoDbDebeziumStateUtilTest.java +++ b/airbyte-integrations/connectors/source-mongodb-v2/src/test/java/io/airbyte/integrations/source/mongodb/cdc/MongoDbDebeziumStateUtilTest.java @@ -7,7 +7,6 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotNull; -import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -47,7 +46,6 @@ class MongoDbDebeziumStateUtilTest { private static final String DATABASE = "test-database"; - private static final String REPLICA_SET = "test-replica-set"; private static final String RESUME_TOKEN = "8264BEB9F3000000012B0229296E04"; private static final AirbyteCatalog CATALOG = new AirbyteCatalog().withStreams(List.of( @@ -81,12 +79,11 @@ void testConstructInitialDebeziumState() { MongoDbDebeziumConstants.Configuration.CONNECTION_STRING_CONFIGURATION_KEY, "mongodb://host:12345/", MongoDbDebeziumConstants.Configuration.DATABASE_CONFIGURATION_KEY, database)); - when(serverDescription.getSetName()).thenReturn(REPLICA_SET); when(clusterDescription.getServerDescriptions()).thenReturn(List.of(serverDescription)); when(clusterDescription.getType()).thenReturn(ClusterType.REPLICA_SET); when(mongoClient.getClusterDescription()).thenReturn(clusterDescription); - final JsonNode initialState = mongoDbDebeziumStateUtil.constructInitialDebeziumState(resumeTokenDocument, mongoClient, database); + final JsonNode initialState = mongoDbDebeziumStateUtil.constructInitialDebeziumState(resumeTokenDocument, database); assertNotNull(initialState); assertEquals(1, initialState.size()); @@ -95,40 +92,23 @@ void testConstructInitialDebeziumState() { assertEquals(resumeToken, Jsons.deserialize(offsetState.asText()).get(MongoDbDebeziumConstants.OffsetState.VALUE_RESUME_TOKEN).asText()); assertEquals(timestamp.getTime(), Jsons.deserialize(offsetState.asText()).get(MongoDbDebeziumConstants.OffsetState.VALUE_SECONDS).asInt()); assertEquals(timestamp.getInc(), Jsons.deserialize(offsetState.asText()).get(MongoDbDebeziumConstants.OffsetState.VALUE_INCREMENT).asInt()); - assertEquals("null", Jsons.deserialize(offsetState.asText()).get(MongoDbDebeziumConstants.OffsetState.VALUE_TRANSACTION_ID).asText()); final Optional parsedOffset = mongoDbDebeziumStateUtil.savedOffset( baseProperties, CONFIGURED_CATALOG, initialState, - config, - mongoClient); + config); assertTrue(parsedOffset.isPresent()); assertEquals(resumeToken, parsedOffset.get().get("_data").asString().getValue()); } - @Test - void testConstructInitialDebeziumStateMissingReplicaSet() { - final BsonDocument resumeTokenDocument = ResumeTokens.fromData(RESUME_TOKEN); - final ServerDescription serverDescription = mock(ServerDescription.class); - final ClusterDescription clusterDescription = mock(ClusterDescription.class); - final MongoClient mongoClient = mock(MongoClient.class); - - when(clusterDescription.getServerDescriptions()).thenReturn(List.of(serverDescription)); - when(clusterDescription.getType()).thenReturn(ClusterType.REPLICA_SET); - when(mongoClient.getClusterDescription()).thenReturn(clusterDescription); - - assertThrows(IllegalStateException.class, - () -> mongoDbDebeziumStateUtil.constructInitialDebeziumState(resumeTokenDocument, mongoClient, DATABASE)); - } - @Test void testOffsetDataFormat() { - final JsonNode offsetState = MongoDbDebeziumStateUtil.formatState(DATABASE, REPLICA_SET, RESUME_TOKEN); + final JsonNode offsetState = MongoDbDebeziumStateUtil.formatState(DATABASE, RESUME_TOKEN); assertNotNull(offsetState); - assertEquals("[\"" + DATABASE + "\",{\"" + MongoDbDebeziumConstants.OffsetState.KEY_REPLICA_SET + "\":\"" + REPLICA_SET + "\",\"" + assertEquals("[\"" + DATABASE + "\",{\"" + MongoDbDebeziumConstants.OffsetState.KEY_SERVER_ID + "\":\"" + DATABASE + "\"}]", offsetState.fieldNames().next()); } diff --git a/docs/integrations/sources/mongodb-v2.md b/docs/integrations/sources/mongodb-v2.md index b87c5de1e00d..abc8fb5b9b0d 100644 --- a/docs/integrations/sources/mongodb-v2.md +++ b/docs/integrations/sources/mongodb-v2.md @@ -198,9 +198,10 @@ For more information regarding configuration parameters, please see [MongoDb Doc Expand to review | Version | Date | Pull Request | Subject | -|:--------| :--------- | :------------------------------------------------------- | :-------------------------------------------------------------------------------------------------------- | -| 1.3.15 | 2024-05-30 | [38781](https://github.com/airbytehq/airbyte/pull/38781) | Sync sending trace status messages indicating progress. | -| 1.3.14 | 2024-05-29 | [38584](https://github.com/airbytehq/airbyte/pull/38584) | Set is_resumable flag in discover. | +|:--------|:-----------| :------------------------------------------------------- |:----------------------------------------------------------------------------------------------------------| +| 1.4.0 | 2024-06-11 | [38238](https://github.com/airbytehq/airbyte/pull/38238) | Update mongodbv2 to use dbz 2.6.2 | +| 1.3.15 | 2024-05-30 | [38781](https://github.com/airbytehq/airbyte/pull/38781) | Sync sending trace status messages indicating progress. | +| 1.3.14 | 2024-05-29 | [38584](https://github.com/airbytehq/airbyte/pull/38584) | Set is_resumable flag in discover. | | 1.3.13 | 2024-05-09 | [36851](https://github.com/airbytehq/airbyte/pull/36851) | Support reading collection with a binary \_id type. | | 1.3.12 | 2024-05-07 | [36851](https://github.com/airbytehq/airbyte/pull/36851) | Upgrade debezium to version 2.5.1. | | 1.3.11 | 2024-05-02 | [37753](https://github.com/airbytehq/airbyte/pull/37753) | Chunk size(limit) should correspond to ~1GB of data. |