Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-35143][pipeline-connector][mysql] Expose newly added tables capture in mysql pipeline connector. #3411

Merged
7 changes: 7 additions & 0 deletions docs/content.zh/docs/connectors/pipeline-connectors/mysql.md
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,13 @@ pipeline:
<td>是否在快照结束后关闭空闲的 Reader。 此特性需要 flink 版本大于等于 1.14 并且 'execution.checkpointing.checkpoints-after-tasks-finish.enabled' 需要设置为 true。<br>
若 flink 版本大于等于 1.15,'execution.checkpointing.checkpoints-after-tasks-finish.enabled' 默认值变更为 true,可以不用显式配置 'execution.checkpointing.checkpoints-after-tasks-finish.enabled' = true。</td>
</tr>
<tr>
<td>scan.newly-added-table.enabled</td>
<td>optional</td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>是否启用动态加表特性,默认关闭。 此配置项只有作业从savepoint/checkpoint启动时才生效。</td>
</tr>
</tbody>
</table>
</div>
Expand Down
7 changes: 7 additions & 0 deletions docs/content/docs/connectors/pipeline-connectors/mysql.md
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,13 @@ pipeline:
so it does not need to be explicitly configured 'execution.checkpointing.checkpoints-after-tasks-finish.enabled' = 'true'
</td>
</tr>
<tr>
<td>scan.newly-added-table.enabled</td>
<td>optional</td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>Whether to enable scan the newly added tables feature or not, by default is false. This option is only useful when we start the job from a savepoint/checkpoint.</td>
</tr>
</tbody>
</table>
</div>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.PORT;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_NEWLY_ADDED_TABLE_ENABLED;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_SNAPSHOT_FETCH_SIZE;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_STARTUP_MODE;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_STARTUP_SPECIFIC_OFFSET_FILE;
Expand Down Expand Up @@ -123,6 +124,7 @@ public DataSource createDataSource(Context context) {
Duration connectTimeout = config.get(CONNECT_TIMEOUT);
int connectMaxRetries = config.get(CONNECT_MAX_RETRIES);
int connectionPoolSize = config.get(CONNECTION_POOL_SIZE);
boolean scanNewlyAddedTableEnabled = config.get(SCAN_NEWLY_ADDED_TABLE_ENABLED);

validateIntegerOption(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE, splitSize, 1);
validateIntegerOption(CHUNK_META_GROUP_SIZE, splitMetaGroupSize, 1);
Expand Down Expand Up @@ -158,7 +160,8 @@ public DataSource createDataSource(Context context) {
.closeIdleReaders(closeIdleReaders)
.includeSchemaChanges(includeSchemaChanges)
.debeziumProperties(getDebeziumProperties(configMap))
.jdbcProperties(getJdbcProperties(configMap));
.jdbcProperties(getJdbcProperties(configMap))
.scanNewlyAddedTableEnabled(scanNewlyAddedTableEnabled);

Selectors selectors = new Selectors.SelectorsBuilder().includeTables(tables).build();
List<String> capturedTables = getTableList(configFactory.createConfig(0), selectors);
Expand Down Expand Up @@ -216,7 +219,7 @@ public Set<ConfigOption<?>> optionalOptions() {
options.add(CONNECTION_POOL_SIZE);
options.add(HEARTBEAT_INTERVAL);
options.add(SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED);

options.add(SCAN_NEWLY_ADDED_TABLE_ENABLED);
options.add(CHUNK_META_GROUP_SIZE);
options.add(CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND);
options.add(CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,14 @@ public class MySqlDataSourceOptions {
+ "FLIP-147: Support Checkpoints After Tasks Finished. The flink version is required to be "
+ "greater than or equal to 1.14 when enabling this feature.");

@Experimental
public static final ConfigOption<Boolean> SCAN_NEWLY_ADDED_TABLE_ENABLED =
ConfigOptions.key("scan.newly-added-table.enabled")
.booleanType()
.defaultValue(false)
.withDescription(
"Whether to scan the newly added tables or not, by default is false. This option is only useful when we start the job from a savepoint/checkpoint.");

@Experimental
public static final ConfigOption<Boolean> SCHEMA_CHANGE_ENABLED =
ConfigOptions.key("schema-change.enabled")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,7 @@ public void testPrefixRequireOption() {
.isEqualTo(Arrays.asList(inventoryDatabase.getDatabaseName() + ".products"));
}

class MockContext implements Factory.Context {
static class MockContext implements Factory.Context {

Configuration factoryConfiguration;

Expand Down
Loading
Loading