Skip to content

Commit

Permalink
[hotfix][test] Reorganize test cases
Browse files Browse the repository at this point in the history
  • Loading branch information
yuxiqian authored and GOODBOY008 committed Oct 15, 2024
1 parent a1781f4 commit daf27fa
Show file tree
Hide file tree
Showing 13 changed files with 54 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ public MongoDBFullChangelogITCase(String mongoVersion, boolean parallelismSnapsh
@Parameterized.Parameters(name = "mongoVersion: {0} parallelismSnapshot: {1}")
public static Object[] parameters() {
List<Object[]> parameterTuples = new ArrayList<>();
for (String mongoVersion : MONGO_VERSIONS) {
for (String mongoVersion : getMongoVersions()) {
parameterTuples.add(new Object[] {mongoVersion, true});
parameterTuples.add(new Object[] {mongoVersion, false});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ public class MongoDBParallelSourceExampleTest extends MongoDBSourceTestBase {
@Parameterized.Parameters(name = "mongoVersion: {0} parallelismSnapshot: {1}")
public static Object[] parameters() {
List<Object[]> parameterTuples = new ArrayList<>();
for (String mongoVersion : MONGO_VERSIONS) {
for (String mongoVersion : getMongoVersions()) {
parameterTuples.add(new Object[] {mongoVersion, true});
parameterTuples.add(new Object[] {mongoVersion, false});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ public MongoDBParallelSourceITCase(String mongoVersion) {

@Parameterized.Parameters(name = "mongoVersion: {0}")
public static Object[] parameters() {
return Stream.of(MONGO_VERSIONS).map(e -> new Object[] {e}).toArray();
return Stream.of(getMongoVersions()).map(e -> new Object[] {e}).toArray();
}

@Rule public final Timeout timeoutPerTest = Timeout.seconds(300);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,14 @@ public MongoDBSourceTestBase(String mongoVersion) {
.withLogConsumer(new Slf4jLogConsumer(LOG));
}

public static final String[] MONGO_VERSIONS = {"6.0.16", "7.0.12"};
public static String[] getMongoVersions() {
String specifiedMongoVersion = System.getProperty("specifiedMongoVersion");
if (specifiedMongoVersion != null) {
return new String[] {specifiedMongoVersion};
} else {
return new String[] {"6.0.16", "7.0.12"};
}
}

protected static final int DEFAULT_PARALLELISM = 4;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ public NewlyAddedTableITCase(String mongoVersion) {

@Parameterized.Parameters(name = "mongoVersion: {0}")
public static Object[] parameters() {
return Stream.of(MONGO_VERSIONS).map(e -> new Object[] {e}).toArray();
return Stream.of(getMongoVersions()).map(e -> new Object[] {e}).toArray();
}

private final ScheduledExecutorService mockChangelogExecutor =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ public MongoDBSnapshotSplitReaderTest(String mongoVersion) {

@Parameterized.Parameters(name = "mongoVersion: {0}")
public static Object[] parameters() {
return Stream.of(MONGO_VERSIONS).map(e -> new Object[] {e}).toArray();
return Stream.of(getMongoVersions()).map(e -> new Object[] {e}).toArray();
}

@Before
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ public MongoDBStreamSplitReaderTest(String mongoVersion) {

@Parameterized.Parameters(name = "mongoVersion: {0}")
public static Object[] parameters() {
return Stream.of(MONGO_VERSIONS).map(e -> new Object[] {e}).toArray();
return Stream.of(getMongoVersions()).map(e -> new Object[] {e}).toArray();
}

@Before
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ public MongoDBTimeZoneITCase(
name = "mongoVersion: {0}, localTimeZone: {1}, parallelismSnapshot: {2}")
public static Object[] parameters() {
List<Object[]> parameterTuples = new ArrayList<>();
for (String mongoVersion : MONGO_VERSIONS) {
for (String mongoVersion : getMongoVersions()) {
for (String timezone : new String[] {"Asia/Shanghai", "Europe/Berlin", "UTC"}) {
for (boolean parallelismSnapshot : new boolean[] {true, false}) {
parameterTuples.add(new Object[] {mongoVersion, timezone, parallelismSnapshot});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import java.time.Duration;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -90,7 +91,12 @@ public abstract class PipelineTestEnvironment extends TestLogger {

@Parameterized.Parameters(name = "flinkVersion: {0}")
public static List<String> getFlinkVersion() {
return Arrays.asList("1.17.2", "1.18.1", "1.19.1", "1.20.0");
String flinkVersion = System.getProperty("specifiedFlinkVersion");
if (flinkVersion != null) {
return Collections.singletonList(flinkVersion);
} else {
return Arrays.asList("1.17.2", "1.18.1", "1.19.1", "1.20.0");
}
}

@Before
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Random;
import java.util.stream.Stream;
Expand All @@ -66,8 +67,6 @@ public class MongoE2eITCase extends FlinkContainerTestEnvironment {

private MongoClient mongoClient;

public static final String[] MONGO_VERSIONS = {"6.0.16", "7.0.12"};

@Parameterized.Parameter(1)
public String mongoVersion;

Expand All @@ -77,14 +76,23 @@ public class MongoE2eITCase extends FlinkContainerTestEnvironment {
@Parameterized.Parameter(3)
public boolean scanFullChangelog;

public static List<String> getMongoVersions() {
String specifiedMongoVersion = System.getProperty("specifiedMongoVersion");
if (specifiedMongoVersion != null) {
return Collections.singletonList(specifiedMongoVersion);
} else {
return Arrays.asList("6.0.16", "7.0.12");
}
}

@Parameterized.Parameters(
name =
"flinkVersion: {0}, mongoVersion: {1}, parallelismSnapshot: {2}, scanFullChangelog: {3}")
public static List<Object[]> parameters() {
final List<String> flinkVersions = getFlinkVersion();
List<Object[]> params = new ArrayList<>();
for (String flinkVersion : flinkVersions) {
for (String mongoVersion : MONGO_VERSIONS) {
for (String mongoVersion : getMongoVersions()) {
params.add(new Object[] {flinkVersion, mongoVersion, true, true});
params.add(new Object[] {flinkVersion, mongoVersion, true, false});
params.add(new Object[] {flinkVersion, mongoVersion, false, true});
Expand All @@ -99,7 +107,7 @@ public void before() {
super.before();

container =
new MongoDBContainer("mongo:6.0.9")
new MongoDBContainer("mongo:" + mongoVersion)
.withSharding()
.withNetwork(NETWORK)
.withNetworkAliases(INTER_CONTAINER_MONGO_ALIAS)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -120,7 +121,12 @@ public abstract class FlinkContainerTestEnvironment extends TestLogger {

@Parameterized.Parameters(name = "flinkVersion: {0}")
public static List<String> getFlinkVersion() {
return Arrays.asList("1.16.3", "1.17.2", "1.18.1", "1.19.1", "1.20.0");
String flinkVersion = System.getProperty("specifiedFlinkVersion");
if (flinkVersion != null) {
return Collections.singletonList(flinkVersion);
} else {
return Arrays.asList("1.16.3", "1.17.2", "1.18.1", "1.19.1", "1.20.0");
}
}

@Before
Expand Down
6 changes: 5 additions & 1 deletion tools/mig-test/misc/patch_flink_conf.rb
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,11 @@
execution.checkpointing.interval: 300
EXTRACONF

File.write("#{FLINK_HOME}/conf/flink-conf.yaml", EXTRA_CONF, mode: 'a+')
if File.file?("#{FLINK_HOME}/conf/flink-conf.yaml")
File.write("#{FLINK_HOME}/conf/flink-conf.yaml", EXTRA_CONF, mode: 'a+')
else
File.write("#{FLINK_HOME}/conf/config.yaml", EXTRA_CONF, mode: 'a+')
end

# MySQL connector is not provided
`wget https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.27/mysql-connector-java-8.0.27.jar -O #{FLINK_HOME}/lib/mysql-connector-java-8.0.27.jar`
10 changes: 8 additions & 2 deletions tools/mig-test/run_migration_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,13 @@ def test_migration(from_version, to_version)
end
end

version_list = %w[3.0.0 3.0.1 3.1.0 3.1.1 3.3-SNAPSHOT]
version_list = case ARGV[0]
when '1.18.1' then %w[3.0.0 3.0.1 3.1.1 3.3-SNAPSHOT]
when '1.19.1' then %w[3.1.1 3.3-SNAPSHOT]
when '1.20.0' then %w[3.3-SNAPSHOT]
else []
end

no_savepoint_versions = %w[3.0.0 3.0.1]
version_result = Hash.new('❓')
@failures = []
Expand Down Expand Up @@ -157,6 +163,6 @@ def test_migration(from_version, to_version)
end
puts "✅ - Compatible, ❌ - Not compatible, ❓ - Target version doesn't support `--from-savepoint`"

if @failures.filter { |old_version, new_version| new_version == version_list.last && old_version != '3.1.0' }.any?
if @failures.any?
abort 'Some migration to snapshot version tests failed.'
end

0 comments on commit daf27fa

Please sign in to comment.