diff --git a/src/main/java/io/debezium/connector/jdbc/Buffer.java b/src/main/java/io/debezium/connector/jdbc/Buffer.java
index 4f2fdffb..99d5db95 100644
--- a/src/main/java/io/debezium/connector/jdbc/Buffer.java
+++ b/src/main/java/io/debezium/connector/jdbc/Buffer.java
@@ -1,3 +1,8 @@
+/*
+ * Copyright Debezium Authors.
+ *
+ * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
+ */
 package io.debezium.connector.jdbc;
 
 import java.util.List;
diff --git a/src/main/java/io/debezium/connector/jdbc/JdbcChangeEventSink.java b/src/main/java/io/debezium/connector/jdbc/JdbcChangeEventSink.java
index e488fa9d..397f0e5f 100644
--- a/src/main/java/io/debezium/connector/jdbc/JdbcChangeEventSink.java
+++ b/src/main/java/io/debezium/connector/jdbc/JdbcChangeEventSink.java
@@ -130,7 +130,8 @@ public void execute(Collection records) {
                 Buffer tableIdBuffer;
                 if (config.isUseReductionBuffer()) {
                     tableIdBuffer = deleteBufferByTable.computeIfAbsent(tableId, k -> new ReducedRecordBuffer(config));
-                } else {
+                }
+                else {
                     tableIdBuffer = deleteBufferByTable.computeIfAbsent(tableId, k -> new RecordBuffer(config));
                 }
 
@@ -153,7 +154,8 @@ public void execute(Collection records) {
                 Buffer tableIdBuffer;
                 if (config.isUseReductionBuffer()) {
                     tableIdBuffer = updateBufferByTable.computeIfAbsent(tableId, k -> new ReducedRecordBuffer(config));
-                } else {
+                }
+                else {
                     tableIdBuffer = updateBufferByTable.computeIfAbsent(tableId, k -> new RecordBuffer(config));
                 }
 
diff --git a/src/main/java/io/debezium/connector/jdbc/ReducedRecordBuffer.java b/src/main/java/io/debezium/connector/jdbc/ReducedRecordBuffer.java
index d072cce5..d6ef5c1a 100644
--- a/src/main/java/io/debezium/connector/jdbc/ReducedRecordBuffer.java
+++ b/src/main/java/io/debezium/connector/jdbc/ReducedRecordBuffer.java
@@ -1,3 +1,8 @@
+/*
+ * Copyright Debezium Authors.
+ *
+ * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
+ */
 package io.debezium.connector.jdbc;
 
 import org.apache.kafka.connect.data.Schema;
@@ -46,7 +51,8 @@ public List add(SinkRecordDescriptor recordDescriptor) {
         Struct keyStruct = recordDescriptor.getKeyStruct(connectorConfig.getPrimaryKeyMode());
         if (keyStruct != null) {
             records.put(keyStruct, recordDescriptor);
-        } else {
+        }
+        else {
             throw new ConnectException("No struct-based primary key defined for record key/value, reduction buffer require struct based primary key");
         }
 
diff --git a/src/test/java/io/debezium/connector/jdbc/ReducedRecordBufferTest.java b/src/test/java/io/debezium/connector/jdbc/ReducedRecordBufferTest.java
index 3ab701b2..ae542095 100644
--- a/src/test/java/io/debezium/connector/jdbc/ReducedRecordBufferTest.java
+++ b/src/test/java/io/debezium/connector/jdbc/ReducedRecordBufferTest.java
@@ -64,7 +64,7 @@ void setUp() {
     @DisplayName("When 10 sink records arrives and buffer size is 5 then the buffer will be flushed 2 times")
     void correctlyBuffer(SinkRecordFactory factory) {
-        JdbcSinkConnectorConfig config = new JdbcSinkConnectorConfig(Map.of("batch.size", "5", "primary.key.mode", "record_key","primary.key.fields", "id"));
+        JdbcSinkConnectorConfig config = new JdbcSinkConnectorConfig(Map.of("batch.size", "5", "primary.key.mode", "record_key", "primary.key.fields", "id"));
 
         ReducedRecordBuffer reducedRecordBuffer = new ReducedRecordBuffer(config);
 
@@ -90,7 +90,7 @@ void correctlyBuffer(SinkRecordFactory factory) {
     @DisplayName("When key schema changes then the buffer will be flushed")
     void keySchemaChange(SinkRecordFactory factory) {
-        JdbcSinkConnectorConfig config = new JdbcSinkConnectorConfig(Map.of("batch.size", "5", "primary.key.mode", "record_key","primary.key.fields", "id"));
+        JdbcSinkConnectorConfig config = new JdbcSinkConnectorConfig(Map.of("batch.size", "5", "primary.key.mode", "record_key", "primary.key.fields", "id"));
 
         ReducedRecordBuffer reducedRecordBuffer = new ReducedRecordBuffer(config);
 
@@ -135,7 +135,7 @@ void keySchemaChange(SinkRecordFactory factory) {
     @DisplayName("When value schema changes then the buffer will be flushed")
     void valueSchemaChange(SinkRecordFactory factory) {
-        JdbcSinkConnectorConfig config = new JdbcSinkConnectorConfig(Map.of("batch.size", "5", "primary.key.mode", "record_key","primary.key.fields", "id"));
+        JdbcSinkConnectorConfig config = new JdbcSinkConnectorConfig(Map.of("batch.size", "5", "primary.key.mode", "record_key", "primary.key.fields", "id"));
 
         ReducedRecordBuffer reducedRecordBuffer = new ReducedRecordBuffer(config);
 
@@ -180,7 +180,7 @@ void valueSchemaChange(SinkRecordFactory factory) {
     @DisplayName("When 10 sink records arrives and buffer size is 5 with every alternate duplicate sink record then the buffer will be flushed 1 time")
    void correctlyBufferWithDuplicate(SinkRecordFactory factory) {
-        JdbcSinkConnectorConfig config = new JdbcSinkConnectorConfig(Map.of("batch.size", "5", "primary.key.mode", "record_key","primary.key.fields", "id"));
+        JdbcSinkConnectorConfig config = new JdbcSinkConnectorConfig(Map.of("batch.size", "5", "primary.key.mode", "record_key", "primary.key.fields", "id"));
 
         ReducedRecordBuffer reducedRecordBuffer = new ReducedRecordBuffer(config);
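Note on the behavior this patch touches: when the reduction buffer is enabled (isUseReductionBuffer() in the diff above), the sink keys buffered events by their primary-key Struct, so later events for the same key replace earlier ones before a flush, and a missing struct key is rejected. The following is a minimal, self-contained sketch of that reduction idea only; ReducingBufferSketch, its generic key/value types, and the LinkedHashMap layout are assumptions for illustration, not the connector's actual ReducedRecordBuffer.

    import java.util.ArrayList;
    import java.util.LinkedHashMap;
    import java.util.List;
    import java.util.Map;

    // Illustrative sketch of a key-reducing buffer with a fixed batch size.
    final class ReducingBufferSketch<K, V> {

        private final int batchSize;
        // Later values for the same key overwrite earlier ones: the "reduction".
        private final Map<K, V> latestByKey = new LinkedHashMap<>();

        ReducingBufferSketch(int batchSize) {
            this.batchSize = batchSize;
        }

        // Adds an event; returns the flushed batch once batchSize distinct keys
        // are buffered, otherwise an empty list.
        List<V> add(K key, V value) {
            if (key == null) {
                // Mirrors the diff's rule that reduction needs a (struct-based) key.
                throw new IllegalArgumentException("Reduction requires a non-null key");
            }
            latestByKey.put(key, value);
            if (latestByKey.size() >= batchSize) {
                List<V> flushed = new ArrayList<>(latestByKey.values());
                latestByKey.clear();
                return flushed;
            }
            return List.of();
        }
    }

With a batch size of 5 and ten events that alternate over five distinct keys, such a buffer flushes a single batch of five entries, which is the shape of the "every alternate duplicate sink record" test case in the diff.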