Skip to content
This repository has been archived by the owner on Sep 13, 2024. It is now read-only.

Commit

Permalink
DBZ-7845 [CHORE] pr review changes
Browse files Browse the repository at this point in the history
  • Loading branch information
gaurav7261 committed May 7, 2024
1 parent 73aebe1 commit 5054b2f
Show file tree
Hide file tree
Showing 4 changed files with 28 additions and 17 deletions.
14 changes: 14 additions & 0 deletions src/main/java/io/debezium/connector/jdbc/Buffer.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,23 @@
*/
public interface Buffer {

/**
* to add a {@link SinkRecordDescriptor} to the internal buffer and
* call the {@link Buffer#flush()} when buffer size >= {@link JdbcSinkConnectorConfig#getBatchSize()}
* @param recordDescriptor the Sink record descriptor
* @return the buffer records
*/
List<SinkRecordDescriptor> add(SinkRecordDescriptor recordDescriptor);

/**
* to clear and flush the internal buffer
* @return {@link SinkRecordDescriptor} the flushed buffer records.
*/
List<SinkRecordDescriptor> flush();

/**
* to check whether buffer is empty or not.
* @return true if empty else false.
*/
boolean isEmpty();
}
25 changes: 11 additions & 14 deletions src/main/java/io/debezium/connector/jdbc/JdbcChangeEventSink.java
Original file line number Diff line number Diff line change
Expand Up @@ -127,13 +127,7 @@ public void execute(Collection<SinkRecord> records) {
flushBuffer(tableId, updateBufferByTable.get(tableId).flush());
}

Buffer tableIdBuffer;
if (config.isUseReductionBuffer()) {
tableIdBuffer = deleteBufferByTable.computeIfAbsent(tableId, k -> new ReducedRecordBuffer(config));
}
else {
tableIdBuffer = deleteBufferByTable.computeIfAbsent(tableId, k -> new RecordBuffer(config));
}
Buffer tableIdBuffer = resolveBuffer(deleteBufferByTable, tableId);

List<SinkRecordDescriptor> toFlush = tableIdBuffer.add(sinkRecordDescriptor);

Expand All @@ -151,13 +145,7 @@ public void execute(Collection<SinkRecord> records) {
Stopwatch updateBufferStopwatch = Stopwatch.reusable();
updateBufferStopwatch.start();

Buffer tableIdBuffer;
if (config.isUseReductionBuffer()) {
tableIdBuffer = updateBufferByTable.computeIfAbsent(tableId, k -> new ReducedRecordBuffer(config));
}
else {
tableIdBuffer = updateBufferByTable.computeIfAbsent(tableId, k -> new RecordBuffer(config));
}
Buffer tableIdBuffer = resolveBuffer(deleteBufferByTable, tableId);

List<SinkRecordDescriptor> toFlush = tableIdBuffer.add(sinkRecordDescriptor);
updateBufferStopwatch.stop();
Expand Down Expand Up @@ -187,6 +175,15 @@ private static boolean isSchemaChange(SinkRecord record) {
&& record.valueSchema().name().contains(SCHEMA_CHANGE_VALUE);
}

private Buffer resolveBuffer(Map<TableId, Buffer> bufferMap, TableId tableId) {
if (config.isUseReductionBuffer()) {
return bufferMap.computeIfAbsent(tableId, k -> new ReducedRecordBuffer(config));
}
else {
return bufferMap.computeIfAbsent(tableId, k -> new RecordBuffer(config));
}
}

private SinkRecordDescriptor buildRecordSinkDescriptor(SinkRecord record) {

SinkRecordDescriptor sinkRecordDescriptor;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -310,13 +310,13 @@ public class JdbcSinkConnectorConfig {
+ "should be excluded from change events. The field names must be delimited by the format <topic>:<field> ");

public static final Field USE_REDUCTION_BUFFER_FIELD = Field.create(USE_REDUCTION_BUFFER)
.withDisplayName("Controls whether to use reduction buffer by the connector to reduce the SQL load when duplicates are found")
.withDisplayName("Specifies whether to use the reduction buffer.")
.withType(Type.BOOLEAN)
.withGroup(Field.createGroupEntry(Field.Group.CONNECTOR, 2))
.withWidth(ConfigDef.Width.SHORT)
.withImportance(ConfigDef.Importance.MEDIUM)
.withDefault(false)
.withDescription("Whether to use reduced buffer or not.");
.withDescription("A reduction buffer consolidates the execution of SQL statements by primary key to reduce the SQL load on the target database. When set to false (the default), each incoming event is applied as a logical SQL change. When set to true, incoming events that refer to the same row will be reduced to a single logical change based on the most recent row state.");

protected static final ConfigDefinition CONFIG_DEFINITION = ConfigDefinition.editor()
.connector(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ public String[] getRegistrationKeys() {

@Override
public String getTypeName(DatabaseDialect dialect, Schema schema, boolean key) {
LOGGER.trace("Cannot create enum types automatically, please create the table by hand. Using STRING fallback.");
LOGGER.warn("Cannot create enum types automatically, please create the table by hand. Using STRING fallback.");
return ConnectStringType.INSTANCE.getTypeName(dialect, schema, key);
}

Expand Down

0 comments on commit 5054b2f

Please sign in to comment.