This repository has been archived by the owner on Sep 13, 2024. It is now read-only.

Commit

DBZ-7845 [FEAT] optimize buffer by removing copying
gaurav7261 authored and jpechane committed May 14, 2024
1 parent e34ad96 commit 8267118
Showing 2 changed files with 25 additions and 8 deletions.
17 changes: 13 additions & 4 deletions src/main/java/io/debezium/connector/jdbc/RecordBuffer.java
@@ -30,7 +30,8 @@ public RecordBuffer(JdbcSinkConnectorConfig connectorConfig) {
 
     public List<SinkRecordDescriptor> add(SinkRecordDescriptor recordDescriptor) {
 
-        ArrayList<SinkRecordDescriptor> flushed = new ArrayList<>();
+        List<SinkRecordDescriptor> flushed = new ArrayList<>();
+        boolean isSchemaChanged = false;
 
         if (records.isEmpty()) {
             keySchema = recordDescriptor.getKeySchema();
@@ -40,21 +41,29 @@ public List<SinkRecordDescriptor> add(SinkRecordDescriptor recordDescriptor) {
         if (!Objects.equals(keySchema, recordDescriptor.getKeySchema()) || !Objects.equals(valueSchema, recordDescriptor.getValueSchema())) {
             keySchema = recordDescriptor.getKeySchema();
             valueSchema = recordDescriptor.getValueSchema();
-            flushed.addAll(flush());
+            flushed = flush();
+            isSchemaChanged = true;
         }
 
         records.add(recordDescriptor);
 
+        if (isSchemaChanged) {
+            // current record is already added in internal buffer after flush
+            // just return the flushed buffer ignoring buffer size check
+            return flushed;
+        }
+
         if (records.size() >= connectorConfig.getBatchSize()) {
-            flushed.addAll(flush());
+            flushed = flush();
         }
 
         return flushed;
     }
 
     public List<SinkRecordDescriptor> flush() {
 
-        ArrayList<SinkRecordDescriptor> flushed = new ArrayList<>(records);
+        List<SinkRecordDescriptor> flushed = new ArrayList<>(records);
         records.clear();
 
         return flushed;
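
Taken together, the hunks above give RecordBuffer.add() roughly the following shape after this commit. This is a sketch assembled from the visible diff context, not the full file: the records, keySchema, valueSchema and connectorConfig fields, and anything hidden behind the collapsed regions, are assumed from the surrounding code.

public List<SinkRecordDescriptor> add(SinkRecordDescriptor recordDescriptor) {
    List<SinkRecordDescriptor> flushed = new ArrayList<>();
    boolean isSchemaChanged = false;

    // First record in an empty buffer defines the current key/value schemas.
    if (records.isEmpty()) {
        keySchema = recordDescriptor.getKeySchema();
        valueSchema = recordDescriptor.getValueSchema();
    }

    // A schema change forces a flush; reuse the list returned by flush()
    // instead of copying it into a second list with addAll().
    if (!Objects.equals(keySchema, recordDescriptor.getKeySchema())
            || !Objects.equals(valueSchema, recordDescriptor.getValueSchema())) {
        keySchema = recordDescriptor.getKeySchema();
        valueSchema = recordDescriptor.getValueSchema();
        flushed = flush();
        isSchemaChanged = true;
    }

    records.add(recordDescriptor);

    if (isSchemaChanged) {
        // The current record is already buffered after the flush; return the
        // flushed batch without re-checking the batch size.
        return flushed;
    }

    if (records.size() >= connectorConfig.getBatchSize()) {
        flushed = flush();
    }

    return flushed;
}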
16 changes: 12 additions & 4 deletions src/main/java/io/debezium/connector/jdbc/ReducedRecordBuffer.java
@@ -35,7 +35,8 @@ public ReducedRecordBuffer(JdbcSinkConnectorConfig connectorConfig) {
 
     @Override
     public List<SinkRecordDescriptor> add(SinkRecordDescriptor recordDescriptor) {
-        ArrayList<SinkRecordDescriptor> flushed = new ArrayList<>();
+        List<SinkRecordDescriptor> flushed = new ArrayList<>();
+        boolean isSchemaChanged = false;
 
         if (records.isEmpty()) {
             keySchema = recordDescriptor.getKeySchema();
@@ -45,7 +46,8 @@ public List<SinkRecordDescriptor> add(SinkRecordDescriptor recordDescriptor) {
         if (!Objects.equals(keySchema, recordDescriptor.getKeySchema()) || !Objects.equals(valueSchema, recordDescriptor.getValueSchema())) {
             keySchema = recordDescriptor.getKeySchema();
             valueSchema = recordDescriptor.getValueSchema();
-            flushed.addAll(flush());
+            flushed = flush();
+            isSchemaChanged = true;
         }
 
         Struct keyStruct = recordDescriptor.getKeyStruct(connectorConfig.getPrimaryKeyMode());
@@ -56,16 +58,22 @@ public List<SinkRecordDescriptor> add(SinkRecordDescriptor recordDescriptor) {
             throw new ConnectException("No struct-based primary key defined for record key/value, reduction buffer require struct based primary key");
         }
 
+        if (isSchemaChanged) {
+            // current record is already added in internal buffer after flush,
+            // just return the flushed buffer ignoring buffer size check
+            return flushed;
+        }
+
         if (records.size() >= connectorConfig.getBatchSize()) {
-            flushed.addAll(flush());
+            flushed = flush();
         }
 
         return flushed;
     }
 
     @Override
     public List<SinkRecordDescriptor> flush() {
-        ArrayList<SinkRecordDescriptor> flushed = new ArrayList<>(records.values());
+        List<SinkRecordDescriptor> flushed = new ArrayList<>(records.values());
         records.clear();
         return flushed;
     }
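
Reassigning flushed = flush() is safe for callers because flush() hands back a fresh ArrayList built from the buffer contents before clearing it, so the buffer's internal collection is never exposed. A hypothetical caller (the writer variable and its write() method are illustrative and not part of this commit) would consume the result like this:

List<SinkRecordDescriptor> batch = buffer.add(descriptor);
if (!batch.isEmpty()) {
    // A schema change or a full buffer produced a batch; write it out.
    writer.write(batch);
}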
