diff --git a/src/main/java/io/debezium/connector/jdbc/JdbcChangeEventSink.java b/src/main/java/io/debezium/connector/jdbc/JdbcChangeEventSink.java index cc5b8916..7f3be288 100644 --- a/src/main/java/io/debezium/connector/jdbc/JdbcChangeEventSink.java +++ b/src/main/java/io/debezium/connector/jdbc/JdbcChangeEventSink.java @@ -127,7 +127,7 @@ public void execute(Collection records) { flushBuffer(tableId, updateBufferByTable.get(tableId).flush()); } - Buffer tableIdBuffer = resolveBuffer(deleteBufferByTable, tableId); + Buffer tableIdBuffer = resolveBuffer(deleteBufferByTable, tableId, sinkRecordDescriptor); List toFlush = tableIdBuffer.add(sinkRecordDescriptor); @@ -145,7 +145,7 @@ public void execute(Collection records) { Stopwatch updateBufferStopwatch = Stopwatch.reusable(); updateBufferStopwatch.start(); - Buffer tableIdBuffer = resolveBuffer(updateBufferByTable, tableId); + Buffer tableIdBuffer = resolveBuffer(updateBufferByTable, tableId, sinkRecordDescriptor); List toFlush = tableIdBuffer.add(sinkRecordDescriptor); updateBufferStopwatch.stop(); @@ -175,8 +175,8 @@ private static boolean isSchemaChange(SinkRecord record) { && record.valueSchema().name().contains(SCHEMA_CHANGE_VALUE); } - private Buffer resolveBuffer(Map bufferMap, TableId tableId) { - if (config.isUseReductionBuffer()) { + private Buffer resolveBuffer(Map bufferMap, TableId tableId, SinkRecordDescriptor sinkRecordDescriptor) { + if (config.isUseReductionBuffer() && !sinkRecordDescriptor.getKeyFieldNames().isEmpty()) { return bufferMap.computeIfAbsent(tableId, k -> new ReducedRecordBuffer(config)); } else {