From 5054b2f4cbf6c5a238a8ee72a1e1e36bcbb3a838 Mon Sep 17 00:00:00 2001 From: Gaurav Miglani Date: Mon, 6 May 2024 22:47:26 +0530 Subject: [PATCH] DBZ-7845 [CHORE] pr review changes --- .../io/debezium/connector/jdbc/Buffer.java | 14 +++++++++++ .../connector/jdbc/JdbcChangeEventSink.java | 25 ++++++++----------- .../jdbc/JdbcSinkConnectorConfig.java | 4 +-- .../jdbc/dialect/postgres/EnumType.java | 2 +- 4 files changed, 28 insertions(+), 17 deletions(-) diff --git a/src/main/java/io/debezium/connector/jdbc/Buffer.java b/src/main/java/io/debezium/connector/jdbc/Buffer.java index 99d5db95..94a8a074 100644 --- a/src/main/java/io/debezium/connector/jdbc/Buffer.java +++ b/src/main/java/io/debezium/connector/jdbc/Buffer.java @@ -14,9 +14,23 @@ */ public interface Buffer { + /** + * to add a {@link SinkRecordDescriptor} to the internal buffer and + * call the {@link Buffer#flush()} when buffer size >= {@link JdbcSinkConnectorConfig#getBatchSize()} + * @param recordDescriptor the Sink record descriptor + * @return the buffer records + */ List add(SinkRecordDescriptor recordDescriptor); + /** + * to clear and flush the internal buffer + * @return {@link SinkRecordDescriptor} the flushed buffer records. + */ List flush(); + /** + * to check whether buffer is empty or not. + * @return true if empty else false. + */ boolean isEmpty(); } diff --git a/src/main/java/io/debezium/connector/jdbc/JdbcChangeEventSink.java b/src/main/java/io/debezium/connector/jdbc/JdbcChangeEventSink.java index 397f0e5f..3caface5 100644 --- a/src/main/java/io/debezium/connector/jdbc/JdbcChangeEventSink.java +++ b/src/main/java/io/debezium/connector/jdbc/JdbcChangeEventSink.java @@ -127,13 +127,7 @@ public void execute(Collection records) { flushBuffer(tableId, updateBufferByTable.get(tableId).flush()); } - Buffer tableIdBuffer; - if (config.isUseReductionBuffer()) { - tableIdBuffer = deleteBufferByTable.computeIfAbsent(tableId, k -> new ReducedRecordBuffer(config)); - } - else { - tableIdBuffer = deleteBufferByTable.computeIfAbsent(tableId, k -> new RecordBuffer(config)); - } + Buffer tableIdBuffer = resolveBuffer(deleteBufferByTable, tableId); List toFlush = tableIdBuffer.add(sinkRecordDescriptor); @@ -151,13 +145,7 @@ public void execute(Collection records) { Stopwatch updateBufferStopwatch = Stopwatch.reusable(); updateBufferStopwatch.start(); - Buffer tableIdBuffer; - if (config.isUseReductionBuffer()) { - tableIdBuffer = updateBufferByTable.computeIfAbsent(tableId, k -> new ReducedRecordBuffer(config)); - } - else { - tableIdBuffer = updateBufferByTable.computeIfAbsent(tableId, k -> new RecordBuffer(config)); - } + Buffer tableIdBuffer = resolveBuffer(deleteBufferByTable, tableId); List toFlush = tableIdBuffer.add(sinkRecordDescriptor); updateBufferStopwatch.stop(); @@ -187,6 +175,15 @@ private static boolean isSchemaChange(SinkRecord record) { && record.valueSchema().name().contains(SCHEMA_CHANGE_VALUE); } + private Buffer resolveBuffer(Map bufferMap, TableId tableId) { + if (config.isUseReductionBuffer()) { + return bufferMap.computeIfAbsent(tableId, k -> new ReducedRecordBuffer(config)); + } + else { + return bufferMap.computeIfAbsent(tableId, k -> new RecordBuffer(config)); + } + } + private SinkRecordDescriptor buildRecordSinkDescriptor(SinkRecord record) { SinkRecordDescriptor sinkRecordDescriptor; diff --git a/src/main/java/io/debezium/connector/jdbc/JdbcSinkConnectorConfig.java b/src/main/java/io/debezium/connector/jdbc/JdbcSinkConnectorConfig.java index 3219b2b9..afb9eb17 100644 --- a/src/main/java/io/debezium/connector/jdbc/JdbcSinkConnectorConfig.java +++ b/src/main/java/io/debezium/connector/jdbc/JdbcSinkConnectorConfig.java @@ -310,13 +310,13 @@ public class JdbcSinkConnectorConfig { + "should be excluded from change events. The field names must be delimited by the format : "); public static final Field USE_REDUCTION_BUFFER_FIELD = Field.create(USE_REDUCTION_BUFFER) - .withDisplayName("Controls whether to use reduction buffer by the connector to reduce the SQL load when duplicates are found") + .withDisplayName("Specifies whether to use the reduction buffer.") .withType(Type.BOOLEAN) .withGroup(Field.createGroupEntry(Field.Group.CONNECTOR, 2)) .withWidth(ConfigDef.Width.SHORT) .withImportance(ConfigDef.Importance.MEDIUM) .withDefault(false) - .withDescription("Whether to use reduced buffer or not."); + .withDescription("A reduction buffer consolidates the execution of SQL statements by primary key to reduce the SQL load on the target database. When set to false (the default), each incoming event is applied as a logical SQL change. When set to true, incoming events that refer to the same row will be reduced to a single logical change based on the most recent row state."); protected static final ConfigDefinition CONFIG_DEFINITION = ConfigDefinition.editor() .connector( diff --git a/src/main/java/io/debezium/connector/jdbc/dialect/postgres/EnumType.java b/src/main/java/io/debezium/connector/jdbc/dialect/postgres/EnumType.java index 52f22494..48b0fb54 100644 --- a/src/main/java/io/debezium/connector/jdbc/dialect/postgres/EnumType.java +++ b/src/main/java/io/debezium/connector/jdbc/dialect/postgres/EnumType.java @@ -33,7 +33,7 @@ public String[] getRegistrationKeys() { @Override public String getTypeName(DatabaseDialect dialect, Schema schema, boolean key) { - LOGGER.trace("Cannot create enum types automatically, please create the table by hand. Using STRING fallback."); + LOGGER.warn("Cannot create enum types automatically, please create the table by hand. Using STRING fallback."); return ConnectStringType.INSTANCE.getTypeName(dialect, schema, key); }