Skip to content

Commit

Permalink
Fix test.
Browse files Browse the repository at this point in the history
  • Loading branch information
loserwang1024 authored and leonardBang committed Aug 26, 2024
1 parent cfe242f commit 8f36206
Showing 1 changed file with 19 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -159,12 +159,12 @@ private MySqlConnection getConnection() {
public void testScanBinlogNewlyAddedTableEnabled() throws Exception {
List<String> tables = Collections.singletonList("address_\\.*");
Map<String, String> options = new HashMap<>();
options.put(SCAN_BINLOG_NEWLY_ADDED_TABLE_ENABLED.key(), "true");
options.put(SCAN_STARTUP_MODE.key(), "timestamp");
options.put(
SCAN_STARTUP_TIMESTAMP_MILLIS.key(), String.valueOf(System.currentTimeMillis()));

FlinkSourceProvider sourceProvider = getFlinkSourceProvider(tables, 4, options);
FlinkSourceProvider sourceProvider =
getFlinkSourceProvider(tables, 4, options, false, true);
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment(new Configuration());
env.enableCheckpointing(200);
Expand Down Expand Up @@ -193,13 +193,13 @@ public void testScanBinlogNewlyAddedTableEnabled() throws Exception {
public void testScanBinlogNewlyAddedTableEnabledAndExcludeTables() throws Exception {
List<String> tables = Collections.singletonList("address_\\.*");
Map<String, String> options = new HashMap<>();
options.put(SCAN_BINLOG_NEWLY_ADDED_TABLE_ENABLED.key(), "true");
options.put(TABLES_EXCLUDE.key(), customDatabase.getDatabaseName() + ".address_beijing");
options.put(SCAN_STARTUP_MODE.key(), "timestamp");
options.put(
SCAN_STARTUP_TIMESTAMP_MILLIS.key(), String.valueOf(System.currentTimeMillis()));

FlinkSourceProvider sourceProvider = getFlinkSourceProvider(tables, 4, options);
FlinkSourceProvider sourceProvider =
getFlinkSourceProvider(tables, 4, options, false, true);
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment(new Configuration());
env.enableCheckpointing(200);
Expand Down Expand Up @@ -304,7 +304,8 @@ private void testAddNewTable(TestParam testParam, int parallelism) throws Except
List<String> listenTablesFirstRound = testParam.getFirstRoundListenTables();

FlinkSourceProvider sourceProvider =
getFlinkSourceProvider(listenTablesFirstRound, parallelism, new HashMap<>());
getFlinkSourceProvider(
listenTablesFirstRound, parallelism, new HashMap<>(), true, false);
DataStreamSource<Event> source =
env.fromSource(
sourceProvider.getSource(),
Expand Down Expand Up @@ -348,7 +349,8 @@ private void testAddNewTable(TestParam testParam, int parallelism) throws Except
getStreamExecutionEnvironment(finishedSavePointPath, parallelism);
List<String> listenTablesSecondRound = testParam.getSecondRoundListenTables();
FlinkSourceProvider restoredSourceProvider =
getFlinkSourceProvider(listenTablesSecondRound, parallelism, new HashMap<>());
getFlinkSourceProvider(
listenTablesSecondRound, parallelism, new HashMap<>(), true, false);
DataStreamSource<Event> restoreSource =
restoredEnv.fromSource(
restoredSourceProvider.getSource(),
Expand Down Expand Up @@ -509,7 +511,11 @@ private void initialAddressTables(JdbcConnection connection, List<String> addres
}

private FlinkSourceProvider getFlinkSourceProvider(
List<String> tables, int parallelism, Map<String, String> additionalOptions) {
List<String> tables,
int parallelism,
Map<String, String> additionalOptions,
boolean enableScanNewlyAddedTable,
boolean enableBinlogScanNewlyAddedTable) {
List<String> fullTableNames =
tables.stream()
.map(table -> customDatabase.getDatabaseName() + "." + table)
Expand All @@ -522,7 +528,12 @@ private FlinkSourceProvider getFlinkSourceProvider(
options.put(SERVER_TIME_ZONE.key(), "UTC");
options.put(TABLES.key(), StringUtils.join(fullTableNames, ","));
options.put(SERVER_ID.key(), getServerId(parallelism));
options.put(SCAN_NEWLY_ADDED_TABLE_ENABLED.key(), "true");
if (enableScanNewlyAddedTable) {
options.put(SCAN_NEWLY_ADDED_TABLE_ENABLED.key(), "true");
}
if (enableBinlogScanNewlyAddedTable) {
options.put(SCAN_BINLOG_NEWLY_ADDED_TABLE_ENABLED.key(), "true");
}
options.putAll(additionalOptions);
Factory.Context context =
new FactoryHelper.DefaultContext(
Expand Down

0 comments on commit 8f36206

Please sign in to comment.