diff --git a/docs/content.zh/docs/connectors/flink-sources/mysql-cdc.md b/docs/content.zh/docs/connectors/flink-sources/mysql-cdc.md index 1d81555de0..3a2f2a6a66 100644 --- a/docs/content.zh/docs/connectors/flink-sources/mysql-cdc.md +++ b/docs/content.zh/docs/connectors/flink-sources/mysql-cdc.md @@ -500,7 +500,7 @@ CREATE TABLE products ( * (3)在快照读取之前,Source 不需要数据库锁权限。 如果希望 source 并行运行,则每个并行 reader 都应该具有唯一的 server id,因此`server id`的范围必须类似于 `5400-6400`, -且范围必须大于并行度。在增量快照读取过程中,MySQL CDC Source 首先通过表的主键将表划分成多个块(chunk), +且范围必须大于并行度。在增量快照读取过程中,MySQL CDC Source 源首先会根据您指定的表块键将表分块(chunk) 然后 MySQL CDC Source 将多个块分配给多个 reader 以并行读取表的数据。 #### 并发读取 @@ -550,7 +550,7 @@ MySQL 集群中你监控的服务器出现故障后, 你只需将受监视的服 当 MySQL CDC Source 启动时,它并行读取表的快照,然后以单并行度的方式读取表的 binlog。 -在快照阶段,根据表的主键和表行的大小将快照切割成多个快照块。 +在快照阶段,快照会根据表的分块键和表行的大小切割成多个快照块。 快照块被分配给多个快照读取器。每个快照读取器使用 [区块读取算法](#snapshot-chunk-reading) 并将读取的数据发送到下游。 Source 会管理块的进程状态(完成或未完成),因此快照阶段的 Source 可以支持块级别的 checkpoint。 如果发生故障,可以恢复 Source 并继续从最后完成的块中读取块。 @@ -565,7 +565,9 @@ Flink 定期为 Source 执行 checkpoint,在故障转移的情况下,作业 在执行增量快照读取时,MySQL CDC source 需要一个用于分片的的算法。 MySQL CDC Source 使用主键列将表划分为多个分片(chunk)。 默认情况下,MySQL CDC source 会识别表的主键列,并使用主键中的第一列作为用作分片列。 -如果表中没有主键, 增量快照读取将失败,你可以禁用 `scan.incremental.snapshot.enabled` 来回退到旧的快照读取机制。 +如果表中没有主键,用户必须指定 `scan.incremental.snapshot.chunk.key-column`、 +否则增量快照读取将失败,你可以禁用 `scan.incremental.snapshot.enabled` 恢复到旧的快照读取机制。 +请注意,使用不在主键中的列作为分块键可能会降低表的查询性能。 对于数值和自动增量拆分列,MySQL CDC Source 按固定步长高效地拆分块。 例如,如果你有一个主键列为`id`的表,它是自动增量 BIGINT 类型,最小值为`0`,最大值为`100`, diff --git a/docs/content/docs/connectors/flink-sources/mysql-cdc.md b/docs/content/docs/connectors/flink-sources/mysql-cdc.md index a677e440f9..8bee137efd 100644 --- a/docs/content/docs/connectors/flink-sources/mysql-cdc.md +++ b/docs/content/docs/connectors/flink-sources/mysql-cdc.md @@ -530,7 +530,7 @@ Incremental snapshot reading is a new mechanism to read snapshot of a table. Com If you would like the source run in parallel, each parallel reader should have an unique server id, so the 'server-id' must be a range like '5400-6400', and the range must be larger than the parallelism. -During the incremental snapshot reading, the MySQL CDC Source firstly splits snapshot chunks (splits) by primary key of table, +During the incremental snapshot reading, the MySQL CDC Source firstly splits snapshot chunks (splits) by user specified chunk key of table, and then MySQL CDC Source assigns the chunks to multiple readers to read the data of snapshot chunk. #### Controlling Parallelism @@ -580,7 +580,7 @@ The CDC job may restart fails in this case. So the heartbeat event will help upd When the MySQL CDC source is started, it reads snapshot of table parallelly and then reads binlog of table with single parallelism. -In snapshot phase, the snapshot is cut into multiple snapshot chunks according to primary key of table and the size of table rows. +In snapshot phase, the snapshot is cut into multiple snapshot chunks according to chunk key of table and the size of table rows. Snapshot chunks is assigned to multiple snapshot readers. Each snapshot reader reads its received chunks with [chunk reading algorithm](#snapshot-chunk-reading) and send the read data to downstream. The source manages the process status (finished or not) of chunks, thus the source of snapshot phase can support checkpoint in chunk level. If a failure happens, the source can be restored and continue to read chunks from last finished chunks. @@ -596,7 +596,9 @@ Flink performs checkpoints for the source periodically, in case of failover, the When performing incremental snapshot reading, MySQL CDC source need a criterion which used to split the table. MySQL CDC Source use a splitting column to split the table to multiple splits (chunks). By default, MySQL CDC source will identify the primary key column of the table and use the first column in primary key as the splitting column. -If there is no primary key in the table, incremental snapshot reading will fail and you can disable `scan.incremental.snapshot.enabled` to fallback to old snapshot reading mechanism. +If there is no primary key in the table, user must specify `scan.incremental.snapshot.chunk.key-column`, +otherwise incremental snapshot reading will fail and you can disable `scan.incremental.snapshot.enabled` to fallback to old snapshot reading mechanism. +Please note that using a column not in primary key as a chunk key can result in slower table query performance. For numeric and auto incremental splitting column, MySQL CDC Source efficiently splits chunks by fixed step length. For example, if you had a table with a primary key column of `id` which is auto-incremental BIGINT type, the minimum value was `0` and maximum value was `100`, diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/options/JdbcSourceOptions.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/options/JdbcSourceOptions.java index fedd6664e9..4224763646 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/options/JdbcSourceOptions.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/options/JdbcSourceOptions.java @@ -109,6 +109,5 @@ public class JdbcSourceOptions extends SourceOptions { .noDefaultValue() .withDescription( "The chunk key of table snapshot, captured tables are split into multiple chunks by a chunk key when read the snapshot of table." - + "By default, the chunk key is the first column of the primary key and the chunk key is the RowId in oracle." - + "This column must be a column of the primary key."); + + "By default, the chunk key is the first column of the primary key and the chunk key is the RowId in oracle."); } diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceOptions.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceOptions.java index 1a1c3f254e..f3424c8dfb 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceOptions.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceOptions.java @@ -243,8 +243,7 @@ public class MySqlSourceOptions { .noDefaultValue() .withDescription( "The chunk key of table snapshot, captured tables are split into multiple chunks by a chunk key when read the snapshot of table." - + "By default, the chunk key is the first column of the primary key." - + "This column must be a column of the primary key."); + + "By default, the chunk key is the first column of the primary key."); @Experimental public static final ConfigOption SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED = diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/utils/ChunkUtils.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/utils/ChunkUtils.java index 2b40533bb5..4932560998 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/utils/ChunkUtils.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/utils/ChunkUtils.java @@ -57,9 +57,8 @@ public static RowType getChunkKeyColumnType(Column chunkKeyColumn) { /** * Get the chunk key column. This column could be set by `chunkKeyColumn`. If the table doesn't - * have primary keys, `chunkKeyColumn` must be set. If the table has primary keys, - * `chunkKeyColumn` must be a column of them or else null. When the parameter `chunkKeyColumn` - * is not set and the table has primary keys, return the first column of primary keys. + * have primary keys, `chunkKeyColumn` must be set. When the parameter `chunkKeyColumn` is not + * set and the table has primary keys, return the first column of primary keys. */ public static Column getChunkKeyColumn(Table table, Map chunkKeyColumns) { List primaryKeys = table.primaryKeyColumns(); @@ -68,7 +67,8 @@ public static Column getChunkKeyColumn(Table table, Map chun throw new ValidationException( "'scan.incremental.snapshot.chunk.key-column' must be set when the table doesn't have primary keys."); } - List searchColumns = primaryKeys.isEmpty() ? table.columns() : primaryKeys; + + List searchColumns = table.columns(); if (chunkKeyColumn != null) { Optional targetColumn = searchColumns.stream() @@ -79,9 +79,8 @@ public static Column getChunkKeyColumn(Table table, Map chun } throw new ValidationException( String.format( - "Chunk key column '%s' doesn't exist in the %s [%s] of the table %s.", + "Chunk key column '%s' doesn't exist in the columns [%s] of the table %s.", chunkKeyColumn, - primaryKeys.isEmpty() ? "user specified columns" : "primary keys", searchColumns.stream() .map(Column::name) .collect(Collectors.joining(",")), diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/utils/RecordUtils.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/utils/RecordUtils.java index 8a3a904f9e..d85944adbe 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/utils/RecordUtils.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/utils/RecordUtils.java @@ -82,12 +82,7 @@ public static Object[] rowToArray(ResultSet rs, int size) throws SQLException { } public static Struct getStructContainsChunkKey(SourceRecord record) { - // If the table has primary keys, chunk key is in the record key struct - if (record.key() != null) { - return (Struct) record.key(); - } - - // If the table doesn't have primary keys, chunk key is in the after struct for insert or + // Use chunk key in the after struct for insert or // the before struct for delete/update Envelope.Operation op = Envelope.operationFor(record); Struct value = (Struct) record.value(); @@ -109,9 +104,9 @@ public static void upsertBinlog( if (isDataChangeRecord(binlogRecord)) { Struct value = (Struct) binlogRecord.value(); if (value != null) { - Struct keyStruct = getStructContainsChunkKey(binlogRecord); + Struct chunkKeyStruct = getStructContainsChunkKey(binlogRecord); if (splitKeyRangeContains( - getSplitKey(splitBoundaryType, nameAdjuster, keyStruct), + getSplitKey(splitBoundaryType, nameAdjuster, chunkKeyStruct), splitStart, splitEnd)) { boolean hasPrimaryKey = binlogRecord.key() != null; @@ -124,7 +119,7 @@ public static void upsertBinlog( snapshotRecords, binlogRecord, hasPrimaryKey - ? keyStruct + ? (Struct) binlogRecord.key() : createReadOpValue( binlogRecord, Envelope.FieldName.AFTER), false); @@ -152,7 +147,7 @@ public static void upsertBinlog( upsertBinlog( snapshotRecords, binlogRecord, - hasPrimaryKey ? keyStruct : structFromAfter, + hasPrimaryKey ? (Struct) binlogRecord.key() : structFromAfter, false); break; case DELETE: @@ -160,7 +155,7 @@ public static void upsertBinlog( snapshotRecords, binlogRecord, hasPrimaryKey - ? keyStruct + ? (Struct) binlogRecord.key() : createReadOpValue( binlogRecord, Envelope.FieldName.BEFORE), true); diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssignerTest.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssignerTest.java index fe4ccd1a27..759827d9df 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssignerTest.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssignerTest.java @@ -166,7 +166,7 @@ public void testAssignCompositePkTableWithWrongChunkKeyColumn() { assertTrue( ExceptionUtils.findThrowableWithMessage( t, - "Chunk key column 'errorCol' doesn't exist in the primary keys [card_no,level] of the table") + "Chunk key column 'errorCol' doesn't exist in the columns [card_no,level,name,note] of the table") .isPresent()); } } @@ -416,6 +416,51 @@ public void testTableWithoutPrimaryKey() { } } + @Test + public void testAssignTableWithoutPrimaryKeyWithChunkKeyColumn() { + String tableWithoutPrimaryKey = "customers_no_pk"; + List expected = + Arrays.asList( + "customers_no_pk null [462]", + "customers_no_pk [462] [823]", + "customers_no_pk [823] [1184]", + "customers_no_pk [1184] [1545]", + "customers_no_pk [1545] [1906]", + "customers_no_pk [1906] null"); + List splits = + getTestAssignSnapshotSplits( + customerDatabase, + 4, + CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND.defaultValue(), + CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(), + new String[] {tableWithoutPrimaryKey}, + "id"); + assertEquals(expected, splits); + } + + @Test + public void testAssignTableWithPrimaryKeyWithChunkKeyColumnNotInPrimaryKey() { + String tableWithoutPrimaryKey = "customers"; + List expected = + Arrays.asList( + "customers null [user_12]", + "customers [user_12] [user_15]", + "customers [user_15] [user_18]", + "customers [user_18] [user_20]", + "customers [user_20] [user_4]", + "customers [user_4] [user_7]", + "customers [user_7] null"); + List splits = + getTestAssignSnapshotSplits( + customerDatabase, + 4, + CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND.defaultValue(), + CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(), + new String[] {tableWithoutPrimaryKey}, + "name"); + assertEquals(expected, splits); + } + @Test public void testEnumerateTablesLazily() { final MySqlSourceConfig configuration =