Skip to content
This repository has been archived by the owner on Sep 13, 2024. It is now read-only.

Commit

Permalink
DBZ-7845 [CHORE] modify resolveBuffer method for pkey check
Browse files Browse the repository at this point in the history
  • Loading branch information
gaurav7261 authored and jpechane committed May 14, 2024
1 parent 8267118 commit 0b03ee6
Showing 1 changed file with 4 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ public void execute(Collection<SinkRecord> records) {
flushBuffer(tableId, updateBufferByTable.get(tableId).flush());
}

Buffer tableIdBuffer = resolveBuffer(deleteBufferByTable, tableId);
Buffer tableIdBuffer = resolveBuffer(deleteBufferByTable, tableId, sinkRecordDescriptor);

List<SinkRecordDescriptor> toFlush = tableIdBuffer.add(sinkRecordDescriptor);

Expand All @@ -145,7 +145,7 @@ public void execute(Collection<SinkRecord> records) {
Stopwatch updateBufferStopwatch = Stopwatch.reusable();
updateBufferStopwatch.start();

Buffer tableIdBuffer = resolveBuffer(updateBufferByTable, tableId);
Buffer tableIdBuffer = resolveBuffer(updateBufferByTable, tableId, sinkRecordDescriptor);

List<SinkRecordDescriptor> toFlush = tableIdBuffer.add(sinkRecordDescriptor);
updateBufferStopwatch.stop();
Expand Down Expand Up @@ -175,8 +175,8 @@ private static boolean isSchemaChange(SinkRecord record) {
&& record.valueSchema().name().contains(SCHEMA_CHANGE_VALUE);
}

private Buffer resolveBuffer(Map<TableId, Buffer> bufferMap, TableId tableId) {
if (config.isUseReductionBuffer()) {
private Buffer resolveBuffer(Map<TableId, Buffer> bufferMap, TableId tableId, SinkRecordDescriptor sinkRecordDescriptor) {
if (config.isUseReductionBuffer() && !sinkRecordDescriptor.getKeyFieldNames().isEmpty()) {
return bufferMap.computeIfAbsent(tableId, k -> new ReducedRecordBuffer(config));
}
else {
Expand Down

0 comments on commit 0b03ee6

Please sign in to comment.