Skip to content

Commit

Permalink
[FLINK-35143][pipeline-connector][mysql] Expose newly added tables ca…
Browse files Browse the repository at this point in the history
…pture in mysql pipeline connector. (apache#3411)


Co-authored-by: Muhammet Orazov <[email protected]>
Co-authored-by: north.lin <[email protected]>
  • Loading branch information
3 people authored and qiaozongmi committed Sep 23, 2024
1 parent 096be8f commit ac6d553
Show file tree
Hide file tree
Showing 6 changed files with 937 additions and 2 deletions.
7 changes: 7 additions & 0 deletions docs/content.zh/docs/connectors/pipeline-connectors/mysql.md
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,13 @@ pipeline:
<td>是否在快照结束后关闭空闲的 Reader。 此特性需要 flink 版本大于等于 1.14 并且 'execution.checkpointing.checkpoints-after-tasks-finish.enabled' 需要设置为 true。<br>
若 flink 版本大于等于 1.15,'execution.checkpointing.checkpoints-after-tasks-finish.enabled' 默认值变更为 true,可以不用显式配置 'execution.checkpointing.checkpoints-after-tasks-finish.enabled' = true。</td>
</tr>
<tr>
<td>scan.newly-added-table.enabled</td>
<td>optional</td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>是否启用动态加表特性,默认关闭。 此配置项只有作业从savepoint/checkpoint启动时才生效。</td>
</tr>
</tbody>
</table>
</div>
Expand Down
7 changes: 7 additions & 0 deletions docs/content/docs/connectors/pipeline-connectors/mysql.md
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,13 @@ pipeline:
so it does not need to be explicitly configured 'execution.checkpointing.checkpoints-after-tasks-finish.enabled' = 'true'
</td>
</tr>
<tr>
<td>scan.newly-added-table.enabled</td>
<td>optional</td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>Whether to enable scan the newly added tables feature or not, by default is false. This option is only useful when we start the job from a savepoint/checkpoint.</td>
</tr>
</tbody>
</table>
</div>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.PORT;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_NEWLY_ADDED_TABLE_ENABLED;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_SNAPSHOT_FETCH_SIZE;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_STARTUP_MODE;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_STARTUP_SPECIFIC_OFFSET_FILE;
Expand Down Expand Up @@ -123,6 +124,7 @@ public DataSource createDataSource(Context context) {
Duration connectTimeout = config.get(CONNECT_TIMEOUT);
int connectMaxRetries = config.get(CONNECT_MAX_RETRIES);
int connectionPoolSize = config.get(CONNECTION_POOL_SIZE);
boolean scanNewlyAddedTableEnabled = config.get(SCAN_NEWLY_ADDED_TABLE_ENABLED);

validateIntegerOption(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE, splitSize, 1);
validateIntegerOption(CHUNK_META_GROUP_SIZE, splitMetaGroupSize, 1);
Expand Down Expand Up @@ -158,7 +160,8 @@ public DataSource createDataSource(Context context) {
.closeIdleReaders(closeIdleReaders)
.includeSchemaChanges(includeSchemaChanges)
.debeziumProperties(getDebeziumProperties(configMap))
.jdbcProperties(getJdbcProperties(configMap));
.jdbcProperties(getJdbcProperties(configMap))
.scanNewlyAddedTableEnabled(scanNewlyAddedTableEnabled);

Selectors selectors = new Selectors.SelectorsBuilder().includeTables(tables).build();
List<String> capturedTables = getTableList(configFactory.createConfig(0), selectors);
Expand Down Expand Up @@ -216,7 +219,7 @@ public Set<ConfigOption<?>> optionalOptions() {
options.add(CONNECTION_POOL_SIZE);
options.add(HEARTBEAT_INTERVAL);
options.add(SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED);

options.add(SCAN_NEWLY_ADDED_TABLE_ENABLED);
options.add(CHUNK_META_GROUP_SIZE);
options.add(CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND);
options.add(CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,14 @@ public class MySqlDataSourceOptions {
+ "FLIP-147: Support Checkpoints After Tasks Finished. The flink version is required to be "
+ "greater than or equal to 1.14 when enabling this feature.");

@Experimental
public static final ConfigOption<Boolean> SCAN_NEWLY_ADDED_TABLE_ENABLED =
ConfigOptions.key("scan.newly-added-table.enabled")
.booleanType()
.defaultValue(false)
.withDescription(
"Whether to scan the newly added tables or not, by default is false. This option is only useful when we start the job from a savepoint/checkpoint.");

@Experimental
public static final ConfigOption<Boolean> SCHEMA_CHANGE_ENABLED =
ConfigOptions.key("schema-change.enabled")
Expand Down
Loading

0 comments on commit ac6d553

Please sign in to comment.