From 45106d4e9801ba9570af066b0d7c29bf8f5955a6 Mon Sep 17 00:00:00 2001 From: Seung-Min Lee Date: Fri, 5 Jul 2024 01:50:30 +0900 Subject: [PATCH] EDIT: get keyStruct from BEFORE or AFTER always in upserBinlog --- .../debezium/reader/SnapshotSplitReader.java | 14 +++----- .../mysql/source/utils/RecordUtils.java | 32 ++++--------------- 2 files changed, 11 insertions(+), 35 deletions(-) diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/SnapshotSplitReader.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/SnapshotSplitReader.java index f37428f8ed4..8a23591619c 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/SnapshotSplitReader.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/SnapshotSplitReader.java @@ -53,7 +53,6 @@ import java.util.ArrayList; import java.util.Collection; -import java.util.Collections; import java.util.HashMap; import java.util.Iterator; import java.util.LinkedList; @@ -302,15 +301,10 @@ public Iterator pollSplitRecords() throws InterruptedException { } if (!reachBinlogStart) { - if (record.key() != null) { - snapshotRecords.put( - (Struct) record.key(), Collections.singletonList(record)); - } else { - List records = - snapshotRecords.computeIfAbsent( - (Struct) record.value(), key -> new LinkedList<>()); - records.add(record); - } + List records = + snapshotRecords.computeIfAbsent( + (Struct) record.value(), key -> new LinkedList<>()); + records.add(record); } else { RecordUtils.upsertBinlog( snapshotRecords, diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/utils/RecordUtils.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/utils/RecordUtils.java index 5d993dea465..3f96ed1a41f 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/utils/RecordUtils.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/utils/RecordUtils.java @@ -44,7 +44,6 @@ import java.time.Instant; import java.util.Arrays; import java.util.Collection; -import java.util.Collections; import java.util.HashMap; import java.util.LinkedList; import java.util.List; @@ -118,10 +117,7 @@ public static void upsertBinlog( upsertBinlog( snapshotRecords, binlogRecord, - hasPrimaryKey - ? keyStruct - : createReadOpValue( - binlogRecord, Envelope.FieldName.AFTER), + createReadOpValue(binlogRecord, Envelope.FieldName.AFTER), false); break; case UPDATE: @@ -144,20 +140,13 @@ public static void upsertBinlog( } // If the chunk key changed, we still send here // This will cause the at-least-once semantics - upsertBinlog( - snapshotRecords, - binlogRecord, - hasPrimaryKey ? keyStruct : structFromAfter, - false); + upsertBinlog(snapshotRecords, binlogRecord, structFromAfter, false); break; case DELETE: upsertBinlog( snapshotRecords, binlogRecord, - hasPrimaryKey - ? keyStruct - : createReadOpValue( - binlogRecord, Envelope.FieldName.BEFORE), + createReadOpValue(binlogRecord, Envelope.FieldName.BEFORE), true); break; case READ: @@ -176,14 +165,11 @@ private static void upsertBinlog( SourceRecord binlogRecord, Struct keyStruct, boolean isDelete) { - boolean hasPrimaryKey = binlogRecord.key() != null; List records = snapshotRecords.get(keyStruct); if (isDelete) { if (records == null || records.isEmpty()) { LOG.error( "Deleting a record which is not in its split for tables without primary keys. This may happen when the chunk key column is updated in another snapshot split."); - } else if (hasPrimaryKey) { - snapshotRecords.remove(keyStruct); } else { snapshotRecords.get(keyStruct).remove(0); } @@ -198,15 +184,11 @@ private static void upsertBinlog( binlogRecord.key(), binlogRecord.valueSchema(), createReadOpValue(binlogRecord, Envelope.FieldName.AFTER)); - if (hasPrimaryKey) { - snapshotRecords.put(keyStruct, Collections.singletonList(record)); - } else { - if (records == null) { - snapshotRecords.put(keyStruct, new LinkedList<>()); - records = snapshotRecords.get(keyStruct); - } - records.add(record); + if (records == null) { + snapshotRecords.put(keyStruct, new LinkedList<>()); + records = snapshotRecords.get(keyStruct); } + records.add(record); } }