Skip to content
This repository has been archived by the owner on Sep 13, 2024. It is now read-only.

Commit

Permalink
DBZ-7830 Flush buffer when a delete arrives
Browse files Browse the repository at this point in the history
  • Loading branch information
DLT1412 authored and Naros committed May 2, 2024
1 parent 6bde1c7 commit ba1ab3c
Showing 1 changed file with 7 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,13 @@ public void execute(Collection<SinkRecord> records) {

if (sinkRecordDescriptor.isDelete()) {

if (updateBufferByTable.get(tableId) != null && !updateBufferByTable.get(tableId).isEmpty()) {
// When an delete arrives, update buffer must be flushed to avoid losing an
// delete for the same record after its update.
flushBuffer(tableId, updateBufferByTable.get(tableId).flush());

}

if (!config.isDeleteEnabled()) {
LOGGER.debug("Deletes are not enabled, skipping delete for topic '{}'", sinkRecordDescriptor.getTopicName());
continue;
Expand Down

0 comments on commit ba1ab3c

Please sign in to comment.