From f47abb16955ec503552360488b59f3d14b48522e Mon Sep 17 00:00:00 2001 From: Gaurav Miglani Date: Thu, 9 May 2024 13:21:21 +0530 Subject: [PATCH] DBZ-7845 [FEAT] optimize buffer by removing copying --- .../debezium/connector/jdbc/RecordBuffer.java | 17 +++++++++++++---- .../connector/jdbc/ReducedRecordBuffer.java | 16 ++++++++++++---- 2 files changed, 25 insertions(+), 8 deletions(-) diff --git a/src/main/java/io/debezium/connector/jdbc/RecordBuffer.java b/src/main/java/io/debezium/connector/jdbc/RecordBuffer.java index 10080c89..a1de3dc1 100644 --- a/src/main/java/io/debezium/connector/jdbc/RecordBuffer.java +++ b/src/main/java/io/debezium/connector/jdbc/RecordBuffer.java @@ -30,7 +30,8 @@ public RecordBuffer(JdbcSinkConnectorConfig connectorConfig) { public List add(SinkRecordDescriptor recordDescriptor) { - ArrayList flushed = new ArrayList<>(); + List flushed = new ArrayList<>(); + boolean isSchemaChanged = false; if (records.isEmpty()) { keySchema = recordDescriptor.getKeySchema(); @@ -40,13 +41,21 @@ public List add(SinkRecordDescriptor recordDescriptor) { if (!Objects.equals(keySchema, recordDescriptor.getKeySchema()) || !Objects.equals(valueSchema, recordDescriptor.getValueSchema())) { keySchema = recordDescriptor.getKeySchema(); valueSchema = recordDescriptor.getValueSchema(); - flushed.addAll(flush()); + flushed = flush(); + isSchemaChanged = true; } records.add(recordDescriptor); + if (isSchemaChanged) { + // current record is already added in internal buffer after flush + // just return the flushed buffer ignoring buffer size check + return flushed; + } + + if (records.size() >= connectorConfig.getBatchSize()) { - flushed.addAll(flush()); + flushed = flush(); } return flushed; @@ -54,7 +63,7 @@ public List add(SinkRecordDescriptor recordDescriptor) { public List flush() { - ArrayList flushed = new ArrayList<>(records); + List flushed = new ArrayList<>(records); records.clear(); return flushed; diff --git a/src/main/java/io/debezium/connector/jdbc/ReducedRecordBuffer.java b/src/main/java/io/debezium/connector/jdbc/ReducedRecordBuffer.java index d6ef5c1a..26ce03fa 100644 --- a/src/main/java/io/debezium/connector/jdbc/ReducedRecordBuffer.java +++ b/src/main/java/io/debezium/connector/jdbc/ReducedRecordBuffer.java @@ -35,7 +35,8 @@ public ReducedRecordBuffer(JdbcSinkConnectorConfig connectorConfig) { @Override public List add(SinkRecordDescriptor recordDescriptor) { - ArrayList flushed = new ArrayList<>(); + List flushed = new ArrayList<>(); + boolean isSchemaChanged = false; if (records.isEmpty()) { keySchema = recordDescriptor.getKeySchema(); @@ -45,7 +46,8 @@ public List add(SinkRecordDescriptor recordDescriptor) { if (!Objects.equals(keySchema, recordDescriptor.getKeySchema()) || !Objects.equals(valueSchema, recordDescriptor.getValueSchema())) { keySchema = recordDescriptor.getKeySchema(); valueSchema = recordDescriptor.getValueSchema(); - flushed.addAll(flush()); + flushed = flush(); + isSchemaChanged = true; } Struct keyStruct = recordDescriptor.getKeyStruct(connectorConfig.getPrimaryKeyMode()); @@ -56,8 +58,14 @@ public List add(SinkRecordDescriptor recordDescriptor) { throw new ConnectException("No struct-based primary key defined for record key/value, reduction buffer require struct based primary key"); } + if (isSchemaChanged) { + // current record is already added in internal buffer after flush, + // just return the flushed buffer ignoring buffer size check + return flushed; + } + if (records.size() >= connectorConfig.getBatchSize()) { - flushed.addAll(flush()); + return flush(); } return flushed; @@ -65,7 +73,7 @@ public List add(SinkRecordDescriptor recordDescriptor) { @Override public List flush() { - ArrayList flushed = new ArrayList<>(records.values()); + List flushed = new ArrayList<>(records.values()); records.clear(); return flushed; }