Skip to content

Commit

Permalink
EDIT: get keyStruct from BEFORE or AFTER always in upserBinlog
Browse files Browse the repository at this point in the history
  • Loading branch information
Seung-Min Lee committed Jul 4, 2024
1 parent 44db3a7 commit 45106d4
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
Expand Down Expand Up @@ -302,15 +301,10 @@ public Iterator<SourceRecords> pollSplitRecords() throws InterruptedException {
}

if (!reachBinlogStart) {
if (record.key() != null) {
snapshotRecords.put(
(Struct) record.key(), Collections.singletonList(record));
} else {
List<SourceRecord> records =
snapshotRecords.computeIfAbsent(
(Struct) record.value(), key -> new LinkedList<>());
records.add(record);
}
List<SourceRecord> records =
snapshotRecords.computeIfAbsent(
(Struct) record.value(), key -> new LinkedList<>());
records.add(record);
} else {
RecordUtils.upsertBinlog(
snapshotRecords,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@
import java.time.Instant;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
Expand Down Expand Up @@ -118,10 +117,7 @@ public static void upsertBinlog(
upsertBinlog(
snapshotRecords,
binlogRecord,
hasPrimaryKey
? keyStruct
: createReadOpValue(
binlogRecord, Envelope.FieldName.AFTER),
createReadOpValue(binlogRecord, Envelope.FieldName.AFTER),
false);
break;
case UPDATE:
Expand All @@ -144,20 +140,13 @@ public static void upsertBinlog(
}
// If the chunk key changed, we still send here
// This will cause the at-least-once semantics
upsertBinlog(
snapshotRecords,
binlogRecord,
hasPrimaryKey ? keyStruct : structFromAfter,
false);
upsertBinlog(snapshotRecords, binlogRecord, structFromAfter, false);
break;
case DELETE:
upsertBinlog(
snapshotRecords,
binlogRecord,
hasPrimaryKey
? keyStruct
: createReadOpValue(
binlogRecord, Envelope.FieldName.BEFORE),
createReadOpValue(binlogRecord, Envelope.FieldName.BEFORE),
true);
break;
case READ:
Expand All @@ -176,14 +165,11 @@ private static void upsertBinlog(
SourceRecord binlogRecord,
Struct keyStruct,
boolean isDelete) {
boolean hasPrimaryKey = binlogRecord.key() != null;
List<SourceRecord> records = snapshotRecords.get(keyStruct);
if (isDelete) {
if (records == null || records.isEmpty()) {
LOG.error(
"Deleting a record which is not in its split for tables without primary keys. This may happen when the chunk key column is updated in another snapshot split.");
} else if (hasPrimaryKey) {
snapshotRecords.remove(keyStruct);
} else {
snapshotRecords.get(keyStruct).remove(0);
}
Expand All @@ -198,15 +184,11 @@ private static void upsertBinlog(
binlogRecord.key(),
binlogRecord.valueSchema(),
createReadOpValue(binlogRecord, Envelope.FieldName.AFTER));
if (hasPrimaryKey) {
snapshotRecords.put(keyStruct, Collections.singletonList(record));
} else {
if (records == null) {
snapshotRecords.put(keyStruct, new LinkedList<>());
records = snapshotRecords.get(keyStruct);
}
records.add(record);
if (records == null) {
snapshotRecords.put(keyStruct, new LinkedList<>());
records = snapshotRecords.get(keyStruct);
}
records.add(record);
}
}

Expand Down

0 comments on commit 45106d4

Please sign in to comment.