From af78d6317d6955dbd9dc859c23539c72de60052f Mon Sep 17 00:00:00 2001 From: yuxiqian <34335406+yuxiqian@users.noreply.github.com> Date: Thu, 25 Jul 2024 19:23:51 +0800 Subject: [PATCH] [FLINK-35868][cdc-connector][mongodb] Bump dependency version to support MongoDB 7.0 This closes #3489. --- .../docs/connectors/flink-sources/overview.md | 2 +- .../docs/connectors/flink-sources/overview.md | 2 +- .../flink-connector-mongodb-cdc/pom.xml | 4 +-- .../source/MongoDBFullChangelogITCase.java | 34 +++++++++++------- .../MongoDBParallelSourceExampleTest.java | 24 +++++++++++-- .../source/MongoDBParallelSourceITCase.java | 21 ++++++++--- .../mongodb/source/MongoDBSourceTestBase.java | 30 +++++++++------- .../mongodb/source/NewlyAddedTableITCase.java | 19 ++++++++-- .../MongoDBSnapshotSplitReaderTest.java | 17 +++++++-- .../reader/MongoDBStreamSplitReaderTest.java | 17 +++++++-- .../mongodb/table/MongoDBConnectorITCase.java | 28 +++++++++------ .../table/MongoDBRegexFilterITCase.java | 36 +++++++++++-------- .../mongodb/table/MongoDBTimeZoneITCase.java | 32 +++++++++-------- .../cdc/connectors/tests/MongoE2eITCase.java | 20 +++++++---- 14 files changed, 197 insertions(+), 89 deletions(-) diff --git a/docs/content.zh/docs/connectors/flink-sources/overview.md b/docs/content.zh/docs/connectors/flink-sources/overview.md index ee816903522..27b826317b5 100644 --- a/docs/content.zh/docs/connectors/flink-sources/overview.md +++ b/docs/content.zh/docs/connectors/flink-sources/overview.md @@ -37,7 +37,7 @@ You can also read [tutorials]({{< ref "docs/connectors/flink-sources/tutorials/b | Connector | Database | Driver | 
|----------------------------------------------------------------------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|---------------------------| -| [mongodb-cdc]({{< ref "docs/connectors/flink-sources/mongodb-cdc" >}}) |
  • [MongoDB](https://www.mongodb.com): 3.6, 4.x, 5.0 | MongoDB Driver: 4.3.4 | +| [mongodb-cdc]({{< ref "docs/connectors/flink-sources/mongodb-cdc" >}}) |
  • [MongoDB](https://www.mongodb.com): 3.6, 4.x, 5.0, 6.0, 6.1, 7.0 | MongoDB Driver: 4.11.2 | | [mysql-cdc]({{< ref "docs/connectors/flink-sources/mysql-cdc" >}}) |
  • [MySQL](https://dev.mysql.com/doc): 5.6, 5.7, 8.0.x
  • [RDS MySQL](https://www.aliyun.com/product/rds/mysql): 5.6, 5.7, 8.0.x
  • [PolarDB MySQL](https://www.aliyun.com/product/polardb): 5.6, 5.7, 8.0.x
  • [Aurora MySQL](https://aws.amazon.com/cn/rds/aurora): 5.6, 5.7, 8.0.x
  • [MariaDB](https://mariadb.org): 10.x
  • [PolarDB X](https://github.com/ApsaraDB/galaxysql): 2.0.1 | JDBC Driver: 8.0.28 | | [oceanbase-cdc]({{< ref "docs/connectors/flink-sources/oceanbase-cdc" >}}) |
  • [OceanBase CE](https://open.oceanbase.com): 3.1.x, 4.x
  • [OceanBase EE](https://www.oceanbase.com/product/oceanbase): 2.x, 3.x, 4.x | OceanBase Driver: 2.4.x | | [oracle-cdc]({{< ref "docs/connectors/flink-sources/oracle-cdc" >}}) |
  • [Oracle](https://www.oracle.com/index.html): 11, 12, 19, 21 | Oracle Driver: 19.3.0.0 | diff --git a/docs/content/docs/connectors/flink-sources/overview.md b/docs/content/docs/connectors/flink-sources/overview.md index 56ddd62616b..962d02c1c53 100644 --- a/docs/content/docs/connectors/flink-sources/overview.md +++ b/docs/content/docs/connectors/flink-sources/overview.md @@ -37,7 +37,7 @@ You can also read [tutorials]({{< ref "docs/connectors/flink-sources/tutorials/b | Connector | Database | Driver | |----------------------------------------------------------------------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|---------------------------| -| [mongodb-cdc]({{< ref "docs/connectors/flink-sources/mongodb-cdc" >}}) |
  • [MongoDB](https://www.mongodb.com): 3.6, 4.x, 5.0 | MongoDB Driver: 4.3.4 | +| [mongodb-cdc]({{< ref "docs/connectors/flink-sources/mongodb-cdc" >}}) |
  • [MongoDB](https://www.mongodb.com): 3.6, 4.x, 5.0, 6.0, 6.1, 7.0 | MongoDB Driver: 4.11.2 | | [mysql-cdc]({{< ref "docs/connectors/flink-sources/mysql-cdc" >}}) |
  • [MySQL](https://dev.mysql.com/doc): 5.6, 5.7, 8.0.x
  • [RDS MySQL](https://www.aliyun.com/product/rds/mysql): 5.6, 5.7, 8.0.x
  • [PolarDB MySQL](https://www.aliyun.com/product/polardb): 5.6, 5.7, 8.0.x
  • [Aurora MySQL](https://aws.amazon.com/cn/rds/aurora): 5.6, 5.7, 8.0.x
  • [MariaDB](https://mariadb.org): 10.x
  • [PolarDB X](https://github.com/ApsaraDB/galaxysql): 2.0.1 | JDBC Driver: 8.0.28 | | [oceanbase-cdc]({{< ref "docs/connectors/flink-sources/oceanbase-cdc" >}}) |
  • [OceanBase CE](https://open.oceanbase.com): 3.1.x, 4.x
  • [OceanBase EE](https://www.oceanbase.com/product/oceanbase): 2.x, 3.x, 4.x | OceanBase Driver: 2.4.x | | [oracle-cdc]({{< ref "docs/connectors/flink-sources/oracle-cdc" >}}) |
  • [Oracle](https://www.oracle.com/index.html): 11, 12, 19, 21 | Oracle Driver: 19.3.0.0 | diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/pom.xml b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/pom.xml index 3c8efc046a3..2b58e677872 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/pom.xml +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/pom.xml @@ -53,7 +53,7 @@ limitations under the License. org.mongodb.kafka mongo-kafka-connect - 1.10.1 + 1.13.0 mongodb-driver-sync @@ -69,7 +69,7 @@ limitations under the License. org.mongodb mongodb-driver-sync - 4.9.1 + 4.11.2 diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/source/MongoDBFullChangelogITCase.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/source/MongoDBFullChangelogITCase.java index 02d6a82d166..8d8047fa72c 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/source/MongoDBFullChangelogITCase.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/source/MongoDBFullChangelogITCase.java @@ -78,28 +78,36 @@ public class MongoDBFullChangelogITCase extends MongoDBSourceTestBase { @Rule public final Timeout timeoutPerTest = Timeout.seconds(300); + private final String mongoVersion; private final boolean parallelismSnapshot; - public MongoDBFullChangelogITCase(boolean parallelismSnapshot) { + public MongoDBFullChangelogITCase(String mongoVersion, boolean parallelismSnapshot) { + super(mongoVersion); + this.mongoVersion = mongoVersion; this.parallelismSnapshot = parallelismSnapshot; } - @Parameterized.Parameters(name = "parallelismSnapshot: 
{0}") + @Parameterized.Parameters(name = "mongoVersion: {0} parallelismSnapshot: {1}") public static Object[] parameters() { - return new Object[][] {new Object[] {false}, new Object[] {true}}; + List parameterTuples = new ArrayList<>(); + for (String mongoVersion : MONGO_VERSIONS) { + parameterTuples.add(new Object[] {mongoVersion, true}); + parameterTuples.add(new Object[] {mongoVersion, false}); + } + return parameterTuples.toArray(); } @Test public void testGetMongoDBVersion() { MongoDBSourceConfig config = new MongoDBSourceConfigFactory() - .hosts(CONTAINER.getHostAndPort()) + .hosts(mongoContainer.getHostAndPort()) .splitSizeMB(1) .samplesPerChunk(10) .pollAwaitTimeMillis(500) .create(0); - assertEquals(MongoUtils.getMongoVersion(config), "6.0.9"); + assertEquals(MongoUtils.getMongoVersion(config), mongoVersion); } @Test @@ -499,16 +507,16 @@ private List testBackfillWhenWritingEvents( "customer_" + Integer.toUnsignedString(new Random().nextInt(), 36); // A - enable system-level fulldoc pre & post image feature - CONTAINER.executeCommand( + mongoContainer.executeCommand( "use admin; db.runCommand({ setClusterParameter: { changeStreamOptions: { preAndPostImages: { expireAfterSeconds: 'off' } } } })"); // B - enable collection-level fulldoc pre & post image for change capture collection - CONTAINER.executeCommandInDatabase( + mongoContainer.executeCommandInDatabase( String.format( "db.createCollection('%s'); db.runCommand({ collMod: '%s', changeStreamPreAndPostImages: { enabled: true } })", "customers", "customers"), customerDatabase); - CONTAINER.executeCommandFileInDatabase("customer", customerDatabase); + mongoContainer.executeCommandFileInDatabase("customer", customerDatabase); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.enableCheckpointing(1000); @@ -526,7 +534,7 @@ private List testBackfillWhenWritingEvents( TestTable customerTable = new TestTable(customerDatabase, "customers", customersSchema); MongoDBSource 
source = new MongoDBSourceBuilder() - .hosts(CONTAINER.getHostAndPort()) + .hosts(mongoContainer.getHostAndPort()) .databaseList(customerDatabase) .username(FLINK_USER) .password(FLINK_USER_PASSWORD) @@ -613,12 +621,12 @@ private void testMongoDBParallelSource( "customer_" + Integer.toUnsignedString(new Random().nextInt(), 36); // A - enable system-level fulldoc pre & post image feature - CONTAINER.executeCommand( + mongoContainer.executeCommand( "use admin; db.runCommand({ setClusterParameter: { changeStreamOptions: { preAndPostImages: { expireAfterSeconds: 'off' } } } })"); // B - enable collection-level fulldoc pre & post image for change capture collection for (String collectionName : captureCustomerCollections) { - CONTAINER.executeCommandInDatabase( + mongoContainer.executeCommandInDatabase( String.format( "db.createCollection('%s'); db.runCommand({ collMod: '%s', changeStreamPreAndPostImages: { enabled: true } })", collectionName, collectionName), @@ -654,14 +662,14 @@ private void testMongoDBParallelSource( + " 'scan.incremental.snapshot.backfill.skip' = '%s'" + ")", parallelismSnapshot ? 
"true" : "false", - CONTAINER.getHostAndPort(), + mongoContainer.getHostAndPort(), FLINK_USER, FLINK_USER_PASSWORD, customerDatabase, getCollectionNameRegex(customerDatabase, captureCustomerCollections), skipSnapshotBackfill); - CONTAINER.executeCommandFileInDatabase("customer", customerDatabase); + mongoContainer.executeCommandFileInDatabase("customer", customerDatabase); // first step: check the snapshot data String[] snapshotForSingleTable = diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/source/MongoDBParallelSourceExampleTest.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/source/MongoDBParallelSourceExampleTest.java index dd213413659..a285fe4a395 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/source/MongoDBParallelSourceExampleTest.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/source/MongoDBParallelSourceExampleTest.java @@ -24,22 +24,42 @@ import org.junit.Ignore; import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.util.ArrayList; +import java.util.List; import static org.apache.flink.cdc.connectors.mongodb.utils.MongoDBContainer.FLINK_USER; import static org.apache.flink.cdc.connectors.mongodb.utils.MongoDBContainer.FLINK_USER_PASSWORD; import static org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH; /** Example Tests for {@link MongoDBSource}. 
*/ +@RunWith(Parameterized.class) public class MongoDBParallelSourceExampleTest extends MongoDBSourceTestBase { + @Parameterized.Parameters(name = "mongoVersion: {0} parallelismSnapshot: {1}") + public static Object[] parameters() { + List parameterTuples = new ArrayList<>(); + for (String mongoVersion : MONGO_VERSIONS) { + parameterTuples.add(new Object[] {mongoVersion, true}); + parameterTuples.add(new Object[] {mongoVersion, false}); + } + return parameterTuples.toArray(); + } + + public MongoDBParallelSourceExampleTest(String mongoVersion) { + super(mongoVersion); + } + @Test @Ignore("Test ignored because it won't stop and is used for manual test") public void testMongoDBExampleSource() throws Exception { - String database = CONTAINER.executeCommandFileInSeparateDatabase("inventory"); + String database = mongoContainer.executeCommandFileInSeparateDatabase("inventory"); MongoDBSource mongoSource = MongoDBSource.builder() - .hosts(CONTAINER.getHostAndPort()) + .hosts(mongoContainer.getHostAndPort()) .databaseList(database) .collectionList(database + ".products") .username(FLINK_USER) diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/source/MongoDBParallelSourceITCase.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/source/MongoDBParallelSourceITCase.java index 11332eb74e4..25918fd8bbe 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/source/MongoDBParallelSourceITCase.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/source/MongoDBParallelSourceITCase.java @@ -46,12 +46,15 @@ import org.junit.Rule; import org.junit.Test; import org.junit.rules.Timeout; +import org.junit.runner.RunWith; +import 
org.junit.runners.Parameterized; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.stream.Collectors; +import java.util.stream.Stream; import static org.apache.flink.cdc.connectors.mongodb.utils.MongoDBAssertUtils.assertEqualsInAnyOrder; import static org.apache.flink.cdc.connectors.mongodb.utils.MongoDBContainer.FLINK_USER; @@ -65,11 +68,21 @@ import static org.apache.flink.util.Preconditions.checkState; /** IT tests for {@link MongoDBSource}. */ +@RunWith(Parameterized.class) public class MongoDBParallelSourceITCase extends MongoDBSourceTestBase { private static final int USE_POST_LOWWATERMARK_HOOK = 1; private static final int USE_PRE_HIGHWATERMARK_HOOK = 2; private static final int USE_POST_HIGHWATERMARK_HOOK = 3; + public MongoDBParallelSourceITCase(String mongoVersion) { + super(mongoVersion); + } + + @Parameterized.Parameters(name = "mongoVersion: {0}") + public static Object[] parameters() { + return Stream.of(MONGO_VERSIONS).map(e -> new Object[] {e}).toArray(); + } + @Rule public final Timeout timeoutPerTest = Timeout.seconds(300); @Test @@ -406,7 +419,7 @@ public void testSkipBackfillWithDMLPostLowWaterMark() throws Exception { private List testBackfillWhenWritingEvents( boolean skipBackFill, int fetchSize, int hookType, StartupOptions startupOptions) throws Exception { - String customerDatabase = CONTAINER.executeCommandFileInSeparateDatabase("customer"); + String customerDatabase = mongoContainer.executeCommandFileInSeparateDatabase("customer"); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.enableCheckpointing(1000); env.setParallelism(1); @@ -423,7 +436,7 @@ private List testBackfillWhenWritingEvents( TestTable customerTable = new TestTable(customerDatabase, "customers", customersSchema); MongoDBSource source = new MongoDBSourceBuilder() - .hosts(CONTAINER.getHostAndPort()) + .hosts(mongoContainer.getHostAndPort()) 
.databaseList(customerDatabase) .username(FLINK_USER) .password(FLINK_USER_PASSWORD) @@ -507,7 +520,7 @@ private void testMongoDBParallelSource( boolean skipSnapshotBackfill) throws Exception { - String customerDatabase = CONTAINER.executeCommandFileInSeparateDatabase("customer"); + String customerDatabase = mongoContainer.executeCommandFileInSeparateDatabase("customer"); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); StreamTableEnvironment tEnv = StreamTableEnvironment.create(env); @@ -535,7 +548,7 @@ private void testMongoDBParallelSource( + " 'heartbeat.interval.ms' = '500'," + " 'scan.incremental.snapshot.backfill.skip' = '%s'" + ")", - CONTAINER.getHostAndPort(), + mongoContainer.getHostAndPort(), FLINK_USER, FLINK_USER_PASSWORD, customerDatabase, diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/source/MongoDBSourceTestBase.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/source/MongoDBSourceTestBase.java index 6bfcca8244c..49a362969ac 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/source/MongoDBSourceTestBase.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/source/MongoDBSourceTestBase.java @@ -26,8 +26,7 @@ import com.mongodb.MongoClientSettings; import com.mongodb.client.MongoClient; import com.mongodb.client.MongoClients; -import org.junit.BeforeClass; -import org.junit.ClassRule; +import org.junit.Before; import org.junit.Rule; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -39,10 +38,21 @@ /** MongoDBSourceTestBase for MongoDB >= 5.0.3. 
*/ public class MongoDBSourceTestBase { - protected static MongoClient mongodbClient; + public MongoDBSourceTestBase(String mongoVersion) { + this.mongoContainer = + new MongoDBContainer("mongo:" + mongoVersion) + .withSharding() + .withLogConsumer(new Slf4jLogConsumer(LOG)); + } + + public static final String[] MONGO_VERSIONS = {"6.0.16", "7.0.12"}; protected static final int DEFAULT_PARALLELISM = 4; + @Rule public final MongoDBContainer mongoContainer; + + protected MongoClient mongodbClient; + @Rule public final MiniClusterWithClientResource miniClusterResource = new MiniClusterWithClientResource( @@ -53,15 +63,15 @@ public class MongoDBSourceTestBase { .withHaLeadershipControl() .build()); - @BeforeClass - public static void startContainers() { + @Before + public void startContainers() { LOG.info("Starting containers..."); - Startables.deepStart(Stream.of(CONTAINER)).join(); + Startables.deepStart(Stream.of(mongoContainer)).join(); MongoClientSettings settings = MongoClientSettings.builder() .applyConnectionString( - new ConnectionString(CONTAINER.getConnectionString())) + new ConnectionString(mongoContainer.getConnectionString())) .build(); mongodbClient = MongoClients.create(settings); @@ -69,10 +79,4 @@ public static void startContainers() { } private static final Logger LOG = LoggerFactory.getLogger(MongoDBSourceTestBase.class); - - @ClassRule - public static final MongoDBContainer CONTAINER = - new MongoDBContainer("mongo:6.0.9") - .withSharding() - .withLogConsumer(new Slf4jLogConsumer(LOG)); } diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/source/NewlyAddedTableITCase.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/source/NewlyAddedTableITCase.java index 4ffc74cbff0..69900b4e92d 100644 --- 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/source/NewlyAddedTableITCase.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/source/NewlyAddedTableITCase.java @@ -41,6 +41,8 @@ import org.junit.Test; import org.junit.rules.TemporaryFolder; import org.junit.rules.Timeout; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; import java.sql.SQLException; import java.util.ArrayList; @@ -55,6 +57,7 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; +import java.util.stream.Stream; import static java.lang.String.format; import static org.apache.flink.cdc.connectors.mongodb.utils.MongoDBContainer.FLINK_USER; @@ -62,6 +65,7 @@ import static org.apache.flink.util.Preconditions.checkState; /** IT tests to cover various newly added collections during capture process. 
*/ +@RunWith(Parameterized.class) public class NewlyAddedTableITCase extends MongoDBSourceTestBase { @Rule public final Timeout timeoutPerTest = Timeout.seconds(500); @@ -69,6 +73,15 @@ public class NewlyAddedTableITCase extends MongoDBSourceTestBase { private String customerDatabase; protected static final int DEFAULT_PARALLELISM = 4; + public NewlyAddedTableITCase(String mongoVersion) { + super(mongoVersion); + } + + @Parameterized.Parameters(name = "mongoVersion: {0}") + public static Object[] parameters() { + return Stream.of(MONGO_VERSIONS).map(e -> new Object[] {e}).toArray(); + } + private final ScheduledExecutorService mockChangelogExecutor = Executors.newScheduledThreadPool(1); @@ -79,7 +92,7 @@ public void before() throws SQLException { // prepare initial data for given collection String collectionName = "produce_changelog"; // enable system-level fulldoc pre & post image feature - CONTAINER.executeCommand( + mongoContainer.executeCommand( "use admin; db.runCommand({ setClusterParameter: { changeStreamOptions: { preAndPostImages: { expireAfterSeconds: 'off' } } } })"); // mock continuous changelog during the newly added collections capturing process @@ -846,7 +859,7 @@ private void initialAddressCollections( // make initial data for given collection. 
String cityName = collectionName.split("_")[1]; // B - enable collection-level fulldoc pre & post image for change capture collection - CONTAINER.executeCommandInDatabase( + mongoContainer.executeCommandInDatabase( String.format( "db.createCollection('%s'); db.runCommand({ collMod: '%s', changeStreamPreAndPostImages: { enabled: true } })", collectionName, collectionName), @@ -983,7 +996,7 @@ private String getCreateTableStatement( + " 'scan.newly-added-table.enabled' = 'true'" + " %s" + ")", - CONTAINER.getHostAndPort(), + mongoContainer.getHostAndPort(), FLINK_USER, FLINK_USER_PASSWORD, customerDatabase, diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/source/reader/MongoDBSnapshotSplitReaderTest.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/source/reader/MongoDBSnapshotSplitReaderTest.java index e439c098fe8..b683f303dad 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/source/reader/MongoDBSnapshotSplitReaderTest.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/source/reader/MongoDBSnapshotSplitReaderTest.java @@ -42,11 +42,14 @@ import org.bson.BsonDocument; import org.junit.Before; import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.Iterator; import java.util.LinkedList; +import java.util.stream.Stream; import static java.util.Collections.singletonList; import static org.apache.flink.cdc.connectors.base.source.meta.wartermark.WatermarkEvent.isWatermarkEvent; @@ -57,6 +60,7 @@ import static org.junit.Assert.assertTrue; /** MongoDB snapshot split reader test case. 
*/ +@RunWith(Parameterized.class) public class MongoDBSnapshotSplitReaderTest extends MongoDBSourceTestBase { private static final Logger LOG = LoggerFactory.getLogger(MongoDBSnapshotSplitReaderTest.class); @@ -71,13 +75,22 @@ public class MongoDBSnapshotSplitReaderTest extends MongoDBSourceTestBase { private SplitContext splitContext; + public MongoDBSnapshotSplitReaderTest(String mongoVersion) { + super(mongoVersion); + } + + @Parameterized.Parameters(name = "mongoVersion: {0}") + public static Object[] parameters() { + return Stream.of(MONGO_VERSIONS).map(e -> new Object[] {e}).toArray(); + } + @Before public void before() { - database = CONTAINER.executeCommandFileInSeparateDatabase("chunk_test"); + database = mongoContainer.executeCommandFileInSeparateDatabase("chunk_test"); MongoDBSourceConfigFactory configFactory = new MongoDBSourceConfigFactory() - .hosts(CONTAINER.getHostAndPort()) + .hosts(mongoContainer.getHostAndPort()) .databaseList(database) .collectionList(database + ".shopping_cart") .username(FLINK_USER) diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/source/reader/MongoDBStreamSplitReaderTest.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/source/reader/MongoDBStreamSplitReaderTest.java index 1699a550cd5..bf781904bf9 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/source/reader/MongoDBStreamSplitReaderTest.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/source/reader/MongoDBStreamSplitReaderTest.java @@ -43,12 +43,15 @@ import org.junit.Rule; import org.junit.Test; import org.junit.rules.Timeout; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; import 
java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; import java.util.Iterator; import java.util.List; +import java.util.stream.Stream; import static java.util.Collections.singletonList; import static org.apache.flink.cdc.connectors.mongodb.internal.MongoDBEnvelope.FULL_DOCUMENT_FIELD; @@ -65,6 +68,7 @@ import static org.junit.Assert.assertTrue; /** MongoDB stream split reader test case. */ +@RunWith(Parameterized.class) public class MongoDBStreamSplitReaderTest extends MongoDBSourceTestBase { @Rule public final Timeout timeoutPerTest = Timeout.seconds(300); @@ -85,13 +89,22 @@ public class MongoDBStreamSplitReaderTest extends MongoDBSourceTestBase { private BsonDocument startupResumeToken; + public MongoDBStreamSplitReaderTest(String mongoVersion) { + super(mongoVersion); + } + + @Parameterized.Parameters(name = "mongoVersion: {0}") + public static Object[] parameters() { + return Stream.of(MONGO_VERSIONS).map(e -> new Object[] {e}).toArray(); + } + @Before public void before() { - database = CONTAINER.executeCommandFileInSeparateDatabase("chunk_test"); + database = mongoContainer.executeCommandFileInSeparateDatabase("chunk_test"); MongoDBSourceConfigFactory configFactory = new MongoDBSourceConfigFactory() - .hosts(CONTAINER.getHostAndPort()) + .hosts(mongoContainer.getHostAndPort()) .databaseList(database) .collectionList(database + ".shopping_cart") .username(FLINK_USER) diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/table/MongoDBConnectorITCase.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/table/MongoDBConnectorITCase.java index b15ebd1fccb..19d47856e10 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/table/MongoDBConnectorITCase.java +++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/table/MongoDBConnectorITCase.java @@ -67,13 +67,19 @@ public class MongoDBConnectorITCase extends MongoDBSourceTestBase { private final boolean parallelismSnapshot; - public MongoDBConnectorITCase(boolean parallelismSnapshot) { + public MongoDBConnectorITCase(String mongoVersion, boolean parallelismSnapshot) { + super(mongoVersion); this.parallelismSnapshot = parallelismSnapshot; } - @Parameterized.Parameters(name = "parallelismSnapshot: {0}") + @Parameterized.Parameters(name = "mongoVersion: {0} parallelismSnapshot: {1}") public static Object[] parameters() { - return new Object[][] {new Object[] {false}, new Object[] {true}}; + return new Object[][] { + new Object[] {"6.0.16", true}, + new Object[] {"6.0.16", false}, + new Object[] {"7.0.12", true}, + new Object[] {"7.0.12", false} + }; } @Before @@ -90,7 +96,7 @@ public void before() { @Test public void testConsumingAllEvents() throws ExecutionException, InterruptedException { - String database = CONTAINER.executeCommandFileInSeparateDatabase("inventory"); + String database = mongoContainer.executeCommandFileInSeparateDatabase("inventory"); String sourceDDL = String.format( @@ -111,7 +117,7 @@ public void testConsumingAllEvents() throws ExecutionException, InterruptedExcep + " 'scan.incremental.snapshot.enabled' = '%s'," + " 'heartbeat.interval.ms' = '1000'" + ")", - CONTAINER.getHostAndPort(), + mongoContainer.getHostAndPort(), FLINK_USER, FLINK_USER_PASSWORD, database, @@ -223,7 +229,7 @@ public void testConsumingAllEvents() throws ExecutionException, InterruptedExcep @Test public void testStartupFromTimestamp() throws Exception { - String database = CONTAINER.executeCommandFileInSeparateDatabase("inventory"); + String database = mongoContainer.executeCommandFileInSeparateDatabase("inventory"); // Unfortunately we have to sleep here to differ initial and later-generating changes in 
// oplog by timestamp @@ -252,7 +258,7 @@ public void testStartupFromTimestamp() throws Exception { + "'," + " 'heartbeat.interval.ms' = '1000'" + ")", - CONTAINER.getHostAndPort(), + mongoContainer.getHostAndPort(), FLINK_USER, FLINK_USER_PASSWORD, database, @@ -302,7 +308,7 @@ public void testStartupFromTimestamp() throws Exception { @Test public void testAllTypes() throws Throwable { - String database = CONTAINER.executeCommandFileInSeparateDatabase("column_type_test"); + String database = mongoContainer.executeCommandFileInSeparateDatabase("column_type_test"); String sourceDDL = String.format( @@ -345,7 +351,7 @@ public void testAllTypes() throws Throwable { + " 'database' = '%s'," + " 'collection' = '%s'" + ")", - CONTAINER.getHostAndPort(), + mongoContainer.getHostAndPort(), FLINK_USER, FLINK_USER_PASSWORD, database, @@ -465,7 +471,7 @@ public void testAllTypes() throws Throwable { @Test public void testMetadataColumns() throws Exception { - String database = CONTAINER.executeCommandFileInSeparateDatabase("inventory"); + String database = mongoContainer.executeCommandFileInSeparateDatabase("inventory"); String sourceDDL = String.format( @@ -487,7 +493,7 @@ public void testMetadataColumns() throws Exception { + " 'collection' = '%s'," + " 'scan.incremental.snapshot.enabled' = '%s'" + ")", - CONTAINER.getHostAndPort(), + mongoContainer.getHostAndPort(), FLINK_USER, FLINK_USER_PASSWORD, database, diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/table/MongoDBRegexFilterITCase.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/table/MongoDBRegexFilterITCase.java index 317dc7468fd..3352fe1efe0 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/table/MongoDBRegexFilterITCase.java +++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/table/MongoDBRegexFilterITCase.java @@ -52,13 +52,19 @@ public class MongoDBRegexFilterITCase extends MongoDBSourceTestBase { private final boolean parallelismSnapshot; - public MongoDBRegexFilterITCase(boolean parallelismSnapshot) { + public MongoDBRegexFilterITCase(String mongoVersion, boolean parallelismSnapshot) { + super(mongoVersion); this.parallelismSnapshot = parallelismSnapshot; } - @Parameterized.Parameters(name = "parallelismSnapshot: {0}") + @Parameterized.Parameters(name = "mongoVersion: {0} parallelismSnapshot: {1}") public static Object[] parameters() { - return new Object[][] {new Object[] {false}, new Object[] {true}}; + return new Object[][] { + new Object[] {"6.0.16", true}, + new Object[] {"6.0.16", false}, + new Object[] {"7.0.12", true}, + new Object[] {"7.0.12", false} + }; } @Before @@ -77,9 +83,9 @@ public void before() { public void testMatchMultipleDatabasesAndCollections() throws Exception { // 1. Given collections: // db0: [coll_a1, coll_a2, coll_b1, coll_b2] - String db0 = CONTAINER.executeCommandFileInSeparateDatabase("ns_regex"); + String db0 = mongoContainer.executeCommandFileInSeparateDatabase("ns_regex"); // db1: [coll_a1, coll_a2, coll_b1, coll_b2] - String db1 = CONTAINER.executeCommandFileInSeparateDatabase("ns_regex"); + String db1 = mongoContainer.executeCommandFileInSeparateDatabase("ns_regex"); // 2. Test match: collection = ^(db0|db1)\.coll_a\d?$ String collectionRegex = String.format("^(%s|%s)\\.coll_a\\d?$", db0, db1); @@ -120,11 +126,11 @@ public void testMatchMultipleDatabasesAndCollections() throws Exception { public void testMatchMultipleDatabases() throws Exception { // 1. 
Given collections: // db0: [coll_a1, coll_a2, coll_b1, coll_b2] - String db0 = CONTAINER.executeCommandFileInSeparateDatabase("ns_regex"); + String db0 = mongoContainer.executeCommandFileInSeparateDatabase("ns_regex"); // db1: [coll_a1, coll_a2, coll_b1, coll_b2] - String db1 = CONTAINER.executeCommandFileInSeparateDatabase("ns_regex"); + String db1 = mongoContainer.executeCommandFileInSeparateDatabase("ns_regex"); // db2: [coll_a1, coll_a2, coll_b1, coll_b2] - String db2 = CONTAINER.executeCommandFileInSeparateDatabase("ns_regex"); + String db2 = mongoContainer.executeCommandFileInSeparateDatabase("ns_regex"); // 2. Test match database: ^(db0|db1)$ String databaseRegex = String.format("%s|%s", db0, db1); @@ -174,9 +180,9 @@ public void testMatchMultipleDatabases() throws Exception { public void testMatchSingleQualifiedCollectionPattern() throws Exception { // 1. Given collections: // db0: [coll_a1, coll_a2, coll_b1, coll_b2] - String db0 = CONTAINER.executeCommandFileInSeparateDatabase("ns_regex"); + String db0 = mongoContainer.executeCommandFileInSeparateDatabase("ns_regex"); // db1: [coll_a1, coll_a2, coll_b1, coll_b2] - String db1 = CONTAINER.executeCommandFileInSeparateDatabase("ns_regex"); + String db1 = mongoContainer.executeCommandFileInSeparateDatabase("ns_regex"); // 2. Test match: collection ^(db0|db1)\.coll_a\d?$ String collectionRegex = String.format("^%s\\.coll_b\\d?$", db0); @@ -213,9 +219,9 @@ public void testMatchSingleQualifiedCollectionPattern() throws Exception { public void testMatchSingleDatabaseWithCollectionPattern() throws Exception { // 1. 
Given collections: // db0: [coll_a1, coll_a2, coll_b1, coll_b2] - String db0 = CONTAINER.executeCommandFileInSeparateDatabase("ns_regex"); + String db0 = mongoContainer.executeCommandFileInSeparateDatabase("ns_regex"); // db1: [coll_a1, coll_a2, coll_b1, coll_b2] - String db1 = CONTAINER.executeCommandFileInSeparateDatabase("ns_regex"); + String db1 = mongoContainer.executeCommandFileInSeparateDatabase("ns_regex"); // 2. Test match: collection .*coll_b\d? String collectionRegex = ".*coll_b\\d?"; @@ -251,7 +257,7 @@ public void testMatchSingleDatabaseWithCollectionPattern() throws Exception { public void testMatchDatabaseAndCollectionContainsDash() throws Exception { // 1. Given collections: // db0: [coll-a1, coll-a2, coll-b1, coll-b2] - String db0 = CONTAINER.executeCommandFileInSeparateDatabase("ns-regex"); + String db0 = mongoContainer.executeCommandFileInSeparateDatabase("ns-regex"); TableResult result = submitTestCase(db0, "coll-a1"); @@ -271,7 +277,7 @@ public void testMatchDatabaseAndCollectionContainsDash() throws Exception { public void testMatchCollectionWithDots() throws Exception { // 1. 
Given colllections: // db: [coll.name] - String db = CONTAINER.executeCommandFileInSeparateDatabase("ns-dotted"); + String db = mongoContainer.executeCommandFileInSeparateDatabase("ns-dotted"); TableResult result = submitTestCase(db, db + "[.]coll[.]name"); @@ -301,7 +307,7 @@ private TableResult submitTestCase(String database, String collection) throws Ex + " coll_name STRING METADATA FROM 'collection_name' VIRTUAL," + " PRIMARY KEY (_id) NOT ENFORCED" + ") WITH (" - + ignoreIfNull("hosts", CONTAINER.getHostAndPort()) + + ignoreIfNull("hosts", mongoContainer.getHostAndPort()) + ignoreIfNull("username", FLINK_USER) + ignoreIfNull("password", FLINK_USER_PASSWORD) + ignoreIfNull("database", database) diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/table/MongoDBTimeZoneITCase.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/table/MongoDBTimeZoneITCase.java index c3e7112c2ef..f9edc73fa24 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/table/MongoDBTimeZoneITCase.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/table/MongoDBTimeZoneITCase.java @@ -55,21 +55,25 @@ public class MongoDBTimeZoneITCase extends MongoDBSourceTestBase { private final boolean parallelismSnapshot; - public MongoDBTimeZoneITCase(String localTimeZone, boolean parallelismSnapshot) { + public MongoDBTimeZoneITCase( + String mongoVersion, String localTimeZone, boolean parallelismSnapshot) { + super(mongoVersion); this.localTimeZone = localTimeZone; this.parallelismSnapshot = parallelismSnapshot; } - @Parameterized.Parameters(name = "localTimeZone: {0}, parallelismSnapshot: {1}") + @Parameterized.Parameters( + name = "mongoVersion: {0}, 
localTimeZone: {1}, parallelismSnapshot: {2}") public static Object[] parameters() { - return new Object[][] { - new Object[] {"Asia/Shanghai", false}, - new Object[] {"Europe/Berlin", false}, - new Object[] {"UTC", false}, - new Object[] {"Asia/Shanghai", true}, - new Object[] {"Europe/Berlin", true}, - new Object[] {"UTC", true} - }; + List<Object[]> parameterTuples = new ArrayList<>(); + for (String mongoVersion : MONGO_VERSIONS) { + for (String timezone : new String[] {"Asia/Shanghai", "Europe/Berlin", "UTC"}) { + for (boolean parallelismSnapshot : new boolean[] {true, false}) { + parameterTuples.add(new Object[] {mongoVersion, timezone, parallelismSnapshot}); + } + } + } + return parameterTuples.toArray(); } @Before @@ -87,7 +91,7 @@ public void before() { public void testTemporalTypesWithTimeZone() throws Exception { tEnv.getConfig().setLocalTimeZone(ZoneId.of(localTimeZone)); - String database = CONTAINER.executeCommandFileInSeparateDatabase("column_type_test"); + String database = mongoContainer.executeCommandFileInSeparateDatabase("column_type_test"); String sourceDDL = String.format( @@ -108,7 +112,7 @@ public void testTemporalTypesWithTimeZone() throws Exception { + " 'database' = '%s'," + " 'collection' = '%s'" + ")", - CONTAINER.getHostAndPort(), + mongoContainer.getHostAndPort(), FLINK_USER, FLINK_USER_PASSWORD, database, @@ -160,7 +164,7 @@ public void testTemporalTypesWithTimeZone() throws Exception { public void testDateAndTimestampToStringWithTimeZone() throws Exception { tEnv.getConfig().setLocalTimeZone(ZoneId.of(localTimeZone)); - String database = CONTAINER.executeCommandFileInSeparateDatabase("column_type_test"); + String database = mongoContainer.executeCommandFileInSeparateDatabase("column_type_test"); String sourceDDL = String.format( @@ -177,7 +181,7 @@ public void testDateAndTimestampToStringWithTimeZone() throws Exception { + " 'database' = '%s'," + " 'collection' = '%s'" + ")", - CONTAINER.getHostAndPort(), + mongoContainer.getHostAndPort(), 
FLINK_USER, FLINK_USER_PASSWORD, database, diff --git a/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/src/test/java/org/apache/flink/cdc/connectors/tests/MongoE2eITCase.java b/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/src/test/java/org/apache/flink/cdc/connectors/tests/MongoE2eITCase.java index f3db2624d06..e9cf648f91b 100644 --- a/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/src/test/java/org/apache/flink/cdc/connectors/tests/MongoE2eITCase.java +++ b/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/src/test/java/org/apache/flink/cdc/connectors/tests/MongoE2eITCase.java @@ -66,22 +66,30 @@ public class MongoE2eITCase extends FlinkContainerTestEnvironment { private MongoClient mongoClient; + public static final String[] MONGO_VERSIONS = {"6.0.16", "7.0.12"}; + @Parameterized.Parameter(1) - public boolean parallelismSnapshot; + public String mongoVersion; @Parameterized.Parameter(2) + public boolean parallelismSnapshot; + + @Parameterized.Parameter(3) public boolean scanFullChangelog; @Parameterized.Parameters( - name = "flinkVersion: {0}, parallelismSnapshot: {1}, scanFullChangelog: {2}") + name = + "flinkVersion: {0}, mongoVersion: {1}, parallelismSnapshot: {2}, scanFullChangelog: {3}") public static List parameters() { final List flinkVersions = getFlinkVersion(); List params = new ArrayList<>(); for (String flinkVersion : flinkVersions) { - params.add(new Object[] {flinkVersion, true, true}); - params.add(new Object[] {flinkVersion, true, false}); - params.add(new Object[] {flinkVersion, false, true}); - params.add(new Object[] {flinkVersion, false, false}); + for (String mongoVersion : MONGO_VERSIONS) { + params.add(new Object[] {flinkVersion, mongoVersion, true, true}); + params.add(new Object[] {flinkVersion, mongoVersion, true, false}); + params.add(new Object[] {flinkVersion, mongoVersion, false, true}); + params.add(new Object[] {flinkVersion, mongoVersion, false, false}); + } } return params; }