[FLINK-35740][cdc-connector][mysql] Allow column as chunk key even it's not primary key

This closes apache#3448.
SML0127 authored and qiaozongmi committed Sep 23, 2024
1 parent c722c1a commit 4496d31
Showing 7 changed files with 69 additions and 28 deletions.
8 changes: 5 additions & 3 deletions docs/content.zh/docs/connectors/flink-sources/mysql-cdc.md
@@ -500,7 +500,7 @@ CREATE TABLE products (
* (3) The Source does not need database lock permissions before snapshot reading.

If you want the source to run in parallel, each parallel reader should have a unique server id, so the range of `server id` must be something like `5400-6400`,
and the range must be larger than the parallelism. During incremental snapshot reading, the MySQL CDC Source first splits the table into multiple chunks by the primary key of the table,
and the range must be larger than the parallelism. During incremental snapshot reading, the MySQL CDC Source first splits the table into multiple chunks by the chunk key you specify,
and then the MySQL CDC Source assigns the chunks to multiple readers to read the table data in parallel.
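
As a rough illustration of the parallel reading described above (a sketch only, not part of this commit: the connection settings, database, and table names are placeholders, and the `org.apache.flink.cdc` package layout of a recent Flink CDC release is assumed), a DataStream job can give the source a server-id range larger than its parallelism:

```java
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.cdc.connectors.mysql.source.MySqlSource;
import org.apache.flink.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ParallelSnapshotReadingSketch {
    public static void main(String[] args) throws Exception {
        MySqlSource<String> source =
                MySqlSource.<String>builder()
                        .hostname("localhost")               // placeholder connection settings
                        .port(3306)
                        .databaseList("customer_db")
                        .tableList("customer_db.customers")
                        .username("flinkuser")
                        .password("flinkpw")
                        // five server ids for four parallel readers:
                        // the range must be larger than the parallelism
                        .serverId("5400-5404")
                        .deserializer(new JsonDebeziumDeserializationSchema())
                        .build();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.fromSource(source, WatermarkStrategy.noWatermarks(), "MySQL CDC Source")
                .setParallelism(4)
                .print();
        env.execute("mysql-cdc-parallel-snapshot-sketch");
    }
}
```

With four parallel readers and five server ids, every snapshot reader gets a unique id while the chunks are distributed among them.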

#### Concurrent Reading
@@ -550,7 +550,7 @@ After the monitored server in the MySQL cluster fails, you only need to switch the monitored…

When the MySQL CDC Source starts, it reads the snapshot of the table in parallel and then reads the table's binlog with a single parallelism.

In the snapshot phase, the snapshot is cut into multiple snapshot chunks according to the primary key of the table and the size of the table rows.
In the snapshot phase, the snapshot is cut into multiple snapshot chunks according to the chunk key of the table and the size of the table rows.
The snapshot chunks are assigned to multiple snapshot readers. Each snapshot reader reads its chunks with the [chunk reading algorithm](#snapshot-chunk-reading) and sends the read data downstream.
The Source manages the processing status of the chunks (finished or not), so the Source in the snapshot phase can support chunk-level checkpoints.
If a failure happens, the Source can be restored and continue reading chunks from the last finished chunk.
@@ -565,7 +565,9 @@ Flink performs checkpoints for the Source periodically; in case of failover, the job…

When performing incremental snapshot reading, the MySQL CDC source needs an algorithm used to split the table into chunks.
The MySQL CDC Source uses the primary key column to split the table into multiple chunks. By default, the MySQL CDC source identifies the primary key columns of the table and uses the first column of the primary key as the splitting column.
If the table has no primary key, incremental snapshot reading will fail, and you can disable `scan.incremental.snapshot.enabled` to fall back to the old snapshot reading mechanism.
If the table has no primary key, the user must specify `scan.incremental.snapshot.chunk.key-column`,
otherwise incremental snapshot reading will fail, and you can disable `scan.incremental.snapshot.enabled` to fall back to the old snapshot reading mechanism.
Please note that using a column that is not part of the primary key as the chunk key may degrade the query performance of the table.

For numeric and auto-incrementing splitting columns, the MySQL CDC Source splits chunks efficiently by a fixed step length.
For example, if you have a table whose primary key column is `id`, an auto-incrementing BIGINT type with a minimum value of `0` and a maximum value of `100`,
8 changes: 5 additions & 3 deletions docs/content/docs/connectors/flink-sources/mysql-cdc.md
@@ -530,7 +530,7 @@ Incremental snapshot reading is a new mechanism to read snapshot of a table. Com
If you would like the source run in parallel, each parallel reader should have an unique server id, so the 'server-id' must be a range like '5400-6400',
and the range must be larger than the parallelism.

During the incremental snapshot reading, the MySQL CDC Source firstly splits snapshot chunks (splits) by primary key of table,
During the incremental snapshot reading, the MySQL CDC Source firstly splits snapshot chunks (splits) by the user-specified chunk key of the table,
and then MySQL CDC Source assigns the chunks to multiple readers to read the data of snapshot chunk.

#### Controlling Parallelism
@@ -580,7 +580,7 @@ The CDC job may restart fails in this case. So the heartbeat event will help upd

When the MySQL CDC source is started, it reads snapshot of table parallelly and then reads binlog of table with single parallelism.

In snapshot phase, the snapshot is cut into multiple snapshot chunks according to primary key of table and the size of table rows.
In the snapshot phase, the snapshot is cut into multiple snapshot chunks according to the chunk key of the table and the size of the table rows.
Snapshot chunks is assigned to multiple snapshot readers. Each snapshot reader reads its received chunks with [chunk reading algorithm](#snapshot-chunk-reading) and send the read data to downstream.
The source manages the process status (finished or not) of chunks, thus the source of snapshot phase can support checkpoint in chunk level.
If a failure happens, the source can be restored and continue to read chunks from last finished chunks.
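
To make the chunk bookkeeping concrete, here is an illustrative stand-in (not the connector's internal API) for the containment test each snapshot chunk implies: every chunk covers a left-closed, right-open key range, with `null` boundaries for the first and last chunks, matching split descriptions such as `customers_no_pk null [462]` and `customers_no_pk [1906] null` in the new tests below.

```java
public class ChunkRangeSketch {

    /** Returns true if the chunk key falls into the split range [start, end). */
    static <T extends Comparable<T>> boolean splitContains(T key, T start, T end) {
        boolean afterStart = start == null || key.compareTo(start) >= 0; // null start = first chunk
        boolean beforeEnd = end == null || key.compareTo(end) < 0;       // null end = last chunk
        return afterStart && beforeEnd;
    }

    public static void main(String[] args) {
        System.out.println(splitContains(100, null, 462));   // true: first chunk (null, 462)
        System.out.println(splitContains(500, 462, 823));    // true: chunk [462, 823)
        System.out.println(splitContains(2000, 1906, null)); // true: last chunk [1906, null)
        System.out.println(splitContains(462, null, 462));   // false: 462 belongs to the next chunk
    }
}
```
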
@@ -596,7 +596,9 @@ Flink performs checkpoints for the source periodically, in case of failover, the

When performing incremental snapshot reading, MySQL CDC source need a criterion which used to split the table.
MySQL CDC Source use a splitting column to split the table to multiple splits (chunks). By default, MySQL CDC source will identify the primary key column of the table and use the first column in primary key as the splitting column.
If there is no primary key in the table, incremental snapshot reading will fail and you can disable `scan.incremental.snapshot.enabled` to fallback to old snapshot reading mechanism.
If there is no primary key in the table, the user must specify `scan.incremental.snapshot.chunk.key-column`,
otherwise incremental snapshot reading will fail, and you can disable `scan.incremental.snapshot.enabled` to fall back to the old snapshot reading mechanism.
Please note that using a column that is not part of the primary key as the chunk key can result in slower table query performance.
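
For example, here is a minimal sketch of wiring this option up from a Table API program; the connection settings and the primary-key-less `customers_no_pk` schema are placeholders, not part of this commit:

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class ChunkKeyColumnExample {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
        // The table has no primary key, so a chunk key column is supplied explicitly
        // for incremental snapshot reading.
        tEnv.executeSql(
                "CREATE TABLE customers_no_pk (\n"
                        + "  id BIGINT,\n"
                        + "  name STRING\n"
                        + ") WITH (\n"
                        + "  'connector' = 'mysql-cdc',\n"
                        + "  'hostname' = 'localhost',\n"
                        + "  'port' = '3306',\n"
                        + "  'username' = 'flinkuser',\n"
                        + "  'password' = 'flinkpw',\n"
                        + "  'database-name' = 'customer_db',\n"
                        + "  'table-name' = 'customers_no_pk',\n"
                        + "  'scan.incremental.snapshot.chunk.key-column' = 'id'\n"
                        + ")");
        tEnv.executeSql("SELECT * FROM customers_no_pk").print();
    }
}
```

The same option also works when the table does have a primary key: the chosen column then simply overrides the default of using the first primary-key column.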

For numeric and auto incremental splitting column, MySQL CDC Source efficiently splits chunks by fixed step length.
For example, if you had a table with a primary key column of `id` which is auto-incremental BIGINT type, the minimum value was `0` and maximum value was `100`,
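
The hunk above is truncated, but the fixed-step idea it describes can be sketched as plain arithmetic. This is an illustration only, not the connector's actual splitter, which additionally applies the even-distribution factor bounds referenced in the tests below:

```java
import java.util.ArrayList;
import java.util.List;

public class FixedStepSplitSketch {

    /** Splits [min, max] into left-closed, right-open chunks of the given step length. */
    static List<long[]> splitEvenly(long min, long max, long step) {
        List<long[]> chunks = new ArrayList<>();
        long start = min;
        while (start <= max) {
            long end = Math.min(start + step, max + 1);
            chunks.add(new long[] {start, end});
            start = end;
        }
        return chunks;
    }

    public static void main(String[] args) {
        // With min = 0, max = 100 and a step of 25 this prints
        // [0,25) [25,50) [50,75) [75,100) [100,101).
        for (long[] c : splitEvenly(0, 100, 25)) {
            System.out.printf("[%d,%d) ", c[0], c[1]);
        }
    }
}
```
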
@@ -109,6 +109,5 @@ public class JdbcSourceOptions extends SourceOptions {
.noDefaultValue()
.withDescription(
"The chunk key of table snapshot, captured tables are split into multiple chunks by a chunk key when read the snapshot of table."
+ "By default, the chunk key is the first column of the primary key and the chunk key is the RowId in oracle."
+ "This column must be a column of the primary key.");
+ "By default, the chunk key is the first column of the primary key and the chunk key is the RowId in oracle.");
}
@@ -243,8 +243,7 @@ public class MySqlSourceOptions {
.noDefaultValue()
.withDescription(
"The chunk key of table snapshot, captured tables are split into multiple chunks by a chunk key when read the snapshot of table."
+ "By default, the chunk key is the first column of the primary key."
+ "This column must be a column of the primary key.");
+ "By default, the chunk key is the first column of the primary key.");

@Experimental
public static final ConfigOption<Boolean> SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED =
@@ -57,9 +57,8 @@ public static RowType getChunkKeyColumnType(Column chunkKeyColumn) {

/**
* Get the chunk key column. This column could be set by `chunkKeyColumn`. If the table doesn't
* have primary keys, `chunkKeyColumn` must be set. If the table has primary keys,
* `chunkKeyColumn` must be a column of them or else null. When the parameter `chunkKeyColumn`
* is not set and the table has primary keys, return the first column of primary keys.
* have primary keys, `chunkKeyColumn` must be set. When the parameter `chunkKeyColumn` is not
* set and the table has primary keys, return the first column of primary keys.
*/
public static Column getChunkKeyColumn(Table table, Map<ObjectPath, String> chunkKeyColumns) {
List<Column> primaryKeys = table.primaryKeyColumns();
@@ -68,7 +67,8 @@ public static Column getChunkKeyColumn(Table table, Map<ObjectPath, String> chun
throw new ValidationException(
"'scan.incremental.snapshot.chunk.key-column' must be set when the table doesn't have primary keys.");
}
List<Column> searchColumns = primaryKeys.isEmpty() ? table.columns() : primaryKeys;

List<Column> searchColumns = table.columns();
if (chunkKeyColumn != null) {
Optional<Column> targetColumn =
searchColumns.stream()
@@ -79,9 +79,8 @@ public static Column getChunkKeyColumn(Table table, Map<ObjectPath, String> chun
}
throw new ValidationException(
String.format(
"Chunk key column '%s' doesn't exist in the %s [%s] of the table %s.",
"Chunk key column '%s' doesn't exist in the columns [%s] of the table %s.",
chunkKeyColumn,
primaryKeys.isEmpty() ? "user specified columns" : "primary keys",
searchColumns.stream()
.map(Column::name)
.collect(Collectors.joining(",")),
@@ -82,12 +82,7 @@ public static Object[] rowToArray(ResultSet rs, int size) throws SQLException {
}

public static Struct getStructContainsChunkKey(SourceRecord record) {
// If the table has primary keys, chunk key is in the record key struct
if (record.key() != null) {
return (Struct) record.key();
}

// If the table doesn't have primary keys, chunk key is in the after struct for insert or
// Use chunk key in the after struct for insert or
// the before struct for delete/update
Envelope.Operation op = Envelope.operationFor(record);
Struct value = (Struct) record.value();
@@ -109,9 +104,9 @@ public static void upsertBinlog(
if (isDataChangeRecord(binlogRecord)) {
Struct value = (Struct) binlogRecord.value();
if (value != null) {
Struct keyStruct = getStructContainsChunkKey(binlogRecord);
Struct chunkKeyStruct = getStructContainsChunkKey(binlogRecord);
if (splitKeyRangeContains(
getSplitKey(splitBoundaryType, nameAdjuster, keyStruct),
getSplitKey(splitBoundaryType, nameAdjuster, chunkKeyStruct),
splitStart,
splitEnd)) {
boolean hasPrimaryKey = binlogRecord.key() != null;
@@ -124,7 +119,7 @@ public static void upsertBinlog(
snapshotRecords,
binlogRecord,
hasPrimaryKey
? keyStruct
? (Struct) binlogRecord.key()
: createReadOpValue(
binlogRecord, Envelope.FieldName.AFTER),
false);
@@ -152,15 +147,15 @@ public static void upsertBinlog(
upsertBinlog(
snapshotRecords,
binlogRecord,
hasPrimaryKey ? keyStruct : structFromAfter,
hasPrimaryKey ? (Struct) binlogRecord.key() : structFromAfter,
false);
break;
case DELETE:
upsertBinlog(
snapshotRecords,
binlogRecord,
hasPrimaryKey
? keyStruct
? (Struct) binlogRecord.key()
: createReadOpValue(
binlogRecord, Envelope.FieldName.BEFORE),
true);
@@ -166,7 +166,7 @@ public void testAssignCompositePkTableWithWrongChunkKeyColumn() {
assertTrue(
ExceptionUtils.findThrowableWithMessage(
t,
"Chunk key column 'errorCol' doesn't exist in the primary keys [card_no,level] of the table")
"Chunk key column 'errorCol' doesn't exist in the columns [card_no,level,name,note] of the table")
.isPresent());
}
}
@@ -416,6 +416,51 @@ public void testTableWithoutPrimaryKey() {
}
}

@Test
public void testAssignTableWithoutPrimaryKeyWithChunkKeyColumn() {
String tableWithoutPrimaryKey = "customers_no_pk";
List<String> expected =
Arrays.asList(
"customers_no_pk null [462]",
"customers_no_pk [462] [823]",
"customers_no_pk [823] [1184]",
"customers_no_pk [1184] [1545]",
"customers_no_pk [1545] [1906]",
"customers_no_pk [1906] null");
List<String> splits =
getTestAssignSnapshotSplits(
customerDatabase,
4,
CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND.defaultValue(),
CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(),
new String[] {tableWithoutPrimaryKey},
"id");
assertEquals(expected, splits);
}

@Test
public void testAssignTableWithPrimaryKeyWithChunkKeyColumnNotInPrimaryKey() {
String tableWithoutPrimaryKey = "customers";
List<String> expected =
Arrays.asList(
"customers null [user_12]",
"customers [user_12] [user_15]",
"customers [user_15] [user_18]",
"customers [user_18] [user_20]",
"customers [user_20] [user_4]",
"customers [user_4] [user_7]",
"customers [user_7] null");
List<String> splits =
getTestAssignSnapshotSplits(
customerDatabase,
4,
CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND.defaultValue(),
CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(),
new String[] {tableWithoutPrimaryKey},
"name");
assertEquals(expected, splits);
}

@Test
public void testEnumerateTablesLazily() {
final MySqlSourceConfig configuration =
