Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-35868][cdc-connector][mongodb] Bump dependency version to support MongoDB 7.0 #3489

Merged
merged 5 commits into from
Jul 25, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -78,28 +78,36 @@ public class MongoDBFullChangelogITCase extends MongoDBSourceTestBase {

@Rule public final Timeout timeoutPerTest = Timeout.seconds(300);

private final String mongoVersion;
private final boolean parallelismSnapshot;

public MongoDBFullChangelogITCase(boolean parallelismSnapshot) {
public MongoDBFullChangelogITCase(String mongoVersion, boolean parallelismSnapshot) {
super(mongoVersion);
this.mongoVersion = mongoVersion;
this.parallelismSnapshot = parallelismSnapshot;
}

@Parameterized.Parameters(name = "parallelismSnapshot: {0}")
@Parameterized.Parameters(name = "mongoVersion: {0} parallelismSnapshot: {1}")
public static Object[] parameters() {
return new Object[][] {new Object[] {false}, new Object[] {true}};
List<Object[]> parameterTuples = new ArrayList<>();
for (String mongoVersion : MONGO_VERSIONS) {
parameterTuples.add(new Object[] {mongoVersion, true});
parameterTuples.add(new Object[] {mongoVersion, false});
}
return parameterTuples.toArray();
}

@Test
public void testGetMongoDBVersion() {
MongoDBSourceConfig config =
new MongoDBSourceConfigFactory()
.hosts(CONTAINER.getHostAndPort())
.hosts(mongoContainer.getHostAndPort())
.splitSizeMB(1)
.samplesPerChunk(10)
.pollAwaitTimeMillis(500)
.create(0);

assertEquals(MongoUtils.getMongoVersion(config), "6.0.9");
assertEquals(MongoUtils.getMongoVersion(config), mongoVersion);
}

@Test
Expand Down Expand Up @@ -499,16 +507,16 @@ private List<String> testBackfillWhenWritingEvents(
"customer_" + Integer.toUnsignedString(new Random().nextInt(), 36);

// A - enable system-level fulldoc pre & post image feature
CONTAINER.executeCommand(
mongoContainer.executeCommand(
"use admin; db.runCommand({ setClusterParameter: { changeStreamOptions: { preAndPostImages: { expireAfterSeconds: 'off' } } } })");

// B - enable collection-level fulldoc pre & post image for change capture collection
CONTAINER.executeCommandInDatabase(
mongoContainer.executeCommandInDatabase(
String.format(
"db.createCollection('%s'); db.runCommand({ collMod: '%s', changeStreamPreAndPostImages: { enabled: true } })",
"customers", "customers"),
customerDatabase);
CONTAINER.executeCommandFileInDatabase("customer", customerDatabase);
mongoContainer.executeCommandFileInDatabase("customer", customerDatabase);

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(1000);
Expand All @@ -526,7 +534,7 @@ private List<String> testBackfillWhenWritingEvents(
TestTable customerTable = new TestTable(customerDatabase, "customers", customersSchema);
MongoDBSource source =
new MongoDBSourceBuilder()
.hosts(CONTAINER.getHostAndPort())
.hosts(mongoContainer.getHostAndPort())
.databaseList(customerDatabase)
.username(FLINK_USER)
.password(FLINK_USER_PASSWORD)
Expand Down Expand Up @@ -613,12 +621,12 @@ private void testMongoDBParallelSource(
"customer_" + Integer.toUnsignedString(new Random().nextInt(), 36);

// A - enable system-level fulldoc pre & post image feature
CONTAINER.executeCommand(
mongoContainer.executeCommand(
"use admin; db.runCommand({ setClusterParameter: { changeStreamOptions: { preAndPostImages: { expireAfterSeconds: 'off' } } } })");

// B - enable collection-level fulldoc pre & post image for change capture collection
for (String collectionName : captureCustomerCollections) {
CONTAINER.executeCommandInDatabase(
mongoContainer.executeCommandInDatabase(
String.format(
"db.createCollection('%s'); db.runCommand({ collMod: '%s', changeStreamPreAndPostImages: { enabled: true } })",
collectionName, collectionName),
Expand Down Expand Up @@ -654,14 +662,14 @@ private void testMongoDBParallelSource(
+ " 'scan.incremental.snapshot.backfill.skip' = '%s'"
+ ")",
parallelismSnapshot ? "true" : "false",
CONTAINER.getHostAndPort(),
mongoContainer.getHostAndPort(),
FLINK_USER,
FLINK_USER_PASSWORD,
customerDatabase,
getCollectionNameRegex(customerDatabase, captureCustomerCollections),
skipSnapshotBackfill);

CONTAINER.executeCommandFileInDatabase("customer", customerDatabase);
mongoContainer.executeCommandFileInDatabase("customer", customerDatabase);

// first step: check the snapshot data
String[] snapshotForSingleTable =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,22 +24,42 @@

import org.junit.Ignore;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

import java.util.ArrayList;
import java.util.List;

import static org.apache.flink.cdc.connectors.mongodb.utils.MongoDBContainer.FLINK_USER;
import static org.apache.flink.cdc.connectors.mongodb.utils.MongoDBContainer.FLINK_USER_PASSWORD;
import static org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH;

/** Example Tests for {@link MongoDBSource}. */
@RunWith(Parameterized.class)
public class MongoDBParallelSourceExampleTest extends MongoDBSourceTestBase {

@Parameterized.Parameters(name = "mongoVersion: {0} parallelismSnapshot: {1}")
public static Object[] parameters() {
List<Object[]> parameterTuples = new ArrayList<>();
for (String mongoVersion : MONGO_VERSIONS) {
parameterTuples.add(new Object[] {mongoVersion, true});
parameterTuples.add(new Object[] {mongoVersion, false});
}
return parameterTuples.toArray();
}

public MongoDBParallelSourceExampleTest(String mongoVersion) {
super(mongoVersion);
}

@Test
@Ignore("Test ignored because it won't stop and is used for manual test")
public void testMongoDBExampleSource() throws Exception {
String database = CONTAINER.executeCommandFileInSeparateDatabase("inventory");
String database = mongoContainer.executeCommandFileInSeparateDatabase("inventory");

MongoDBSource<String> mongoSource =
MongoDBSource.<String>builder()
.hosts(CONTAINER.getHostAndPort())
.hosts(mongoContainer.getHostAndPort())
.databaseList(database)
.collectionList(database + ".products")
.username(FLINK_USER)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,12 +46,15 @@
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.Timeout;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.apache.flink.cdc.connectors.mongodb.utils.MongoDBAssertUtils.assertEqualsInAnyOrder;
import static org.apache.flink.cdc.connectors.mongodb.utils.MongoDBContainer.FLINK_USER;
Expand All @@ -65,11 +68,21 @@
import static org.apache.flink.util.Preconditions.checkState;

/** IT tests for {@link MongoDBSource}. */
@RunWith(Parameterized.class)
public class MongoDBParallelSourceITCase extends MongoDBSourceTestBase {
private static final int USE_POST_LOWWATERMARK_HOOK = 1;
private static final int USE_PRE_HIGHWATERMARK_HOOK = 2;
private static final int USE_POST_HIGHWATERMARK_HOOK = 3;

public MongoDBParallelSourceITCase(String mongoVersion) {
super(mongoVersion);
}

@Parameterized.Parameters(name = "mongoVersion: {0}")
public static Object[] parameters() {
return Stream.of(MONGO_VERSIONS).map(e -> new Object[] {e}).toArray();
}

@Rule public final Timeout timeoutPerTest = Timeout.seconds(300);

@Test
Expand Down Expand Up @@ -406,7 +419,7 @@ public void testSkipBackfillWithDMLPostLowWaterMark() throws Exception {
private List<String> testBackfillWhenWritingEvents(
boolean skipBackFill, int fetchSize, int hookType, StartupOptions startupOptions)
throws Exception {
String customerDatabase = CONTAINER.executeCommandFileInSeparateDatabase("customer");
String customerDatabase = mongoContainer.executeCommandFileInSeparateDatabase("customer");
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(1000);
env.setParallelism(1);
Expand All @@ -423,7 +436,7 @@ private List<String> testBackfillWhenWritingEvents(
TestTable customerTable = new TestTable(customerDatabase, "customers", customersSchema);
MongoDBSource source =
new MongoDBSourceBuilder()
.hosts(CONTAINER.getHostAndPort())
.hosts(mongoContainer.getHostAndPort())
.databaseList(customerDatabase)
.username(FLINK_USER)
.password(FLINK_USER_PASSWORD)
Expand Down Expand Up @@ -507,7 +520,7 @@ private void testMongoDBParallelSource(
boolean skipSnapshotBackfill)
throws Exception {

String customerDatabase = CONTAINER.executeCommandFileInSeparateDatabase("customer");
String customerDatabase = mongoContainer.executeCommandFileInSeparateDatabase("customer");

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
Expand Down Expand Up @@ -535,7 +548,7 @@ private void testMongoDBParallelSource(
+ " 'heartbeat.interval.ms' = '500',"
+ " 'scan.incremental.snapshot.backfill.skip' = '%s'"
+ ")",
CONTAINER.getHostAndPort(),
mongoContainer.getHostAndPort(),
FLINK_USER,
FLINK_USER_PASSWORD,
customerDatabase,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,7 @@
import com.mongodb.MongoClientSettings;
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Before;
import org.junit.Rule;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -39,10 +38,21 @@
/** MongoDBSourceTestBase for MongoDB >= 5.0.3. */
public class MongoDBSourceTestBase {

protected static MongoClient mongodbClient;
public MongoDBSourceTestBase(String mongoVersion) {
this.mongoContainer =
new MongoDBContainer("mongo:" + mongoVersion)
.withSharding()
.withLogConsumer(new Slf4jLogConsumer(LOG));
}

public static final String[] MONGO_VERSIONS = {"6.0.16", "7.0.12"};

protected static final int DEFAULT_PARALLELISM = 4;

@Rule public final MongoDBContainer mongoContainer;

protected MongoClient mongodbClient;

@Rule
public final MiniClusterWithClientResource miniClusterResource =
new MiniClusterWithClientResource(
Expand All @@ -53,26 +63,20 @@ public class MongoDBSourceTestBase {
.withHaLeadershipControl()
.build());

@BeforeClass
public static void startContainers() {
@Before
public void startContainers() {
LOG.info("Starting containers...");
Startables.deepStart(Stream.of(CONTAINER)).join();
Startables.deepStart(Stream.of(mongoContainer)).join();

MongoClientSettings settings =
MongoClientSettings.builder()
.applyConnectionString(
new ConnectionString(CONTAINER.getConnectionString()))
new ConnectionString(mongoContainer.getConnectionString()))
.build();
mongodbClient = MongoClients.create(settings);

LOG.info("Containers are started.");
}

private static final Logger LOG = LoggerFactory.getLogger(MongoDBSourceTestBase.class);

@ClassRule
public static final MongoDBContainer CONTAINER =
new MongoDBContainer("mongo:6.0.9")
.withSharding()
.withLogConsumer(new Slf4jLogConsumer(LOG));
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.rules.Timeout;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

import java.sql.SQLException;
import java.util.ArrayList;
Expand All @@ -55,20 +57,31 @@
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static java.lang.String.format;
import static org.apache.flink.cdc.connectors.mongodb.utils.MongoDBContainer.FLINK_USER;
import static org.apache.flink.cdc.connectors.mongodb.utils.MongoDBContainer.FLINK_USER_PASSWORD;
import static org.apache.flink.util.Preconditions.checkState;

/** IT tests to cover various newly added collections during capture process. */
@RunWith(Parameterized.class)
public class NewlyAddedTableITCase extends MongoDBSourceTestBase {

@Rule public final Timeout timeoutPerTest = Timeout.seconds(500);

private String customerDatabase;
protected static final int DEFAULT_PARALLELISM = 4;

public NewlyAddedTableITCase(String mongoVersion) {
super(mongoVersion);
}

@Parameterized.Parameters(name = "mongoVersion: {0}")
public static Object[] parameters() {
return Stream.of(MONGO_VERSIONS).map(e -> new Object[] {e}).toArray();
}

private final ScheduledExecutorService mockChangelogExecutor =
Executors.newScheduledThreadPool(1);

Expand All @@ -79,7 +92,7 @@ public void before() throws SQLException {
// prepare initial data for given collection
String collectionName = "produce_changelog";
// enable system-level fulldoc pre & post image feature
CONTAINER.executeCommand(
mongoContainer.executeCommand(
"use admin; db.runCommand({ setClusterParameter: { changeStreamOptions: { preAndPostImages: { expireAfterSeconds: 'off' } } } })");

// mock continuous changelog during the newly added collections capturing process
Expand Down Expand Up @@ -846,7 +859,7 @@ private void initialAddressCollections(
// make initial data for given collection.
String cityName = collectionName.split("_")[1];
// B - enable collection-level fulldoc pre & post image for change capture collection
CONTAINER.executeCommandInDatabase(
mongoContainer.executeCommandInDatabase(
String.format(
"db.createCollection('%s'); db.runCommand({ collMod: '%s', changeStreamPreAndPostImages: { enabled: true } })",
collectionName, collectionName),
Expand Down Expand Up @@ -983,7 +996,7 @@ private String getCreateTableStatement(
+ " 'scan.newly-added-table.enabled' = 'true'"
+ " %s"
+ ")",
CONTAINER.getHostAndPort(),
mongoContainer.getHostAndPort(),
FLINK_USER,
FLINK_USER_PASSWORD,
customerDatabase,
Expand Down
Loading
Loading