diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/org/apache/flink/cdc/connectors/oracle/source/OracleSourceBuilder.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/org/apache/flink/cdc/connectors/oracle/source/OracleSourceBuilder.java
index 9a740b15bc..0560ffb76b 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/org/apache/flink/cdc/connectors/oracle/source/OracleSourceBuilder.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/org/apache/flink/cdc/connectors/oracle/source/OracleSourceBuilder.java
@@ -243,6 +243,12 @@ public OracleSourceBuilder<T> skipSnapshotBackfill(boolean skipSnapshotBackfill)
         return this;
     }
 
+    /** Whether the {@link OracleIncrementalSource} should scan the newly added tables or not. */
+    public OracleSourceBuilder<T> scanNewlyAddedTableEnabled(boolean scanNewlyAddedTableEnabled) {
+        this.configFactory.scanNewlyAddedTableEnabled(scanNewlyAddedTableEnabled);
+        return this;
+    }
+
     /**
      * Build the {@link OracleIncrementalSource}.
      *
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/org/apache/flink/cdc/connectors/oracle/source/config/OracleSourceConfig.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/org/apache/flink/cdc/connectors/oracle/source/config/OracleSourceConfig.java
index 2c007fe548..19386290af 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/org/apache/flink/cdc/connectors/oracle/source/config/OracleSourceConfig.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/org/apache/flink/cdc/connectors/oracle/source/config/OracleSourceConfig.java
@@ -63,7 +63,8 @@ public OracleSourceConfig(
             int connectMaxRetries,
             int connectionPoolSize,
             String chunkKeyColumn,
-            boolean skipSnapshotBackfill) {
+            boolean skipSnapshotBackfill,
+            boolean scanNewlyAddedTableEnabled) {
         super(
                 startupOptions,
                 databaseList,
@@ -89,7 +90,7 @@ public OracleSourceConfig(
                 connectionPoolSize,
                 chunkKeyColumn,
                 skipSnapshotBackfill,
-                false);
+                scanNewlyAddedTableEnabled);
         this.url = url;
     }
 
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/org/apache/flink/cdc/connectors/oracle/source/config/OracleSourceConfigFactory.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/org/apache/flink/cdc/connectors/oracle/source/config/OracleSourceConfigFactory.java
index cf01b849eb..974cedf4ad 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/org/apache/flink/cdc/connectors/oracle/source/config/OracleSourceConfigFactory.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/org/apache/flink/cdc/connectors/oracle/source/config/OracleSourceConfigFactory.java
@@ -133,6 +133,7 @@ public OracleSourceConfig create(int subtaskId) {
                 connectMaxRetries,
                 connectionPoolSize,
                 chunkKeyColumn,
-                skipSnapshotBackfill);
+                skipSnapshotBackfill,
+                scanNewlyAddedTableEnabled);
     }
 }
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/org/apache/flink/cdc/connectors/oracle/table/OracleTableSource.java
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/org/apache/flink/cdc/connectors/oracle/table/OracleTableSource.java index 9487a0968e..5903ce41e9 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/org/apache/flink/cdc/connectors/oracle/table/OracleTableSource.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/org/apache/flink/cdc/connectors/oracle/table/OracleTableSource.java @@ -79,6 +79,7 @@ public class OracleTableSource implements ScanTableSource, SupportsReadingMetada private final String chunkKeyColumn; private final boolean closeIdleReaders; private final boolean skipSnapshotBackfill; + private final boolean scanNewlyAddedTableEnabled; // -------------------------------------------------------------------------------------------- // Mutable attributes @@ -113,7 +114,8 @@ public OracleTableSource( double distributionFactorLower, @Nullable String chunkKeyColumn, boolean closeIdleReaders, - boolean skipSnapshotBackfill) { + boolean skipSnapshotBackfill, + boolean scanNewlyAddedTableEnabled) { this.physicalSchema = physicalSchema; this.url = url; this.port = port; @@ -139,6 +141,7 @@ public OracleTableSource( this.chunkKeyColumn = chunkKeyColumn; this.closeIdleReaders = closeIdleReaders; this.skipSnapshotBackfill = skipSnapshotBackfill; + this.scanNewlyAddedTableEnabled = scanNewlyAddedTableEnabled; } @Override @@ -187,6 +190,7 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) { .closeIdleReaders(closeIdleReaders) .skipSnapshotBackfill(skipSnapshotBackfill) .chunkKeyColumn(chunkKeyColumn) + .scanNewlyAddedTableEnabled(scanNewlyAddedTableEnabled) .build(); return SourceProvider.of(oracleChangeEventSource); @@ -252,7 +256,8 @@ public DynamicTableSource copy() { distributionFactorLower, chunkKeyColumn, closeIdleReaders, - skipSnapshotBackfill); + skipSnapshotBackfill, + scanNewlyAddedTableEnabled); source.metadataKeys = metadataKeys; source.producedDataType = producedDataType; return source; @@ -291,7 +296,8 @@ public boolean equals(Object o) { && Objects.equals(distributionFactorLower, that.distributionFactorLower) && Objects.equals(chunkKeyColumn, that.chunkKeyColumn) && Objects.equals(closeIdleReaders, that.closeIdleReaders) - && Objects.equals(skipSnapshotBackfill, that.skipSnapshotBackfill); + && Objects.equals(skipSnapshotBackfill, that.skipSnapshotBackfill) + && Objects.equals(scanNewlyAddedTableEnabled, that.scanNewlyAddedTableEnabled); } @Override @@ -321,7 +327,8 @@ public int hashCode() { distributionFactorLower, chunkKeyColumn, closeIdleReaders, - skipSnapshotBackfill); + skipSnapshotBackfill, + scanNewlyAddedTableEnabled); } @Override diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/org/apache/flink/cdc/connectors/oracle/table/OracleTableSourceFactory.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/org/apache/flink/cdc/connectors/oracle/table/OracleTableSourceFactory.java index 5f533d8f2d..70a63b3aea 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/org/apache/flink/cdc/connectors/oracle/table/OracleTableSourceFactory.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/org/apache/flink/cdc/connectors/oracle/table/OracleTableSourceFactory.java @@ -47,6 +47,7 @@ import static 
org.apache.flink.cdc.connectors.base.options.SourceOptions.SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP; import static org.apache.flink.cdc.connectors.base.options.SourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE; import static org.apache.flink.cdc.connectors.base.options.SourceOptions.SCAN_INCREMENTAL_SNAPSHOT_ENABLED; +import static org.apache.flink.cdc.connectors.base.options.SourceOptions.SCAN_NEWLY_ADDED_TABLE_ENABLED; import static org.apache.flink.cdc.connectors.base.options.SourceOptions.SCAN_SNAPSHOT_FETCH_SIZE; import static org.apache.flink.cdc.connectors.base.options.SourceOptions.SCAN_STARTUP_MODE; import static org.apache.flink.cdc.connectors.base.options.SourceOptions.SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND; @@ -106,6 +107,7 @@ public DynamicTableSource createDynamicTableSource(Context context) { boolean closeIdlerReaders = config.get(SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED); boolean skipSnapshotBackfill = config.get(SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP); + boolean scanNewlyAddedTableEnabled = config.get(SCAN_NEWLY_ADDED_TABLE_ENABLED); if (enableParallelRead) { validateIntegerOption(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE, splitSize, 1); @@ -142,7 +144,8 @@ public DynamicTableSource createDynamicTableSource(Context context) { distributionFactorLower, chunkKeyColumn, closeIdlerReaders, - skipSnapshotBackfill); + skipSnapshotBackfill, + scanNewlyAddedTableEnabled); } @Override @@ -180,6 +183,7 @@ public Set> optionalOptions() { options.add(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_KEY_COLUMN); options.add(SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED); options.add(SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP); + options.add(SCAN_NEWLY_ADDED_TABLE_ENABLED); return options; } diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/test/java/org/apache/flink/cdc/connectors/oracle/source/NewlyAddedTableITCase.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/test/java/org/apache/flink/cdc/connectors/oracle/source/NewlyAddedTableITCase.java new file mode 100644 index 0000000000..0cbbb8a578 --- /dev/null +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/test/java/org/apache/flink/cdc/connectors/oracle/source/NewlyAddedTableITCase.java @@ -0,0 +1,908 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.cdc.connectors.oracle.source; + +import org.apache.flink.api.common.restartstrategy.RestartStrategies; +import org.apache.flink.cdc.connectors.oracle.testutils.OracleTestUtils.FailoverPhase; +import org.apache.flink.cdc.connectors.oracle.testutils.OracleTestUtils.FailoverType; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.execution.JobClient; +import org.apache.flink.runtime.checkpoint.CheckpointException; +import org.apache.flink.runtime.jobgraph.SavepointConfigOptions; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.table.api.TableResult; +import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; +import org.apache.flink.table.planner.factories.TestValuesTableFactory; +import org.apache.flink.util.ExceptionUtils; + +import org.junit.After; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.junit.rules.Timeout; + +import java.sql.Connection; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +import static java.lang.String.format; +import static org.apache.flink.cdc.connectors.oracle.testutils.OracleTestUtils.getTableNameRegex; +import static org.apache.flink.cdc.connectors.oracle.testutils.OracleTestUtils.triggerFailover; +import static org.apache.flink.cdc.connectors.oracle.testutils.OracleTestUtils.waitForSinkSize; +import static org.apache.flink.cdc.connectors.oracle.testutils.OracleTestUtils.waitForUpsertSinkSize; + +/** IT tests to cover various newly added tables during capture process. 
*/ +public class NewlyAddedTableITCase extends OracleSourceTestBase { + @Rule public final Timeout timeoutPerTest = Timeout.seconds(600); + + private final ScheduledExecutorService mockRedoLogExecutor = + Executors.newScheduledThreadPool(1); + + @BeforeClass + public static void beforeClass() throws SQLException { + try (Connection dbaConnection = getJdbcConnectionAsDBA(); + Statement dbaStatement = dbaConnection.createStatement()) { + dbaStatement.execute("ALTER DATABASE ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS"); + } + } + + @Before + public void before() throws Exception { + TestValuesTableFactory.clearAllData(); + createAndInitialize("customer.sql"); + try (Connection connection = getJdbcConnection()) { + Statement statement = connection.createStatement(); + connection.setAutoCommit(false); + // prepare initial data for given table + String tableId = ORACLE_SCHEMA + ".PRODUCE_LOG_TABLE"; + statement.execute( + format( + "CREATE TABLE %s ( ID NUMBER(19), CNT NUMBER(19), PRIMARY KEY(ID))", + tableId)); + statement.execute(format("INSERT INTO %s VALUES (0, 100)", tableId)); + statement.execute(format("INSERT INTO %s VALUES (1, 101)", tableId)); + statement.execute(format("INSERT INTO %s VALUES (2, 102)", tableId)); + connection.commit(); + + // mock continuous redo log during the newly added table capturing process + mockRedoLogExecutor.schedule( + () -> { + try { + executeSql(format("UPDATE %s SET CNT = CNT +1 WHERE ID < 2", tableId)); + } catch (Exception e) { + e.printStackTrace(); + } + }, + 500, + TimeUnit.MICROSECONDS); + } + } + + @After + public void after() throws Exception { + mockRedoLogExecutor.shutdown(); + // sleep 1000ms to wait until connections are closed. + Thread.sleep(1000L); + } + + @Test + public void testNewlyAddedTableForExistsPipelineOnce() throws Exception { + testNewlyAddedTableOneByOne( + 1, + FailoverType.NONE, + FailoverPhase.NEVER, + false, + "ADDRESS_HANGZHOU", + "ADDRESS_BEIJING"); + } + + @Test + public void testNewlyAddedTableForExistsPipelineOnceWithAheadRedoLog() throws Exception { + testNewlyAddedTableOneByOne( + 1, + FailoverType.NONE, + FailoverPhase.NEVER, + true, + "ADDRESS_HANGZHOU", + "ADDRESS_BEIJING"); + } + + @Test + public void testNewlyAddedTableForExistsPipelineTwice() throws Exception { + testNewlyAddedTableOneByOne( + DEFAULT_PARALLELISM, + FailoverType.NONE, + FailoverPhase.NEVER, + false, + "ADDRESS_HANGZHOU", + "ADDRESS_BEIJING", + "ADDRESS_SHANGHAI"); + } + + @Test + public void testNewlyAddedTableForExistsPipelineTwiceWithAheadRedoLog() throws Exception { + testNewlyAddedTableOneByOne( + DEFAULT_PARALLELISM, + FailoverType.NONE, + FailoverPhase.NEVER, + true, + "ADDRESS_HANGZHOU", + "ADDRESS_BEIJING", + "ADDRESS_SHANGHAI"); + } + + @Test + public void testNewlyAddedTableForExistsPipelineTwiceWithAheadRedoLogAndAutoCloseReader() + throws Exception { + Map otherOptions = new HashMap<>(); + otherOptions.put("scan.incremental.close-idle-reader.enabled", "true"); + testNewlyAddedTableOneByOne( + DEFAULT_PARALLELISM, + otherOptions, + FailoverType.NONE, + FailoverPhase.NEVER, + true, + "ADDRESS_HANGZHOU", + "ADDRESS_BEIJING", + "ADDRESS_SHANGHAI"); + } + + @Test + public void testNewlyAddedTableForExistsPipelineThrice() throws Exception { + testNewlyAddedTableOneByOne( + DEFAULT_PARALLELISM, + FailoverType.NONE, + FailoverPhase.NEVER, + false, + "ADDRESS_HANGZHOU", + "ADDRESS_BEIJING", + "ADDRESS_SHANGHAI", + "ADDRESS_SHENZHEN"); + } + + @Test + public void testNewlyAddedTableForExistsPipelineThriceWithAheadRedoLog() throws Exception { + 
testNewlyAddedTableOneByOne( + DEFAULT_PARALLELISM, + FailoverType.NONE, + FailoverPhase.NEVER, + true, + "ADDRESS_HANGZHOU", + "ADDRESS_BEIJING", + "ADDRESS_SHANGHAI", + "ADDRESS_SHENZHEN"); + } + + @Test + public void testNewlyAddedTableForExistsPipelineSingleParallelism() throws Exception { + testNewlyAddedTableOneByOne( + 1, + FailoverType.NONE, + FailoverPhase.NEVER, + false, + "ADDRESS_HANGZHOU", + "ADDRESS_BEIJING"); + } + + @Test + public void testNewlyAddedTableForExistsPipelineSingleParallelismWithAheadRedoLog() + throws Exception { + testNewlyAddedTableOneByOne( + 1, + FailoverType.NONE, + FailoverPhase.NEVER, + true, + "ADDRESS_HANGZHOU", + "ADDRESS_BEIJING"); + } + + @Test + public void testJobManagerFailoverForNewlyAddedTable() throws Exception { + testNewlyAddedTableOneByOne( + DEFAULT_PARALLELISM, + FailoverType.JM, + FailoverPhase.SNAPSHOT, + false, + "ADDRESS_HANGZHOU", + "ADDRESS_BEIJING"); + } + + @Test + public void testJobManagerFailoverForNewlyAddedTableWithAheadRedoLog() throws Exception { + testNewlyAddedTableOneByOne( + DEFAULT_PARALLELISM, + FailoverType.JM, + FailoverPhase.SNAPSHOT, + true, + "ADDRESS_HANGZHOU", + "ADDRESS_BEIJING"); + } + + @Test + public void testTaskManagerFailoverForNewlyAddedTable() throws Exception { + testNewlyAddedTableOneByOne( + 1, + FailoverType.TM, + FailoverPhase.REDO_LOG, + false, + "ADDRESS_HANGZHOU", + "ADDRESS_BEIJING"); + } + + @Test + public void testTaskManagerFailoverForNewlyAddedTableWithAheadRedoLog() throws Exception { + testNewlyAddedTableOneByOne( + 1, + FailoverType.TM, + FailoverPhase.REDO_LOG, + false, + "ADDRESS_HANGZHOU", + "ADDRESS_BEIJING"); + } + + @Test + public void testJobManagerFailoverForRemoveTableSingleParallelism() throws Exception { + testRemoveTablesOneByOne( + 1, + FailoverType.JM, + FailoverPhase.SNAPSHOT, + "ADDRESS_HANGZHOU", + "ADDRESS_BEIJING", + "ADDRESS_SHANGHAI"); + } + + @Test + public void testJobManagerFailoverForRemoveTable() throws Exception { + testRemoveTablesOneByOne( + DEFAULT_PARALLELISM, + FailoverType.JM, + FailoverPhase.SNAPSHOT, + "ADDRESS_HANGZHOU", + "ADDRESS_BEIJING", + "ADDRESS_SHANGHAI"); + } + + @Test + public void testTaskManagerFailoverForRemoveTableSingleParallelism() throws Exception { + testRemoveTablesOneByOne( + 1, + FailoverType.TM, + FailoverPhase.SNAPSHOT, + "ADDRESS_HANGZHOU", + "ADDRESS_BEIJING", + "ADDRESS_SHANGHAI"); + } + + @Test + public void testTaskManagerFailoverForRemoveTable() throws Exception { + testRemoveTablesOneByOne( + DEFAULT_PARALLELISM, + FailoverType.TM, + FailoverPhase.SNAPSHOT, + "ADDRESS_HANGZHOU", + "ADDRESS_BEIJING", + "ADDRESS_SHANGHAI"); + } + + @Test + public void testRemoveTableSingleParallelism() throws Exception { + testRemoveTablesOneByOne( + 1, + FailoverType.NONE, + FailoverPhase.NEVER, + "ADDRESS_HANGZHOU", + "ADDRESS_BEIJING", + "ADDRESS_SHANGHAI"); + } + + @Test + public void testRemoveTable() throws Exception { + testRemoveTablesOneByOne( + DEFAULT_PARALLELISM, + FailoverType.NONE, + FailoverPhase.NEVER, + "ADDRESS_HANGZHOU", + "ADDRESS_BEIJING", + "ADDRESS_SHANGHAI"); + } + + @Test + public void testRemoveAndAddTablesOneByOne() throws Exception { + testRemoveAndAddTablesOneByOne( + 1, "ADDRESS_HANGZHOU", "ADDRESS_BEIJING", "ADDRESS_SHANGHAI"); + } + + private void testRemoveAndAddTablesOneByOne(int parallelism, String... 
captureAddressTables) + throws Exception { + + Connection connection = getJdbcConnection(); + // step 1: create tables with all tables included + initialAddressTables(connection, captureAddressTables); + + final TemporaryFolder temporaryFolder = new TemporaryFolder(); + temporaryFolder.create(); + final String savepointDirectory = temporaryFolder.newFolder().toURI().toString(); + + // get all expected data + List fetchedDataList = new ArrayList<>(); + + String finishedSavePointPath = null; + // test removing and adding table one by one + for (int round = 0; round < captureAddressTables.length; round++) { + String captureTableThisRound = captureAddressTables[round]; + String cityName = captureTableThisRound.split("_")[1]; + StreamExecutionEnvironment env = + getStreamExecutionEnvironmentFromSavePoint(finishedSavePointPath, parallelism); + StreamTableEnvironment tEnv = StreamTableEnvironment.create(env); + + String createTableStatement = + getCreateTableStatement(new HashMap<>(), captureTableThisRound); + tEnv.executeSql(createTableStatement); + tEnv.executeSql( + "CREATE TABLE sink (" + + " TABLE_NAME STRING," + + " ID BIGINT," + + " COUNTRY STRING," + + " CITY STRING," + + " DETAIL_ADDRESS STRING," + + " primary key (CITY, ID) not enforced" + + ") WITH (" + + " 'connector' = 'values'," + + " 'sink-insert-only' = 'false'" + + ")"); + TableResult tableResult = tEnv.executeSql("insert into sink select * from address"); + JobClient jobClient = tableResult.getJobClient().get(); + + // this round's snapshot data + fetchedDataList.addAll( + Arrays.asList( + format( + "+I[%s, 416874195632735147, China, %s, %s West Town address 1]", + captureTableThisRound, cityName, cityName), + format( + "+I[%s, 416927583791428523, China, %s, %s West Town address 2]", + captureTableThisRound, cityName, cityName), + format( + "+I[%s, 417022095255614379, China, %s, %s West Town address 3]", + captureTableThisRound, cityName, cityName))); + waitForSinkSize("sink", fetchedDataList.size()); + assertEqualsInAnyOrder(fetchedDataList, TestValuesTableFactory.getRawResults("sink")); + + // step 2: make redo log data for all tables before this round(also includes this + // round), + // test whether only this round table's data is captured. + for (int i = 0; i <= round; i++) { + String tableName = captureAddressTables[i]; + makeRedoLogForAddressTableInRound(tableName, round); + } + // this round's redo log data + fetchedDataList.addAll( + Arrays.asList( + format( + "+U[%s, 416874195632735147, CHINA_%s, %s, %s West Town address 1]", + captureTableThisRound, round, cityName, cityName), + format( + "+I[%s, %d, China, %s, %s West Town address 4]", + captureTableThisRound, + 417022095255614380L + round, + cityName, + cityName))); + + // step 3: assert fetched redo log data in this round + waitForSinkSize("sink", fetchedDataList.size()); + + assertEqualsInAnyOrder(fetchedDataList, TestValuesTableFactory.getRawResults("sink")); + // step 4: trigger savepoint + finishedSavePointPath = triggerSavepointWithRetry(jobClient, savepointDirectory); + jobClient.cancel().get(); + } + } + + private void testRemoveTablesOneByOne( + int parallelism, + FailoverType failoverType, + FailoverPhase failoverPhase, + String... 
captureAddressTables) + throws Exception { + + // step 1: create oracle tables with all tables included + initialAddressTables(getJdbcConnection(), captureAddressTables); + + final TemporaryFolder temporaryFolder = new TemporaryFolder(); + temporaryFolder.create(); + final String savepointDirectory = temporaryFolder.newFolder().toURI().toString(); + + // get all expected data + List fetchedDataList = new ArrayList<>(); + for (String table : captureAddressTables) { + String cityName = table.split("_")[1]; + fetchedDataList.addAll( + Arrays.asList( + format( + "+I[%s, 416874195632735147, China, %s, %s West Town address 1]", + table, cityName, cityName), + format( + "+I[%s, 416927583791428523, China, %s, %s West Town address 2]", + table, cityName, cityName), + format( + "+I[%s, 417022095255614379, China, %s, %s West Town address 3]", + table, cityName, cityName))); + } + + String finishedSavePointPath = null; + // step 2: execute insert and trigger savepoint with all tables added + { + StreamExecutionEnvironment env = + getStreamExecutionEnvironmentFromSavePoint(finishedSavePointPath, parallelism); + StreamTableEnvironment tEnv = StreamTableEnvironment.create(env); + + String createTableStatement = + getCreateTableStatement(new HashMap<>(), captureAddressTables); + tEnv.executeSql(createTableStatement); + tEnv.executeSql( + "CREATE TABLE sink (" + + " TABLE_NAME STRING," + + " ID BIGINT," + + " COUNTRY STRING," + + " CITY STRING," + + " DETAIL_ADDRESS STRING," + + " primary key (CITY, ID) not enforced" + + ") WITH (" + + " 'connector' = 'values'," + + " 'sink-insert-only' = 'false'" + + ")"); + TableResult tableResult = tEnv.executeSql("insert into sink select * from address"); + JobClient jobClient = tableResult.getJobClient().get(); + + // trigger failover after some snapshot data read finished + if (failoverPhase == FailoverPhase.SNAPSHOT) { + triggerFailover( + failoverType, + jobClient.getJobID(), + miniClusterResource.getMiniCluster(), + () -> sleepMs(100)); + } + waitForSinkSize("sink", fetchedDataList.size()); + assertEqualsInAnyOrder(fetchedDataList, TestValuesTableFactory.getRawResults("sink")); + finishedSavePointPath = triggerSavepointWithRetry(jobClient, savepointDirectory); + jobClient.cancel().get(); + } + + // test removing table one by one, note that there should be at least one table remaining + for (int round = 0; round < captureAddressTables.length - 1; round++) { + String[] captureTablesThisRound = + Arrays.asList(captureAddressTables) + .subList(round + 1, captureAddressTables.length) + .toArray(new String[0]); + + StreamExecutionEnvironment env = + getStreamExecutionEnvironmentFromSavePoint(finishedSavePointPath, parallelism); + StreamTableEnvironment tEnv = StreamTableEnvironment.create(env); + + String createTableStatement = + getCreateTableStatement(new HashMap<>(), captureTablesThisRound); + tEnv.executeSql(createTableStatement); + tEnv.executeSql( + "CREATE TABLE sink (" + + " TABLE_NAME STRING," + + " ID BIGINT," + + " COUNTRY STRING," + + " CITY STRING," + + " DETAIL_ADDRESS STRING," + + " primary key (CITY, ID) not enforced" + + ") WITH (" + + " 'connector' = 'values'," + + " 'sink-insert-only' = 'false'" + + ")"); + TableResult tableResult = tEnv.executeSql("insert into sink select * from address"); + JobClient jobClient = tableResult.getJobClient().get(); + + waitForSinkSize("sink", fetchedDataList.size()); + assertEqualsInAnyOrder(fetchedDataList, TestValuesTableFactory.getRawResults("sink")); + + // step 3: make redo log data for all tables + List 
expectedRedoLogDataThisRound = new ArrayList<>(); + + for (int i = 0, captureAddressTablesLength = captureAddressTables.length; + i < captureAddressTablesLength; + i++) { + String tableName = captureAddressTables[i]; + makeRedoLogForAddressTableInRound(tableName, round); + if (i <= round) { + continue; + } + String cityName = tableName.split("_")[1]; + + expectedRedoLogDataThisRound.addAll( + Arrays.asList( + format( + "+U[%s, 416874195632735147, CHINA_%s, %s, %s West Town address 1]", + tableName, round, cityName, cityName), + format( + "+I[%s, %d, China, %s, %s West Town address 4]", + tableName, + 417022095255614380L + round, + cityName, + cityName))); + } + + if (failoverPhase == FailoverPhase.REDO_LOG + && TestValuesTableFactory.getRawResults("sink").size() + > fetchedDataList.size()) { + triggerFailover( + failoverType, + jobClient.getJobID(), + miniClusterResource.getMiniCluster(), + () -> sleepMs(100)); + } + + fetchedDataList.addAll(expectedRedoLogDataThisRound); + // step 4: assert fetched redo log data in this round + waitForSinkSize("sink", fetchedDataList.size()); + assertEqualsInAnyOrder(fetchedDataList, TestValuesTableFactory.getRawResults("sink")); + + // step 5: trigger savepoint + finishedSavePointPath = triggerSavepointWithRetry(jobClient, savepointDirectory); + jobClient.cancel().get(); + } + } + + private void testNewlyAddedTableOneByOne( + int parallelism, + FailoverType failoverType, + FailoverPhase failoverPhase, + boolean makeRedoLogBeforeCapture, + String... captureAddressTables) + throws Exception { + testNewlyAddedTableOneByOne( + parallelism, + new HashMap<>(), + failoverType, + failoverPhase, + makeRedoLogBeforeCapture, + captureAddressTables); + } + + private void testNewlyAddedTableOneByOne( + int parallelism, + Map sourceOptions, + FailoverType failoverType, + FailoverPhase failoverPhase, + boolean makeRedoLogBeforeCapture, + String... 
captureAddressTables) + throws Exception { + + // step 1: create oracle tables with initial data + initialAddressTables(getJdbcConnection(), captureAddressTables); + + final TemporaryFolder temporaryFolder = new TemporaryFolder(); + temporaryFolder.create(); + final String savepointDirectory = temporaryFolder.newFolder().toURI().toString(); + + // test newly added table one by one + String finishedSavePointPath = null; + List fetchedDataList = new ArrayList<>(); + for (int round = 0; round < captureAddressTables.length; round++) { + String[] captureTablesThisRound = + Arrays.asList(captureAddressTables) + .subList(0, round + 1) + .toArray(new String[0]); + String newlyAddedTable = captureAddressTables[round]; + if (makeRedoLogBeforeCapture) { + makeRedoLogBeforeCaptureForAddressTable(newlyAddedTable); + } + StreamExecutionEnvironment env = + getStreamExecutionEnvironmentFromSavePoint(finishedSavePointPath, parallelism); + StreamTableEnvironment tEnv = StreamTableEnvironment.create(env); + + String createTableStatement = + getCreateTableStatement(sourceOptions, captureTablesThisRound); + tEnv.executeSql(createTableStatement); + tEnv.executeSql( + "CREATE TABLE sink (" + + " TABLE_NAME STRING," + + " ID BIGINT," + + " COUNTRY STRING," + + " CITY STRING," + + " DETAIL_ADDRESS STRING," + + " primary key (CITY, ID) not enforced" + + ") WITH (" + + " 'connector' = 'values'," + + " 'sink-insert-only' = 'false'" + + ")"); + TableResult tableResult = tEnv.executeSql("insert into sink select * from address"); + JobClient jobClient = tableResult.getJobClient().get(); + + // step 2: assert fetched snapshot data in this round + String cityName = newlyAddedTable.split("_")[1]; + List expectedSnapshotDataThisRound = + Arrays.asList( + format( + "+I[%s, 416874195632735147, China, %s, %s West Town address 1]", + newlyAddedTable, cityName, cityName), + format( + "+I[%s, 416927583791428523, China, %s, %s West Town address 2]", + newlyAddedTable, cityName, cityName), + format( + "+I[%s, 417022095255614379, China, %s, %s West Town address 3]", + newlyAddedTable, cityName, cityName)); + if (makeRedoLogBeforeCapture) { + expectedSnapshotDataThisRound = + Arrays.asList( + format( + "+I[%s, 416874195632735147, China, %s, %s West Town address 1]", + newlyAddedTable, cityName, cityName), + format( + "+I[%s, 416927583791428523, China, %s, %s West Town address 2]", + newlyAddedTable, cityName, cityName), + format( + "+I[%s, 417022095255614379, China, %s, %s West Town address 3]", + newlyAddedTable, cityName, cityName), + format( + "+I[%s, 417022095255614381, China, %s, %s West Town address 5]", + newlyAddedTable, cityName, cityName)); + } + + // trigger failover after some snapshot data read finished + if (failoverPhase == FailoverPhase.SNAPSHOT) { + triggerFailover( + failoverType, + jobClient.getJobID(), + miniClusterResource.getMiniCluster(), + () -> sleepMs(100)); + } + fetchedDataList.addAll(expectedSnapshotDataThisRound); + waitForUpsertSinkSize("sink", fetchedDataList.size()); + assertEqualsInAnyOrder(fetchedDataList, TestValuesTableFactory.getResults("sink")); + + // step 3: make some redo log data for this round + makeFirstPartRedoLogForAddressTable(newlyAddedTable); + if (failoverPhase == FailoverPhase.REDO_LOG) { + triggerFailover( + failoverType, + jobClient.getJobID(), + miniClusterResource.getMiniCluster(), + () -> sleepMs(100)); + } + makeSecondPartRedoLogForAddressTable(newlyAddedTable); + + // step 4: assert fetched redo log data in this round + // retract the old data with id 416874195632735147 + 
fetchedDataList = + fetchedDataList.stream() + .filter( + r -> + !r.contains( + format( + "%s, 416874195632735147", + newlyAddedTable))) + .collect(Collectors.toList()); + List expectedRedoLogUpsertDataThisRound = + Arrays.asList( + // add the new data with id 416874195632735147 + format( + "+I[%s, 416874195632735147, CHINA, %s, %s West Town address 1]", + newlyAddedTable, cityName, cityName), + format( + "+I[%s, 417022095255614380, China, %s, %s West Town address 4]", + newlyAddedTable, cityName, cityName)); + + // step 5: assert fetched redo log data in this round + fetchedDataList.addAll(expectedRedoLogUpsertDataThisRound); + + waitForUpsertSinkSize("sink", fetchedDataList.size()); + // the result size of sink may arrive fetchedDataList.size() with old data, wait one + // checkpoint to wait retract old record and send new record + Thread.sleep(1000); + assertEqualsInAnyOrder(fetchedDataList, TestValuesTableFactory.getResults("sink")); + + // step 6: trigger savepoint + if (round != captureAddressTables.length - 1) { + finishedSavePointPath = triggerSavepointWithRetry(jobClient, savepointDirectory); + } + jobClient.cancel().get(); + } + } + + private void initialAddressTables(Connection connection, String[] addressTables) + throws SQLException { + try { + connection.setAutoCommit(false); + Statement statement = connection.createStatement(); + for (String tableName : addressTables) { + // make initial data for given table + String tableId = ORACLE_SCHEMA + '.' + tableName; + String cityName = tableName.split("_")[1]; + statement.execute( + "CREATE TABLE " + + tableId + + "(" + + " ID NUMBER(19) NOT NULL," + + " COUNTRY VARCHAR(255) NOT NULL," + + " CITY VARCHAR(255) NOT NULL," + + " DETAIL_ADDRESS VARCHAR(1024)," + + " PRIMARY KEY(ID)" + + ")"); + statement.execute( + format( + "INSERT INTO %s " + + "VALUES (416874195632735147, 'China', '%s', '%s West Town address 1')", + tableId, cityName, cityName)); + statement.execute( + format( + "INSERT INTO %s " + + "VALUES (416927583791428523, 'China', '%s', '%s West Town address 2')", + tableId, cityName, cityName)); + statement.execute( + format( + "INSERT INTO %s " + + "VALUES (417022095255614379, 'China', '%s', '%s West Town address 3')", + tableId, cityName, cityName)); + } + connection.commit(); + } finally { + connection.close(); + } + } + + private void makeFirstPartRedoLogForAddressTable(String tableName) throws Exception { + String tableId = ORACLE_SCHEMA + '.' + tableName; + executeSql( + format("UPDATE %s SET COUNTRY = 'CHINA' where ID = 416874195632735147", tableId)); + } + + private void makeSecondPartRedoLogForAddressTable(String tableName) throws Exception { + String tableId = ORACLE_SCHEMA + '.' + tableName; + String cityName = tableName.split("_")[1]; + executeSql( + format( + "INSERT INTO %s VALUES(417022095255614380, 'China','%s','%s West Town address 4')", + tableId, cityName, cityName)); + } + + private void makeRedoLogBeforeCaptureForAddressTable(String tableName) throws Exception { + String tableId = ORACLE_SCHEMA + '.' + tableName; + String cityName = tableName.split("_")[1]; + executeSql( + format( + "INSERT INTO %s VALUES(417022095255614381, 'China','%s','%s West Town address 5')", + tableId, cityName, cityName)); + } + + private void makeRedoLogForAddressTableInRound(String tableName, int round) throws Exception { + String tableId = ORACLE_SCHEMA + '.' 
+ tableName; + String cityName = tableName.split("_")[1]; + executeSql( + format( + "UPDATE %s SET COUNTRY = 'CHINA_%s' where id = 416874195632735147", + tableId, round)); + executeSql( + format( + "INSERT INTO %s VALUES(%d, 'China','%s','%s West Town address 4')", + tableId, 417022095255614380L + round, cityName, cityName)); + } + + private void sleepMs(long millis) { + try { + Thread.sleep(millis); + } catch (InterruptedException ignored) { + } + } + + private String triggerSavepointWithRetry(JobClient jobClient, String savepointDirectory) + throws ExecutionException, InterruptedException { + int retryTimes = 0; + // retry 600 times, it takes 100 milliseconds per time, at most retry 1 minute + while (retryTimes < 600) { + try { + return jobClient.triggerSavepoint(savepointDirectory).get(); + } catch (Exception e) { + Optional exception = + ExceptionUtils.findThrowable(e, CheckpointException.class); + if (exception.isPresent() + && exception.get().getMessage().contains("Checkpoint triggering task")) { + Thread.sleep(100); + retryTimes++; + } else { + throw e; + } + } + } + return null; + } + + private StreamExecutionEnvironment getStreamExecutionEnvironmentFromSavePoint( + String finishedSavePointPath, int parallelism) throws Exception { + Configuration configuration = new Configuration(); + if (finishedSavePointPath != null) { + configuration.setString(SavepointConfigOptions.SAVEPOINT_PATH, finishedSavePointPath); + } + StreamExecutionEnvironment env = + StreamExecutionEnvironment.getExecutionEnvironment(configuration); + env.setParallelism(parallelism); + env.enableCheckpointing(200L); + env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 100L)); + return env; + } + + private String getCreateTableStatement( + Map otherOptions, String... captureTableNames) { + return String.format( + "CREATE TABLE address (" + + " table_name STRING METADATA VIRTUAL," + + " ID BIGINT NOT NULL," + + " COUNTRY STRING," + + " CITY STRING," + + " DETAIL_ADDRESS STRING," + + " primary key (CITY, ID) not enforced" + + ") WITH (" + + " 'connector' = 'oracle-cdc'," + + " 'scan.incremental.snapshot.enabled' = 'true'," + + " 'hostname' = '%s'," + + " 'port' = '%s'," + + " 'username' = '%s'," + + " 'password' = '%s'," + + " 'database-name' = '%s'," + + " 'schema-name' = '%s'," + + " 'table-name' = '%s'," + + " 'debezium.log.mining.strategy' = 'online_catalog'," + + " 'debezium.database.history.store.only.captured.tables.ddl' = 'true'," + + " 'scan.incremental.snapshot.chunk.size' = '2'," + + " 'scan.newly-added-table.enabled' = 'true'," + + " 'chunk-meta.group.size' = '2'" + + " %s" + + ")", + ORACLE_CONTAINER.getHost(), + ORACLE_CONTAINER.getOraclePort(), + ORACLE_CONTAINER.getUsername(), + ORACLE_CONTAINER.getPassword(), + ORACLE_DATABASE, + ORACLE_SCHEMA, + getTableNameRegex(captureTableNames), + otherOptions.isEmpty() + ? 
"" + : "," + + otherOptions.entrySet().stream() + .map( + e -> + String.format( + "'%s'='%s'", + e.getKey(), e.getValue())) + .collect(Collectors.joining(","))); + } + + private void executeSql(String sql) throws Exception { + try (Connection connection = getJdbcConnection(); + Statement statement = connection.createStatement()) { + statement.execute(sql); + } + } +} diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/test/java/org/apache/flink/cdc/connectors/oracle/source/OracleSourceITCase.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/test/java/org/apache/flink/cdc/connectors/oracle/source/OracleSourceITCase.java index 3c4a715906..ee5a1c947d 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/test/java/org/apache/flink/cdc/connectors/oracle/source/OracleSourceITCase.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/test/java/org/apache/flink/cdc/connectors/oracle/source/OracleSourceITCase.java @@ -25,9 +25,9 @@ import org.apache.flink.cdc.connectors.base.source.utils.hooks.SnapshotPhaseHook; import org.apache.flink.cdc.connectors.base.source.utils.hooks.SnapshotPhaseHooks; import org.apache.flink.cdc.connectors.oracle.source.utils.OracleConnectionUtils; +import org.apache.flink.cdc.connectors.oracle.testutils.OracleTestUtils.FailoverPhase; +import org.apache.flink.cdc.connectors.oracle.testutils.OracleTestUtils.FailoverType; import org.apache.flink.cdc.connectors.oracle.testutils.TestTable; -import org.apache.flink.runtime.highavailability.nonha.embedded.HaLeadershipControl; -import org.apache.flink.runtime.minicluster.MiniCluster; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.TableResult; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; @@ -58,6 +58,7 @@ import java.util.stream.Collectors; import static java.lang.String.format; +import static org.apache.flink.cdc.connectors.oracle.testutils.OracleTestUtils.triggerFailover; import static org.apache.flink.table.api.DataTypes.BIGINT; import static org.apache.flink.table.api.DataTypes.STRING; import static org.apache.flink.table.catalog.Column.physical; @@ -707,54 +708,4 @@ private void executeSql(String sql) throws Exception { statement.execute(sql); } } - - // ------------------------------------------------------------------------ - // test utilities - // ------------------------------------------------------------------------ - - /** The type of failover. */ - private enum FailoverType { - TM, - JM, - NONE - } - - /** The phase of failover. 
*/ - private enum FailoverPhase { - SNAPSHOT, - REDO_LOG, - NEVER - } - - private static void triggerFailover( - FailoverType type, JobID jobId, MiniCluster miniCluster, Runnable afterFailAction) - throws Exception { - switch (type) { - case TM: - restartTaskManager(miniCluster, afterFailAction); - break; - case JM: - triggerJobManagerFailover(jobId, miniCluster, afterFailAction); - break; - case NONE: - break; - default: - throw new IllegalStateException("Unexpected value: " + type); - } - } - - private static void triggerJobManagerFailover( - JobID jobId, MiniCluster miniCluster, Runnable afterFailAction) throws Exception { - final HaLeadershipControl haLeadershipControl = miniCluster.getHaLeadershipControl().get(); - haLeadershipControl.revokeJobMasterLeadership(jobId).get(); - afterFailAction.run(); - haLeadershipControl.grantJobMasterLeadership(jobId).get(); - } - - private static void restartTaskManager(MiniCluster miniCluster, Runnable afterFailAction) - throws Exception { - miniCluster.terminateTaskManager(0).get(); - afterFailAction.run(); - miniCluster.startTaskManager(); - } } diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/test/java/org/apache/flink/cdc/connectors/oracle/source/OracleSourceTestBase.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/test/java/org/apache/flink/cdc/connectors/oracle/source/OracleSourceTestBase.java index 8fb1157f4f..7f8b83b49d 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/test/java/org/apache/flink/cdc/connectors/oracle/source/OracleSourceTestBase.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/test/java/org/apache/flink/cdc/connectors/oracle/source/OracleSourceTestBase.java @@ -174,7 +174,7 @@ public static void createAndInitialize(String sqlFile) throws Exception { } // ------------------ utils ----------------------- - private static List listTables(Connection connection) { + protected static List listTables(Connection connection) { Set tableIdSet = new HashSet<>(); String queryTablesSql = diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/test/java/org/apache/flink/cdc/connectors/oracle/table/OracleTableSourceFactoryTest.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/test/java/org/apache/flink/cdc/connectors/oracle/table/OracleTableSourceFactoryTest.java index 584c8c16b9..4fe043a5e9 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/test/java/org/apache/flink/cdc/connectors/oracle/table/OracleTableSourceFactoryTest.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/test/java/org/apache/flink/cdc/connectors/oracle/table/OracleTableSourceFactoryTest.java @@ -118,7 +118,8 @@ public void testRequiredProperties() { .defaultValue(), null, JdbcSourceOptions.SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED.defaultValue(), - JdbcSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue()); + JdbcSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue(), + JdbcSourceOptions.SCAN_NEWLY_ADDED_TABLE_ENABLED.defaultValue()); assertEquals(expectedSource, actualSource); } @@ -154,7 +155,8 @@ public void testCommonProperties() { .defaultValue(), null, SourceOptions.SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED.defaultValue(), - SourceOptions.SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue()); + 
SourceOptions.SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue(), + SourceOptions.SCAN_NEWLY_ADDED_TABLE_ENABLED.defaultValue()); assertEquals(expectedSource, actualSource); } @@ -164,6 +166,7 @@ public void testOptionalProperties() { options.put("port", "1521"); options.put("hostname", MY_LOCALHOST); options.put("debezium.snapshot.mode", "initial"); + options.put(SourceOptions.SCAN_NEWLY_ADDED_TABLE_ENABLED.key(), "true"); DynamicTableSource actualSource = createTableSource(options); Properties dbzProperties = new Properties(); @@ -194,7 +197,8 @@ public void testOptionalProperties() { .defaultValue(), null, SourceOptions.SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED.defaultValue(), - SourceOptions.SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue()); + SourceOptions.SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue(), + true); assertEquals(expectedSource, actualSource); } @@ -220,6 +224,7 @@ public void testScanIncrementalProperties() { options.put(SourceOptions.SCAN_SNAPSHOT_FETCH_SIZE.key(), String.valueOf(fetchSize)); options.put(SourceOptions.SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED.key(), "true"); options.put(SourceOptions.SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.key(), "true"); + options.put(SourceOptions.SCAN_NEWLY_ADDED_TABLE_ENABLED.key(), "true"); options.put( JdbcSourceOptions.CONNECT_TIMEOUT.key(), @@ -260,6 +265,7 @@ public void testScanIncrementalProperties() { distributionFactorLower, null, true, + true, true); assertEquals(expectedSource, actualSource); } @@ -297,7 +303,8 @@ public void testStartupFromInitial() { .defaultValue(), null, SourceOptions.SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED.defaultValue(), - SourceOptions.SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue()); + SourceOptions.SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue(), + SourceOptions.SCAN_NEWLY_ADDED_TABLE_ENABLED.defaultValue()); assertEquals(expectedSource, actualSource); } @@ -334,7 +341,8 @@ public void testStartupFromLatestOffset() { .defaultValue(), null, SourceOptions.SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED.defaultValue(), - SourceOptions.SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue()); + SourceOptions.SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue(), + SourceOptions.SCAN_NEWLY_ADDED_TABLE_ENABLED.defaultValue()); assertEquals(expectedSource, actualSource); } @@ -375,7 +383,8 @@ public void testMetadataColumns() { .defaultValue(), null, SourceOptions.SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED.defaultValue(), - SourceOptions.SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue()); + SourceOptions.SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue(), + SourceOptions.SCAN_NEWLY_ADDED_TABLE_ENABLED.defaultValue()); expectedSource.producedDataType = SCHEMA_WITH_METADATA.toSourceRowDataType(); expectedSource.metadataKeys = Arrays.asList("op_ts", "database_name", "table_name", "schema_name"); diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/test/java/org/apache/flink/cdc/connectors/oracle/testutils/OracleTestUtils.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/test/java/org/apache/flink/cdc/connectors/oracle/testutils/OracleTestUtils.java new file mode 100644 index 0000000000..2dd6e3a042 --- /dev/null +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/test/java/org/apache/flink/cdc/connectors/oracle/testutils/OracleTestUtils.java @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.connectors.oracle.testutils; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.highavailability.nonha.embedded.HaLeadershipControl; +import org.apache.flink.runtime.minicluster.MiniCluster; +import org.apache.flink.table.planner.factories.TestValuesTableFactory; + +import org.apache.commons.lang3.StringUtils; + +import static java.lang.String.format; +import static org.apache.flink.util.Preconditions.checkState; + +/** Oracle test utilities. */ +public class OracleTestUtils { + + /** The type of failover. */ + public enum FailoverType { + TM, + JM, + NONE + } + + /** The phase of failover. */ + public enum FailoverPhase { + SNAPSHOT, + REDO_LOG, + NEVER + } + + public static void triggerFailover( + FailoverType type, JobID jobId, MiniCluster miniCluster, Runnable afterFailAction) + throws Exception { + switch (type) { + case TM: + restartTaskManager(miniCluster, afterFailAction); + break; + case JM: + triggerJobManagerFailover(jobId, miniCluster, afterFailAction); + break; + case NONE: + break; + default: + throw new IllegalStateException("Unexpected value: " + type); + } + } + + public static void triggerJobManagerFailover( + JobID jobId, MiniCluster miniCluster, Runnable afterFailAction) throws Exception { + final HaLeadershipControl haLeadershipControl = miniCluster.getHaLeadershipControl().get(); + haLeadershipControl.revokeJobMasterLeadership(jobId).get(); + afterFailAction.run(); + haLeadershipControl.grantJobMasterLeadership(jobId).get(); + } + + public static void restartTaskManager(MiniCluster miniCluster, Runnable afterFailAction) + throws Exception { + miniCluster.terminateTaskManager(0).get(); + afterFailAction.run(); + miniCluster.startTaskManager(); + } + + public static void waitForSinkSize(String sinkName, int expectedSize) + throws InterruptedException { + while (sinkSize(sinkName) < expectedSize) { + Thread.sleep(100); + } + } + + public static int sinkSize(String sinkName) { + synchronized (TestValuesTableFactory.class) { + try { + return TestValuesTableFactory.getRawResults(sinkName).size(); + } catch (IllegalArgumentException e) { + // job is not started yet + return 0; + } + } + } + + public static void waitForUpsertSinkSize(String sinkName, int expectedSize) + throws InterruptedException { + while (upsertSinkSize(sinkName) < expectedSize) { + Thread.sleep(100); + } + } + + public static int upsertSinkSize(String sinkName) { + synchronized (TestValuesTableFactory.class) { + try { + return TestValuesTableFactory.getResults(sinkName).size(); + } catch (IllegalArgumentException e) { + // job is not started yet + return 0; + } + } + } + + public static String getTableNameRegex(String[] captureCustomerTables) { + checkState(captureCustomerTables.length > 0); + if (captureCustomerTables.length == 1) { + return 
captureCustomerTables[0]; + } else { + // pattern that matches multiple tables + return format("(%s)", StringUtils.join(captureCustomerTables, "|")); + } + } +}
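
Usage note (illustrative, not part of the patch): with this change the Oracle incremental source can capture newly added tables either through the new OracleSourceBuilder#scanNewlyAddedTableEnabled(boolean) method on the DataStream builder, or through the 'scan.newly-added-table.enabled' option now registered by OracleTableSourceFactory. The sketch below mirrors the DDL built by NewlyAddedTableITCase#getCreateTableStatement; the hostname, port, credentials, database/schema names, the ADDRESS_.* table pattern, and the wrapping example class are placeholders, not values taken from this diff.

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class OracleNewlyAddedTableExample {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Checkpointing is needed so the job can be stopped with a savepoint and restarted
        // with a widened 'table-name' pattern, as exercised round by round in NewlyAddedTableITCase.
        env.enableCheckpointing(3000L);
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        tEnv.executeSql(
                "CREATE TABLE address ("
                        + " ID BIGINT NOT NULL,"
                        + " COUNTRY STRING,"
                        + " CITY STRING,"
                        + " DETAIL_ADDRESS STRING,"
                        + " PRIMARY KEY (ID) NOT ENFORCED"
                        + ") WITH ("
                        + " 'connector' = 'oracle-cdc',"
                        + " 'scan.incremental.snapshot.enabled' = 'true',"
                        // the option wired up by this change
                        + " 'scan.newly-added-table.enabled' = 'true',"
                        + " 'hostname' = 'localhost'," // placeholder
                        + " 'port' = '1521'," // placeholder
                        + " 'username' = 'flinkuser'," // placeholder
                        + " 'password' = 'flinkpw'," // placeholder
                        + " 'database-name' = 'ORCLCDB'," // placeholder
                        + " 'schema-name' = 'DEBEZIUM'," // placeholder
                        + " 'table-name' = 'ADDRESS_.*'" // regex; widen it to capture more tables
                        + ")");

        // Printing the changelog keeps the example self-contained; a real pipeline would
        // INSERT INTO a sink table instead, as the IT case does.
        tEnv.executeSql("SELECT * FROM address").print();
    }
}

As in the IT case, an already running job picks up a newly created table by taking a savepoint, widening the 'table-name' regex in the DDL, and restarting the job from that savepoint.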