[FLINK-35868][cdc-connector][mongodb] Bump dependency version to support MongoDB 7.0

This closes apache#3489.
yuxiqian authored and wuzhenhua01 committed Aug 4, 2024
1 parent 020a0ad commit af78d63
Showing 14 changed files with 197 additions and 89 deletions.
2 changes: 1 addition & 1 deletion docs/content.zh/docs/connectors/flink-sources/overview.md
@@ -37,7 +37,7 @@ You can also read [tutorials]({{< ref "docs/connectors/flink-sources/tutorials/b

| Connector | Database | Driver |
|----------------------------------------------------------------------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|---------------------------|
| [mongodb-cdc]({{< ref "docs/connectors/flink-sources/mongodb-cdc" >}}) | <li> [MongoDB](https://www.mongodb.com): 3.6, 4.x, 5.0 | MongoDB Driver: 4.3.4 |
| [mongodb-cdc]({{< ref "docs/connectors/flink-sources/mongodb-cdc" >}}) | <li> [MongoDB](https://www.mongodb.com): 3.6, 4.x, 5.0, 6.0, 6.1, 7.0 | MongoDB Driver: 4.11.2 |
| [mysql-cdc]({{< ref "docs/connectors/flink-sources/mysql-cdc" >}}) | <li> [MySQL](https://dev.mysql.com/doc): 5.6, 5.7, 8.0.x <li> [RDS MySQL](https://www.aliyun.com/product/rds/mysql): 5.6, 5.7, 8.0.x <li> [PolarDB MySQL](https://www.aliyun.com/product/polardb): 5.6, 5.7, 8.0.x <li> [Aurora MySQL](https://aws.amazon.com/cn/rds/aurora): 5.6, 5.7, 8.0.x <li> [MariaDB](https://mariadb.org): 10.x <li> [PolarDB X](https://github.com/ApsaraDB/galaxysql): 2.0.1 | JDBC Driver: 8.0.28 |
| [oceanbase-cdc]({{< ref "docs/connectors/flink-sources/oceanbase-cdc" >}}) | <li> [OceanBase CE](https://open.oceanbase.com): 3.1.x, 4.x <li> [OceanBase EE](https://www.oceanbase.com/product/oceanbase): 2.x, 3.x, 4.x | OceanBase Driver: 2.4.x |
| [oracle-cdc]({{< ref "docs/connectors/flink-sources/oracle-cdc" >}}) | <li> [Oracle](https://www.oracle.com/index.html): 11, 12, 19, 21 | Oracle Driver: 19.3.0.0 |
2 changes: 1 addition & 1 deletion docs/content/docs/connectors/flink-sources/overview.md
@@ -37,7 +37,7 @@ You can also read [tutorials]({{< ref "docs/connectors/flink-sources/tutorials/b

| Connector | Database | Driver |
|----------------------------------------------------------------------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|---------------------------|
| [mongodb-cdc]({{< ref "docs/connectors/flink-sources/mongodb-cdc" >}}) | <li> [MongoDB](https://www.mongodb.com): 3.6, 4.x, 5.0 | MongoDB Driver: 4.3.4 |
| [mongodb-cdc]({{< ref "docs/connectors/flink-sources/mongodb-cdc" >}}) | <li> [MongoDB](https://www.mongodb.com): 3.6, 4.x, 5.0, 6.0, 6.1, 7.0 | MongoDB Driver: 4.11.2 |
| [mysql-cdc]({{< ref "docs/connectors/flink-sources/mysql-cdc" >}}) | <li> [MySQL](https://dev.mysql.com/doc): 5.6, 5.7, 8.0.x <li> [RDS MySQL](https://www.aliyun.com/product/rds/mysql): 5.6, 5.7, 8.0.x <li> [PolarDB MySQL](https://www.aliyun.com/product/polardb): 5.6, 5.7, 8.0.x <li> [Aurora MySQL](https://aws.amazon.com/cn/rds/aurora): 5.6, 5.7, 8.0.x <li> [MariaDB](https://mariadb.org): 10.x <li> [PolarDB X](https://github.com/ApsaraDB/galaxysql): 2.0.1 | JDBC Driver: 8.0.28 |
| [oceanbase-cdc]({{< ref "docs/connectors/flink-sources/oceanbase-cdc" >}}) | <li> [OceanBase CE](https://open.oceanbase.com): 3.1.x, 4.x <li> [OceanBase EE](https://www.oceanbase.com/product/oceanbase): 2.x, 3.x, 4.x | OceanBase Driver: 2.4.x |
| [oracle-cdc]({{< ref "docs/connectors/flink-sources/oracle-cdc" >}}) | <li> [Oracle](https://www.oracle.com/index.html): 11, 12, 19, 21 | Oracle Driver: 19.3.0.0 |
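For context, the mongodb-cdc row above is the one this commit extends to MongoDB 6.0, 6.1, and 7.0. A minimal Table API job using the connector might look like the sketch below; the host, credentials, and database/collection names are placeholders, and the option names are taken from the mongodb-cdc connector documentation, so treat this as a sketch rather than an excerpt from this diff.

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class MongoCdcSqlSketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Checkpointing lets the source resume the change stream from a consistent offset.
        env.enableCheckpointing(3000);
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        // Placeholder host and credentials; a MongoDB 6.0/7.0 replica set is assumed.
        tEnv.executeSql(
                "CREATE TABLE products ("
                        + "  _id STRING,"
                        + "  name STRING,"
                        + "  weight DECIMAL(10, 3),"
                        + "  PRIMARY KEY (_id) NOT ENFORCED"
                        + ") WITH ("
                        + "  'connector' = 'mongodb-cdc',"
                        + "  'hosts' = 'localhost:27017',"
                        + "  'username' = 'flinkuser',"
                        + "  'password' = 'flinkpw',"
                        + "  'database' = 'inventory',"
                        + "  'collection' = 'products',"
                        + "  'scan.incremental.snapshot.enabled' = 'true'"
                        + ")");

        // Streams the snapshot followed by live change events to stdout.
        tEnv.executeSql("SELECT * FROM products").print();
    }
}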
@@ -53,7 +53,7 @@ limitations under the License.
<dependency>
<groupId>org.mongodb.kafka</groupId>
<artifactId>mongo-kafka-connect</artifactId>
<version>1.10.1</version>
<version>1.13.0</version>
<exclusions>
<exclusion>
<artifactId>mongodb-driver-sync</artifactId>
@@ -69,7 +69,7 @@ limitations under the License.
<dependency>
<groupId>org.mongodb</groupId>
<artifactId>mongodb-driver-sync</artifactId>
<version>4.9.1</version>
<version>4.11.2</version>
</dependency>

<!-- test dependencies on Flink -->
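The two bumps above (mongo-kafka-connect 1.13.0 and mongodb-driver-sync 4.11.2) are the dependency changes the commit message credits with MongoDB 7.0 support. As a quick standalone sanity check that the newer sync driver can reach a 7.0 deployment, something like the following can be used; the connection string is a placeholder, and buildInfo is a standard server command rather than anything added by this commit.

import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import org.bson.Document;

public class MongoDriverSmokeTest {
    public static void main(String[] args) {
        // Placeholder connection string; point it at the replica set under test.
        try (MongoClient client = MongoClients.create("mongodb://flinkuser:flinkpw@localhost:27017")) {
            // buildInfo returns server metadata, including the version string (e.g. "7.0.x").
            Document buildInfo = client.getDatabase("admin").runCommand(new Document("buildInfo", 1));
            System.out.println("Connected to MongoDB " + buildInfo.getString("version"));
        }
    }
}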
@@ -78,28 +78,36 @@ public class MongoDBFullChangelogITCase extends MongoDBSourceTestBase {

@Rule public final Timeout timeoutPerTest = Timeout.seconds(300);

private final String mongoVersion;
private final boolean parallelismSnapshot;

public MongoDBFullChangelogITCase(boolean parallelismSnapshot) {
public MongoDBFullChangelogITCase(String mongoVersion, boolean parallelismSnapshot) {
super(mongoVersion);
this.mongoVersion = mongoVersion;
this.parallelismSnapshot = parallelismSnapshot;
}

@Parameterized.Parameters(name = "parallelismSnapshot: {0}")
@Parameterized.Parameters(name = "mongoVersion: {0} parallelismSnapshot: {1}")
public static Object[] parameters() {
return new Object[][] {new Object[] {false}, new Object[] {true}};
List<Object[]> parameterTuples = new ArrayList<>();
for (String mongoVersion : MONGO_VERSIONS) {
parameterTuples.add(new Object[] {mongoVersion, true});
parameterTuples.add(new Object[] {mongoVersion, false});
}
return parameterTuples.toArray();
}

@Test
public void testGetMongoDBVersion() {
MongoDBSourceConfig config =
new MongoDBSourceConfigFactory()
.hosts(CONTAINER.getHostAndPort())
.hosts(mongoContainer.getHostAndPort())
.splitSizeMB(1)
.samplesPerChunk(10)
.pollAwaitTimeMillis(500)
.create(0);

assertEquals(MongoUtils.getMongoVersion(config), "6.0.9");
assertEquals(MongoUtils.getMongoVersion(config), mongoVersion);
}

@Test
@@ -499,16 +507,16 @@ private List<String> testBackfillWhenWritingEvents(
"customer_" + Integer.toUnsignedString(new Random().nextInt(), 36);

// A - enable system-level fulldoc pre & post image feature
CONTAINER.executeCommand(
mongoContainer.executeCommand(
"use admin; db.runCommand({ setClusterParameter: { changeStreamOptions: { preAndPostImages: { expireAfterSeconds: 'off' } } } })");

// B - enable collection-level fulldoc pre & post image for change capture collection
CONTAINER.executeCommandInDatabase(
mongoContainer.executeCommandInDatabase(
String.format(
"db.createCollection('%s'); db.runCommand({ collMod: '%s', changeStreamPreAndPostImages: { enabled: true } })",
"customers", "customers"),
customerDatabase);
CONTAINER.executeCommandFileInDatabase("customer", customerDatabase);
mongoContainer.executeCommandFileInDatabase("customer", customerDatabase);

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(1000);
@@ -526,7 +534,7 @@ private List<String> testBackfillWhenWritingEvents(
TestTable customerTable = new TestTable(customerDatabase, "customers", customersSchema);
MongoDBSource source =
new MongoDBSourceBuilder()
.hosts(CONTAINER.getHostAndPort())
.hosts(mongoContainer.getHostAndPort())
.databaseList(customerDatabase)
.username(FLINK_USER)
.password(FLINK_USER_PASSWORD)
@@ -613,12 +621,12 @@ private void testMongoDBParallelSource(
"customer_" + Integer.toUnsignedString(new Random().nextInt(), 36);

// A - enable system-level fulldoc pre & post image feature
CONTAINER.executeCommand(
mongoContainer.executeCommand(
"use admin; db.runCommand({ setClusterParameter: { changeStreamOptions: { preAndPostImages: { expireAfterSeconds: 'off' } } } })");

// B - enable collection-level fulldoc pre & post image for change capture collection
for (String collectionName : captureCustomerCollections) {
CONTAINER.executeCommandInDatabase(
mongoContainer.executeCommandInDatabase(
String.format(
"db.createCollection('%s'); db.runCommand({ collMod: '%s', changeStreamPreAndPostImages: { enabled: true } })",
collectionName, collectionName),
@@ -654,14 +662,14 @@ private void testMongoDBParallelSource(
+ " 'scan.incremental.snapshot.backfill.skip' = '%s'"
+ ")",
parallelismSnapshot ? "true" : "false",
CONTAINER.getHostAndPort(),
mongoContainer.getHostAndPort(),
FLINK_USER,
FLINK_USER_PASSWORD,
customerDatabase,
getCollectionNameRegex(customerDatabase, captureCustomerCollections),
skipSnapshotBackfill);

CONTAINER.executeCommandFileInDatabase("customer", customerDatabase);
mongoContainer.executeCommandFileInDatabase("customer", customerDatabase);

// first step: check the snapshot data
String[] snapshotForSingleTable =
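The hunks above reference MONGO_VERSIONS, a per-instance mongoContainer, and a new String-argument constructor on MongoDBSourceTestBase, but the base-class change itself is collapsed in this view. Purely as an orientation aid, a hypothetical sketch of what the updated base class provides is shown below; the constant values, field names, and container lifecycle are assumptions, not the actual contents of this commit.

import org.apache.flink.cdc.connectors.mongodb.utils.MongoDBContainer;

// Hypothetical sketch only; the real MongoDBSourceTestBase in this commit may differ.
public abstract class MongoDBSourceTestBase {

    // Server versions the parameterized tests iterate over (exact tags assumed here).
    public static final String[] MONGO_VERSIONS = {"6.0.16", "7.0.12"};

    // One container per parameterized run, replacing the former shared CONTAINER constant.
    protected final MongoDBContainer mongoContainer;

    protected MongoDBSourceTestBase(String mongoVersion) {
        // Assumes the container image tag can be derived from the requested server version.
        this.mongoContainer = new MongoDBContainer("mongo:" + mongoVersion);
        this.mongoContainer.start();
    }
}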
@@ -24,22 +24,42 @@

import org.junit.Ignore;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

import java.util.ArrayList;
import java.util.List;

import static org.apache.flink.cdc.connectors.mongodb.utils.MongoDBContainer.FLINK_USER;
import static org.apache.flink.cdc.connectors.mongodb.utils.MongoDBContainer.FLINK_USER_PASSWORD;
import static org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH;

/** Example Tests for {@link MongoDBSource}. */
@RunWith(Parameterized.class)
public class MongoDBParallelSourceExampleTest extends MongoDBSourceTestBase {

@Parameterized.Parameters(name = "mongoVersion: {0}")
public static Object[] parameters() {
List<Object[]> parameterTuples = new ArrayList<>();
for (String mongoVersion : MONGO_VERSIONS) {
parameterTuples.add(new Object[] {mongoVersion});
}
return parameterTuples.toArray();
}

public MongoDBParallelSourceExampleTest(String mongoVersion) {
super(mongoVersion);
}

@Test
@Ignore("Test ignored because it won't stop and is used for manual test")
public void testMongoDBExampleSource() throws Exception {
String database = CONTAINER.executeCommandFileInSeparateDatabase("inventory");
String database = mongoContainer.executeCommandFileInSeparateDatabase("inventory");

MongoDBSource<String> mongoSource =
MongoDBSource.<String>builder()
.hosts(CONTAINER.getHostAndPort())
.hosts(mongoContainer.getHostAndPort())
.databaseList(database)
.collectionList(database + ".products")
.username(FLINK_USER)
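The visible hunk ends partway through the builder chain. For readers unfamiliar with the connector's DataStream API, an end-to-end job of the same shape typically looks like the sketch below; the endpoint, credentials, and names are placeholders, and the deserializer, package names, and sink follow the usual org.apache.flink.cdc layout rather than being copied from the hidden portion of this file.

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.cdc.connectors.mongodb.source.MongoDBSource;
import org.apache.flink.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class MongoDBSourceExampleSketch {
    public static void main(String[] args) throws Exception {
        // Placeholder endpoint and credentials; any supported server version up to 7.0 applies.
        MongoDBSource<String> mongoSource =
                MongoDBSource.<String>builder()
                        .hosts("localhost:27017")
                        .databaseList("inventory")
                        .collectionList("inventory.products")
                        .username("flinkuser")
                        .password("flinkpw")
                        .deserializer(new JsonDebeziumDeserializationSchema())
                        .build();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Checkpointing is required so the change-stream offset can be resumed after failures.
        env.enableCheckpointing(3000);
        env.fromSource(mongoSource, WatermarkStrategy.noWatermarks(), "MongoDB Source").print();
        env.execute("Print MongoDB snapshot and change events");
    }
}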
@@ -46,12 +46,15 @@
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.Timeout;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.apache.flink.cdc.connectors.mongodb.utils.MongoDBAssertUtils.assertEqualsInAnyOrder;
import static org.apache.flink.cdc.connectors.mongodb.utils.MongoDBContainer.FLINK_USER;
@@ -65,11 +68,21 @@
import static org.apache.flink.util.Preconditions.checkState;

/** IT tests for {@link MongoDBSource}. */
@RunWith(Parameterized.class)
public class MongoDBParallelSourceITCase extends MongoDBSourceTestBase {
private static final int USE_POST_LOWWATERMARK_HOOK = 1;
private static final int USE_PRE_HIGHWATERMARK_HOOK = 2;
private static final int USE_POST_HIGHWATERMARK_HOOK = 3;

public MongoDBParallelSourceITCase(String mongoVersion) {
super(mongoVersion);
}

@Parameterized.Parameters(name = "mongoVersion: {0}")
public static Object[] parameters() {
return Stream.of(MONGO_VERSIONS).map(e -> new Object[] {e}).toArray();
}

@Rule public final Timeout timeoutPerTest = Timeout.seconds(300);

@Test
@@ -406,7 +419,7 @@ public void testSkipBackfillWithDMLPostLowWaterMark() throws Exception {
private List<String> testBackfillWhenWritingEvents(
boolean skipBackFill, int fetchSize, int hookType, StartupOptions startupOptions)
throws Exception {
String customerDatabase = CONTAINER.executeCommandFileInSeparateDatabase("customer");
String customerDatabase = mongoContainer.executeCommandFileInSeparateDatabase("customer");
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(1000);
env.setParallelism(1);
@@ -423,7 +436,7 @@ private List<String> testBackfillWhenWritingEvents(
TestTable customerTable = new TestTable(customerDatabase, "customers", customersSchema);
MongoDBSource source =
new MongoDBSourceBuilder()
.hosts(CONTAINER.getHostAndPort())
.hosts(mongoContainer.getHostAndPort())
.databaseList(customerDatabase)
.username(FLINK_USER)
.password(FLINK_USER_PASSWORD)
@@ -507,7 +520,7 @@ private void testMongoDBParallelSource(
boolean skipSnapshotBackfill)
throws Exception {

String customerDatabase = CONTAINER.executeCommandFileInSeparateDatabase("customer");
String customerDatabase = mongoContainer.executeCommandFileInSeparateDatabase("customer");

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
@@ -535,7 +548,7 @@ private void testMongoDBParallelSource(
+ " 'heartbeat.interval.ms' = '500',"
+ " 'scan.incremental.snapshot.backfill.skip' = '%s'"
+ ")",
CONTAINER.getHostAndPort(),
mongoContainer.getHostAndPort(),
FLINK_USER,
FLINK_USER_PASSWORD,
customerDatabase,
