[FLINK-35740][mysql] Allow column as chunk key even if not in Primary Keys #3448

Merged
Changes from 10 commits
8 changes: 5 additions & 3 deletions docs/content/docs/connectors/flink-sources/mysql-cdc.md
@@ -523,7 +523,7 @@ Incremental snapshot reading is a new mechanism to read snapshot of a table. Com
If you would like the source to run in parallel, each parallel reader should have a unique server id, so the 'server-id' must be a range like '5400-6400',
and the range must be larger than the parallelism.

During the incremental snapshot reading, the MySQL CDC Source firstly splits snapshot chunks (splits) by primary key of table,
During the incremental snapshot reading, the MySQL CDC Source first splits the snapshot into chunks (splits) by the user-specified chunk key of the table,
and then the MySQL CDC Source assigns the chunks to multiple readers to read the data of each snapshot chunk.
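For illustration only (not part of this diff), a minimal Flink SQL sketch of a parallel incremental-snapshot source; the hostname, credentials, schema, and database/table names are placeholder assumptions, and the `server-id` range covers an assumed parallelism of 4:

```sql
-- Hypothetical source table: with parallelism 4, 'server-id' must span more than 4 ids.
-- Snapshot chunks are split by the chunk key (by default, the first primary key column).
CREATE TABLE orders_source (
    order_id BIGINT,
    customer_id BIGINT,
    order_time TIMESTAMP(3),
    PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = 'localhost',
    'port' = '3306',
    'username' = 'flinkuser',
    'password' = 'flinkpw',
    'database-name' = 'mydb',
    'table-name' = 'orders',
    'server-id' = '5400-5404'
);
```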

#### Controlling Parallelism
@@ -573,7 +573,7 @@ The CDC job may restart fails in this case. So the heartbeat event will help upd

When the MySQL CDC source is started, it reads the snapshot of the table in parallel and then reads the binlog of the table with a single parallelism.

In snapshot phase, the snapshot is cut into multiple snapshot chunks according to primary key of table and the size of table rows.
In the snapshot phase, the snapshot is cut into multiple snapshot chunks according to the chunk key of the table and the size of the table rows.
Snapshot chunks are assigned to multiple snapshot readers. Each snapshot reader reads its received chunks with the [chunk reading algorithm](#snapshot-chunk-reading) and sends the read data downstream.
The source manages the processing status (finished or not) of chunks, so the snapshot phase of the source can support checkpoints at the chunk level.
If a failure happens, the source can be restored and continue reading chunks from the last finished chunk.
@@ -589,7 +589,9 @@ Flink performs checkpoints for the source periodically, in case of failover, the

When performing incremental snapshot reading, the MySQL CDC source needs a criterion used to split the table.
The MySQL CDC source uses a splitting column to split the table into multiple splits (chunks). By default, the MySQL CDC source will identify the primary key of the table and use its first column as the splitting column.
If there is no primary key in the table, incremental snapshot reading will fail and you can disable `scan.incremental.snapshot.enabled` to fallback to old snapshot reading mechanism.
If there is no primary key in the table, the user must specify `scan.incremental.snapshot.chunk.key-column`,
otherwise incremental snapshot reading will fail, and you can disable `scan.incremental.snapshot.enabled` to fall back to the old snapshot reading mechanism.
Please note that using a column not in the primary key as the chunk key can result in slower table query performance.
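As a hedged sketch of the behaviour this change enables, the DDL below sets a chunk key for a table without a primary key, mirroring the `customers_no_pk` table and `id` chunk key used in the new test; the column types and connection settings are placeholder assumptions:

```sql
-- Hypothetical table without a primary key: incremental snapshot reading requires an
-- explicit chunk key column, and with this change it may be any column of the table.
CREATE TABLE customers_no_pk_source (
    id BIGINT,
    name STRING,
    address STRING
) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = 'localhost',
    'port' = '3306',
    'username' = 'flinkuser',
    'password' = 'flinkpw',
    'database-name' = 'customer',
    'table-name' = 'customers_no_pk',
    'scan.incremental.snapshot.chunk.key-column' = 'id'
);
```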

For a numeric and auto-incremental splitting column, the MySQL CDC Source efficiently splits chunks by a fixed step length.
For example, if you had a table with a primary key column `id` of auto-incremental BIGINT type, the minimum value was `0` and maximum value was `100`,
@@ -109,6 +109,5 @@ public class JdbcSourceOptions extends SourceOptions {
.noDefaultValue()
.withDescription(
"The chunk key of table snapshot, captured tables are split into multiple chunks by a chunk key when read the snapshot of table."
+ "By default, the chunk key is the first column of the primary key and the chunk key is the RowId in oracle."
+ "This column must be a column of the primary key.");
+ "By default, the chunk key is the first column of the primary key and the chunk key is the RowId in oracle.");
}
@@ -243,8 +243,7 @@ public class MySqlSourceOptions {
.noDefaultValue()
.withDescription(
"The chunk key of table snapshot, captured tables are split into multiple chunks by a chunk key when read the snapshot of table."
+ "By default, the chunk key is the first column of the primary key."
+ "This column must be a column of the primary key.");
+ "By default, the chunk key is the first column of the primary key.");

@Experimental
public static final ConfigOption<Boolean> SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED =
@@ -57,9 +57,8 @@ public static RowType getChunkKeyColumnType(Column chunkKeyColumn) {

/**
* Get the chunk key column. This column could be set by `chunkKeyColumn`. If the table doesn't
* have primary keys, `chunkKeyColumn` must be set. If the table has primary keys,
* `chunkKeyColumn` must be a column of them or else null. When the parameter `chunkKeyColumn`
* is not set and the table has primary keys, return the first column of primary keys.
* have primary keys, `chunkKeyColumn` must be set. When the parameter `chunkKeyColumn` is not
* set and the table has primary keys, return the first column of primary keys.
*/
public static Column getChunkKeyColumn(Table table, Map<ObjectPath, String> chunkKeyColumns) {
List<Column> primaryKeys = table.primaryKeyColumns();
@@ -68,7 +67,8 @@ public static Column getChunkKeyColumn(Table table, Map<ObjectPath, String> chun
throw new ValidationException(
"'scan.incremental.snapshot.chunk.key-column' must be set when the table doesn't have primary keys.");
}
List<Column> searchColumns = primaryKeys.isEmpty() ? table.columns() : primaryKeys;

List<Column> searchColumns = table.columns();
if (chunkKeyColumn != null) {
Optional<Column> targetColumn =
searchColumns.stream()
@@ -79,9 +79,8 @@ }
}
throw new ValidationException(
String.format(
"Chunk key column '%s' doesn't exist in the %s [%s] of the table %s.",
"Chunk key column '%s' doesn't exist in the columns [%s] of the table %s.",
chunkKeyColumn,
primaryKeys.isEmpty() ? "user specified columns" : "primary keys",
searchColumns.stream()
.map(Column::name)
.collect(Collectors.joining(",")),
@@ -82,12 +82,7 @@ public static Object[] rowToArray(ResultSet rs, int size) throws SQLException {
}

public static Struct getStructContainsChunkKey(SourceRecord record) {
// If the table has primary keys, chunk key is in the record key struct
if (record.key() != null) {
return (Struct) record.key();
}

// If the table doesn't have primary keys, chunk key is in the after struct for insert or
// Use chunk key in the after struct for insert or
// the before struct for delete/update
Envelope.Operation op = Envelope.operationFor(record);
Struct value = (Struct) record.value();
@@ -109,9 +104,9 @@ public static void upsertBinlog(
if (isDataChangeRecord(binlogRecord)) {
Struct value = (Struct) binlogRecord.value();
if (value != null) {
Struct keyStruct = getStructContainsChunkKey(binlogRecord);
Struct chunkKeyStruct = getStructContainsChunkKey(binlogRecord);
if (splitKeyRangeContains(
getSplitKey(splitBoundaryType, nameAdjuster, keyStruct),
getSplitKey(splitBoundaryType, nameAdjuster, chunkKeyStruct),
splitStart,
splitEnd)) {
boolean hasPrimaryKey = binlogRecord.key() != null;
Expand All @@ -124,7 +119,7 @@ public static void upsertBinlog(
snapshotRecords,
binlogRecord,
hasPrimaryKey
? keyStruct
? (Struct) binlogRecord.key()
: createReadOpValue(
binlogRecord, Envelope.FieldName.AFTER),
false);
@@ -152,15 +147,15 @@ public static void upsertBinlog(
upsertBinlog(
snapshotRecords,
binlogRecord,
hasPrimaryKey ? keyStruct : structFromAfter,
hasPrimaryKey ? (Struct) binlogRecord.key() : structFromAfter,
false);
break;
case DELETE:
upsertBinlog(
snapshotRecords,
binlogRecord,
hasPrimaryKey
? keyStruct
? (Struct) binlogRecord.key()
: createReadOpValue(
binlogRecord, Envelope.FieldName.BEFORE),
true);
@@ -166,7 +166,7 @@ public void testAssignCompositePkTableWithWrongChunkKeyColumn() {
assertTrue(
ExceptionUtils.findThrowableWithMessage(
t,
"Chunk key column 'errorCol' doesn't exist in the primary keys [card_no,level] of the table")
"Chunk key column 'errorCol' doesn't exist in the columns [card_no,level,name,note] of the table")
.isPresent());
}
}
@@ -416,6 +416,28 @@ public void testTableWithoutPrimaryKey() {
}
}

@Test
public void testAssignTableWithoutPrimaryKeyWithChunkKeyColumn() {
String tableWithoutPrimaryKey = "customers_no_pk";
List<String> expected =
Arrays.asList(
"customers_no_pk null [462]",
"customers_no_pk [462] [823]",
"customers_no_pk [823] [1184]",
"customers_no_pk [1184] [1545]",
"customers_no_pk [1545] [1906]",
"customers_no_pk [1906] null");
List<String> splits =
getTestAssignSnapshotSplits(
customerDatabase,
4,
CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND.defaultValue(),
CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(),
new String[] {tableWithoutPrimaryKey},
"id");
assertEquals(expected, splits);
}
Comment on lines +419 to +439
Contributor
It would be nice if we could also test using non-primary-key columns as chunk keys for tables with primary keys.


@Test
public void testEnumerateTablesLazily() {
final MySqlSourceConfig configuration =