From a2756114cfcd4bcbae9624578284209963d73dfa Mon Sep 17 00:00:00 2001 From: nguyenaiden Date: Wed, 16 Aug 2023 15:44:23 -0700 Subject: [PATCH] Renamed Target position methods combined conditionals for cleanliness --- .../integrations/debezium/CdcTargetPosition.java | 2 +- .../internals/DebeziumStateDecoratingIterator.java | 2 +- .../internals/mysql/MySqlCdcTargetPosition.java | 13 +++---------- .../postgres/PostgresCdcTargetPosition.java | 2 +- 4 files changed, 6 insertions(+), 13 deletions(-) diff --git a/airbyte-integrations/bases/debezium/src/main/java/io/airbyte/integrations/debezium/CdcTargetPosition.java b/airbyte-integrations/bases/debezium/src/main/java/io/airbyte/integrations/debezium/CdcTargetPosition.java index ca2880649e79..12c53c0ad641 100644 --- a/airbyte-integrations/bases/debezium/src/main/java/io/airbyte/integrations/debezium/CdcTargetPosition.java +++ b/airbyte-integrations/bases/debezium/src/main/java/io/airbyte/integrations/debezium/CdcTargetPosition.java @@ -61,7 +61,7 @@ default boolean isHeartbeatSupported() { * @param event Event from the CDC load * @return Returns `true` when the record is behind the offset. Otherwise, it returns `false` */ - default boolean isRecordBehindOffset(final Map offset, final ChangeEventWithMetadata event) { + default boolean isEventAheadOffset(final Map offset, final ChangeEventWithMetadata event) { return false; } diff --git a/airbyte-integrations/bases/debezium/src/main/java/io/airbyte/integrations/debezium/internals/DebeziumStateDecoratingIterator.java b/airbyte-integrations/bases/debezium/src/main/java/io/airbyte/integrations/debezium/internals/DebeziumStateDecoratingIterator.java index d46fbeef925f..aa689edd4be0 100644 --- a/airbyte-integrations/bases/debezium/src/main/java/io/airbyte/integrations/debezium/internals/DebeziumStateDecoratingIterator.java +++ b/airbyte-integrations/bases/debezium/src/main/java/io/airbyte/integrations/debezium/internals/DebeziumStateDecoratingIterator.java @@ -162,7 +162,7 @@ protected AirbyteMessage computeNext() { if (checkpointOffsetToSend.size() == 1 && changeEventIterator.hasNext() && !event.isSnapshotEvent() - && targetPosition.isRecordBehindOffset(checkpointOffsetToSend, event)) { + && targetPosition.isEventAheadOffset(checkpointOffsetToSend, event)) { sendCheckpointMessage = true; } } diff --git a/airbyte-integrations/bases/debezium/src/main/java/io/airbyte/integrations/debezium/internals/mysql/MySqlCdcTargetPosition.java b/airbyte-integrations/bases/debezium/src/main/java/io/airbyte/integrations/debezium/internals/mysql/MySqlCdcTargetPosition.java index cfa2f3a2e636..3fed0293920b 100644 --- a/airbyte-integrations/bases/debezium/src/main/java/io/airbyte/integrations/debezium/internals/mysql/MySqlCdcTargetPosition.java +++ b/airbyte-integrations/bases/debezium/src/main/java/io/airbyte/integrations/debezium/internals/mysql/MySqlCdcTargetPosition.java @@ -105,7 +105,7 @@ public boolean isHeartbeatSupported() { } @Override - public boolean isRecordBehindOffset(final Map offset, final ChangeEventWithMetadata event) { + public boolean isEventAheadOffset(final Map offset, final ChangeEventWithMetadata event) { if (offset.size() != 1) { return false; } @@ -126,10 +126,7 @@ public boolean isRecordBehindOffset(final Map offset, final Chan @Override public boolean isSameOffset(final Map offsetA, final Map offsetB) { - if (offsetA == null || offsetA.size() != 1) { - return false; - } - if (offsetB == null || offsetB.size() != 1) { + if ((offsetA == null || offsetA.size() != 1) || (offsetB == null || offsetB.size() != 1)) { return false; } @@ -141,11 +138,7 @@ public boolean isSameOffset(final Map offsetA, final Map sourceOffset } @Override - public boolean isRecordBehindOffset(final Map offset, final ChangeEventWithMetadata event) { + public boolean isEventAheadOffset(final Map offset, final ChangeEventWithMetadata event) { if (offset.size() != 1) { return false; }