diff --git a/src/main/java/io/debezium/connector/jdbc/JdbcChangeEventSink.java b/src/main/java/io/debezium/connector/jdbc/JdbcChangeEventSink.java index 21b42c37..45ca744f 100644 --- a/src/main/java/io/debezium/connector/jdbc/JdbcChangeEventSink.java +++ b/src/main/java/io/debezium/connector/jdbc/JdbcChangeEventSink.java @@ -115,16 +115,16 @@ public void execute(Collection records) { if (sinkRecordDescriptor.isDelete()) { + if (!config.isDeleteEnabled()) { + LOGGER.debug("Deletes are not enabled, skipping delete for topic '{}'", sinkRecordDescriptor.getTopicName()); + continue; + } + if (updateBufferByTable.get(tableId) != null && !updateBufferByTable.get(tableId).isEmpty()) { // When an delete arrives, update buffer must be flushed to avoid losing an // delete for the same record after its update. + flushBuffer(tableId, updateBufferByTable.get(tableId).flush()); - - } - - if (!config.isDeleteEnabled()) { - LOGGER.debug("Deletes are not enabled, skipping delete for topic '{}'", sinkRecordDescriptor.getTopicName()); - continue; } RecordBuffer tableIdBuffer = deleteBufferByTable.computeIfAbsent(tableId, k -> new RecordBuffer(config)); diff --git a/src/test/java/io/debezium/connector/jdbc/integration/AbstractJdbcSinkDeleteEnabledTest.java b/src/test/java/io/debezium/connector/jdbc/integration/AbstractJdbcSinkDeleteEnabledTest.java index 723e5a6b..636a3f97 100644 --- a/src/test/java/io/debezium/connector/jdbc/integration/AbstractJdbcSinkDeleteEnabledTest.java +++ b/src/test/java/io/debezium/connector/jdbc/integration/AbstractJdbcSinkDeleteEnabledTest.java @@ -5,6 +5,8 @@ */ package io.debezium.connector.jdbc.integration; +import java.util.ArrayList; +import java.util.List; import java.util.Map; import org.apache.kafka.connect.sink.SinkRecord; @@ -242,4 +244,33 @@ public void testShouldHandleTruncateRecord(SinkRecordFactory factory) { tableAssert.exists().hasNumberOfRows(0).hasNumberOfColumns(3); } } + + @ParameterizedTest + @ArgumentsSource(SinkRecordFactoryArgumentsProvider.class) + @FixFor("DBZ-7830") + public void testShouldFlushUpdateBufferWhenDelete(SinkRecordFactory factory) { + final Map properties = getDefaultSinkConfig(); + properties.put(JdbcSinkConnectorConfig.SCHEMA_EVOLUTION, SchemaEvolutionMode.BASIC.getValue()); + properties.put(JdbcSinkConnectorConfig.PRIMARY_KEY_MODE, PrimaryKeyMode.RECORD_KEY.getValue()); + properties.put(JdbcSinkConnectorConfig.DELETE_ENABLED, "true"); + properties.put(JdbcSinkConnectorConfig.BATCH_SIZE, "500"); + startSinkConnector(properties); + assertSinkConnectorIsRunning(); + + final String tableName = randomTableName(); + final String topicName = topicName("server1", "schema", tableName); + + final SinkRecord deleteRecord = factory.deleteRecord(topicName); + List records = new ArrayList(); + + records.add(factory.createRecord(topicName, (byte) 2)); + records.add(factory.createRecord(topicName, (byte) 1)); + records.add(deleteRecord); + // should insert success (not violate primary key constraint) + records.add(factory.createRecord(topicName, (byte) 1)); + consume(records); + + final TableAssert tableAssert = TestHelper.assertTable(dataSource(), destinationTableName(deleteRecord)); + tableAssert.exists().hasNumberOfRows(2).hasNumberOfColumns(3); + } }