diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MysqlPipelineNewlyAddedTableITCase.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MysqlPipelineNewlyAddedTableITCase.java index 8c9545e23f..55cb1f8e8a 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MysqlPipelineNewlyAddedTableITCase.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MysqlPipelineNewlyAddedTableITCase.java @@ -159,12 +159,12 @@ private MySqlConnection getConnection() { public void testScanBinlogNewlyAddedTableEnabled() throws Exception { List tables = Collections.singletonList("address_\\.*"); Map options = new HashMap<>(); - options.put(SCAN_BINLOG_NEWLY_ADDED_TABLE_ENABLED.key(), "true"); options.put(SCAN_STARTUP_MODE.key(), "timestamp"); options.put( SCAN_STARTUP_TIMESTAMP_MILLIS.key(), String.valueOf(System.currentTimeMillis())); - FlinkSourceProvider sourceProvider = getFlinkSourceProvider(tables, 4, options); + FlinkSourceProvider sourceProvider = + getFlinkSourceProvider(tables, 4, options, false, true); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(new Configuration()); env.enableCheckpointing(200); @@ -193,13 +193,13 @@ public void testScanBinlogNewlyAddedTableEnabled() throws Exception { public void testScanBinlogNewlyAddedTableEnabledAndExcludeTables() throws Exception { List tables = Collections.singletonList("address_\\.*"); Map options = new HashMap<>(); - options.put(SCAN_BINLOG_NEWLY_ADDED_TABLE_ENABLED.key(), "true"); options.put(TABLES_EXCLUDE.key(), customDatabase.getDatabaseName() + ".address_beijing"); options.put(SCAN_STARTUP_MODE.key(), "timestamp"); options.put( SCAN_STARTUP_TIMESTAMP_MILLIS.key(), String.valueOf(System.currentTimeMillis())); - FlinkSourceProvider sourceProvider = getFlinkSourceProvider(tables, 4, options); + FlinkSourceProvider sourceProvider = + getFlinkSourceProvider(tables, 4, options, false, true); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(new Configuration()); env.enableCheckpointing(200); @@ -304,7 +304,8 @@ private void testAddNewTable(TestParam testParam, int parallelism) throws Except List listenTablesFirstRound = testParam.getFirstRoundListenTables(); FlinkSourceProvider sourceProvider = - getFlinkSourceProvider(listenTablesFirstRound, parallelism, new HashMap<>()); + getFlinkSourceProvider( + listenTablesFirstRound, parallelism, new HashMap<>(), true, false); DataStreamSource source = env.fromSource( sourceProvider.getSource(), @@ -348,7 +349,8 @@ private void testAddNewTable(TestParam testParam, int parallelism) throws Except getStreamExecutionEnvironment(finishedSavePointPath, parallelism); List listenTablesSecondRound = testParam.getSecondRoundListenTables(); FlinkSourceProvider restoredSourceProvider = - getFlinkSourceProvider(listenTablesSecondRound, parallelism, new HashMap<>()); + getFlinkSourceProvider( + listenTablesSecondRound, parallelism, new HashMap<>(), true, false); DataStreamSource restoreSource = restoredEnv.fromSource( restoredSourceProvider.getSource(), @@ -509,7 +511,11 @@ private void initialAddressTables(JdbcConnection connection, List addres } private FlinkSourceProvider getFlinkSourceProvider( - List tables, int parallelism, Map additionalOptions) { + List tables, + int parallelism, + Map additionalOptions, + boolean enableScanNewlyAddedTable, + boolean enableBinlogScanNewlyAddedTable) { List fullTableNames = tables.stream() .map(table -> customDatabase.getDatabaseName() + "." + table) @@ -522,7 +528,12 @@ private FlinkSourceProvider getFlinkSourceProvider( options.put(SERVER_TIME_ZONE.key(), "UTC"); options.put(TABLES.key(), StringUtils.join(fullTableNames, ",")); options.put(SERVER_ID.key(), getServerId(parallelism)); - options.put(SCAN_NEWLY_ADDED_TABLE_ENABLED.key(), "true"); + if (enableScanNewlyAddedTable) { + options.put(SCAN_NEWLY_ADDED_TABLE_ENABLED.key(), "true"); + } + if (enableBinlogScanNewlyAddedTable) { + options.put(SCAN_BINLOG_NEWLY_ADDED_TABLE_ENABLED.key(), "true"); + } options.putAll(additionalOptions); Factory.Context context = new FactoryHelper.DefaultContext(