
Commit 0504e49
DBZ-7830 add test, optimize flush when not enable delete
DucLT committed May 2, 2024
1 parent 042dbfe commit 0504e49
Showing 2 changed files with 37 additions and 6 deletions.
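In short: the commit moves the delete-enabled check ahead of the update-buffer flush in the sink's execute loop, so that records skipped because deletes are disabled no longer trigger an unnecessary flush, and it adds a regression test for DBZ-7830.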
@@ -115,16 +115,16 @@ public void execute(Collection<SinkRecord> records) {
 
             if (sinkRecordDescriptor.isDelete()) {
 
+                if (!config.isDeleteEnabled()) {
+                    LOGGER.debug("Deletes are not enabled, skipping delete for topic '{}'", sinkRecordDescriptor.getTopicName());
+                    continue;
+                }
+
                 if (updateBufferByTable.get(tableId) != null && !updateBufferByTable.get(tableId).isEmpty()) {
                     // When a delete arrives, the update buffer must be flushed to avoid losing the
                     // delete for the same record after its update.
 
                     flushBuffer(tableId, updateBufferByTable.get(tableId).flush());
 
                 }
 
-                if (!config.isDeleteEnabled()) {
-                    LOGGER.debug("Deletes are not enabled, skipping delete for topic '{}'", sinkRecordDescriptor.getTopicName());
-                    continue;
-                }
-
                 RecordBuffer tableIdBuffer = deleteBufferByTable.computeIfAbsent(tableId, k -> new RecordBuffer(config));
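To make the reordering easier to follow, here is a minimal sketch of the resulting control flow for a delete record. The names config, updateBufferByTable, deleteBufferByTable, flushBuffer, and RecordBuffer are taken from the hunk above; the pendingUpdates local and the final tableIdBuffer.add(...) call are assumptions for illustration, not code shown in this commit, and the continue targets the enclosing loop over records:

    // Sketch: how a delete record flows through execute() after DBZ-7830.
    if (sinkRecordDescriptor.isDelete()) {
        // Check the cheap config flag first: when deletes are disabled, skip the
        // record before touching any buffer, so no flush is triggered needlessly.
        if (!config.isDeleteEnabled()) {
            LOGGER.debug("Deletes are not enabled, skipping delete for topic '{}'", sinkRecordDescriptor.getTopicName());
            continue;
        }

        // Deletes are enabled: flush pending updates for this table so the delete
        // cannot be overtaken by an earlier buffered insert/update of the same key.
        RecordBuffer pendingUpdates = updateBufferByTable.get(tableId); // 'pendingUpdates' is an assumed local
        if (pendingUpdates != null && !pendingUpdates.isEmpty()) {
            flushBuffer(tableId, pendingUpdates.flush());
        }

        // Buffer the delete itself for batched execution.
        RecordBuffer tableIdBuffer = deleteBufferByTable.computeIfAbsent(tableId, k -> new RecordBuffer(config));
        tableIdBuffer.add(sinkRecordDescriptor); // assumed call; not part of the shown hunk
    }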
@@ -5,6 +5,8 @@
  */
 package io.debezium.connector.jdbc.integration;
 
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Map;
 
 import org.apache.kafka.connect.sink.SinkRecord;
@@ -242,4 +244,33 @@ public void testShouldHandleTruncateRecord(SinkRecordFactory factory) {
             tableAssert.exists().hasNumberOfRows(0).hasNumberOfColumns(3);
         }
     }
+
+    @ParameterizedTest
+    @ArgumentsSource(SinkRecordFactoryArgumentsProvider.class)
+    @FixFor("DBZ-7830")
+    public void testShouldFlushUpdateBufferWhenDelete(SinkRecordFactory factory) {
+        final Map<String, String> properties = getDefaultSinkConfig();
+        properties.put(JdbcSinkConnectorConfig.SCHEMA_EVOLUTION, SchemaEvolutionMode.BASIC.getValue());
+        properties.put(JdbcSinkConnectorConfig.PRIMARY_KEY_MODE, PrimaryKeyMode.RECORD_KEY.getValue());
+        properties.put(JdbcSinkConnectorConfig.DELETE_ENABLED, "true");
+        properties.put(JdbcSinkConnectorConfig.BATCH_SIZE, "500");
+        startSinkConnector(properties);
+        assertSinkConnectorIsRunning();
+
+        final String tableName = randomTableName();
+        final String topicName = topicName("server1", "schema", tableName);
+
+        final SinkRecord deleteRecord = factory.deleteRecord(topicName);
+        List<SinkRecord> records = new ArrayList<>();
+
+        records.add(factory.createRecord(topicName, (byte) 2));
+        records.add(factory.createRecord(topicName, (byte) 1));
+        records.add(deleteRecord);
+        // The second insert of key 1 should succeed and must not violate the primary key constraint.
+        records.add(factory.createRecord(topicName, (byte) 1));
+        consume(records);
+
+        final TableAssert tableAssert = TestHelper.assertTable(dataSource(), destinationTableName(deleteRecord));
+        tableAssert.exists().hasNumberOfRows(2).hasNumberOfColumns(3);
+    }
 }
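The new test exercises exactly the ordering fixed above: the buffered inserts for keys 2 and 1 must be flushed before the delete of key 1 is applied, so the final re-insert of key 1 succeeds and the table ends up with two rows (keys 1 and 2). If the delete were applied while the insert of key 1 was still buffered, that insert would land after the delete and the re-insert would then fail with a primary-key violation.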
