diff --git a/airbyte-integrations/bases/debezium/src/main/java/io/airbyte/integrations/debezium/internals/mongodb/MongodbDebeziumConstants.java b/airbyte-integrations/bases/debezium/src/main/java/io/airbyte/integrations/debezium/internals/mongodb/MongoDbDebeziumConstants.java similarity index 94% rename from airbyte-integrations/bases/debezium/src/main/java/io/airbyte/integrations/debezium/internals/mongodb/MongodbDebeziumConstants.java rename to airbyte-integrations/bases/debezium/src/main/java/io/airbyte/integrations/debezium/internals/mongodb/MongoDbDebeziumConstants.java index e457574f00a7..5858c77624a8 100644 --- a/airbyte-integrations/bases/debezium/src/main/java/io/airbyte/integrations/debezium/internals/mongodb/MongodbDebeziumConstants.java +++ b/airbyte-integrations/bases/debezium/src/main/java/io/airbyte/integrations/debezium/internals/mongodb/MongoDbDebeziumConstants.java @@ -7,7 +7,7 @@ /** * A collection of constants for use with the Debezium MongoDB Connector. */ -public class MongodbDebeziumConstants { +public class MongoDbDebeziumConstants { /** * Constants for Debezium Offset State storage. diff --git a/airbyte-integrations/bases/debezium/src/main/java/io/airbyte/integrations/debezium/internals/mongodb/MongodbDebeziumStateUtil.java b/airbyte-integrations/bases/debezium/src/main/java/io/airbyte/integrations/debezium/internals/mongodb/MongoDbDebeziumStateUtil.java similarity index 86% rename from airbyte-integrations/bases/debezium/src/main/java/io/airbyte/integrations/debezium/internals/mongodb/MongodbDebeziumStateUtil.java rename to airbyte-integrations/bases/debezium/src/main/java/io/airbyte/integrations/debezium/internals/mongodb/MongoDbDebeziumStateUtil.java index a39483ccd7ce..05f0df6f4a56 100644 --- a/airbyte-integrations/bases/debezium/src/main/java/io/airbyte/integrations/debezium/internals/mongodb/MongodbDebeziumStateUtil.java +++ b/airbyte-integrations/bases/debezium/src/main/java/io/airbyte/integrations/debezium/internals/mongodb/MongoDbDebeziumStateUtil.java @@ -23,9 +23,9 @@ /** * Collection of utility methods related to the Debezium offset state. */ -public class MongodbDebeziumStateUtil { +public class MongoDbDebeziumStateUtil { - private static final Logger LOGGER = LoggerFactory.getLogger(MongodbDebeziumStateUtil.class); + private static final Logger LOGGER = LoggerFactory.getLogger(MongoDbDebeziumStateUtil.class); /** * Constructs the initial Debezium offset state that will be used by the incremental CDC snapshot @@ -42,14 +42,14 @@ public JsonNode constructInitialDebeziumState(final MongoClient mongoClient, fin final BsonTimestamp timestamp = ResumeTokens.getTimestamp(resumeToken); final List> key = List.of( - Map.of(MongodbDebeziumConstants.OffsetState.KEY_REPLICA_SET, replicaSet, - MongodbDebeziumConstants.OffsetState.KEY_SERVER_ID, database)); + Map.of(MongoDbDebeziumConstants.OffsetState.KEY_REPLICA_SET, replicaSet, + MongoDbDebeziumConstants.OffsetState.KEY_SERVER_ID, database)); final Map value = new HashMap<>(); - value.put(MongodbDebeziumConstants.OffsetState.VALUE_SECONDS, timestamp.getTime()); - value.put(MongodbDebeziumConstants.OffsetState.VALUE_INCREMENT, timestamp.getInc()); - value.put(MongodbDebeziumConstants.OffsetState.VALUE_TRANSACTION_ID, null); - value.put(MongodbDebeziumConstants.OffsetState.VALUE_RESUME_TOKEN, resumeTokenData); + value.put(MongoDbDebeziumConstants.OffsetState.VALUE_SECONDS, timestamp.getTime()); + value.put(MongoDbDebeziumConstants.OffsetState.VALUE_INCREMENT, timestamp.getInc()); + value.put(MongoDbDebeziumConstants.OffsetState.VALUE_TRANSACTION_ID, null); + value.put(MongoDbDebeziumConstants.OffsetState.VALUE_RESUME_TOKEN, resumeTokenData); final JsonNode state = Jsons.jsonNode(Map.of(key, value)); LOGGER.info("Initial Debezium state constructed: {}", state); diff --git a/airbyte-integrations/bases/debezium/src/test/java/io/airbyte/integrations/debezium/internals/mongodb/MongodbDebeziumStateUtilTest.java b/airbyte-integrations/bases/debezium/src/test/java/io/airbyte/integrations/debezium/internals/mongodb/MongoDbDebeziumStateUtilTest.java similarity index 82% rename from airbyte-integrations/bases/debezium/src/test/java/io/airbyte/integrations/debezium/internals/mongodb/MongodbDebeziumStateUtilTest.java rename to airbyte-integrations/bases/debezium/src/test/java/io/airbyte/integrations/debezium/internals/mongodb/MongoDbDebeziumStateUtilTest.java index b84e8d31e521..90e71344909f 100644 --- a/airbyte-integrations/bases/debezium/src/test/java/io/airbyte/integrations/debezium/internals/mongodb/MongodbDebeziumStateUtilTest.java +++ b/airbyte-integrations/bases/debezium/src/test/java/io/airbyte/integrations/debezium/internals/mongodb/MongoDbDebeziumStateUtilTest.java @@ -21,13 +21,13 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; -class MongodbDebeziumStateUtilTest { +class MongoDbDebeziumStateUtilTest { - private MongodbDebeziumStateUtil mongodbDebeziumStateUtil; + private MongoDbDebeziumStateUtil mongoDbDebeziumStateUtil; @BeforeEach void setup() { - mongodbDebeziumStateUtil = new MongodbDebeziumStateUtil(); + mongoDbDebeziumStateUtil = new MongoDbDebeziumStateUtil(); } @Test @@ -47,17 +47,17 @@ void testConstructInitialDebeziumState() { when(changeStreamIterable.cursor()).thenReturn(mongoChangeStreamCursor); when(mongoClient.watch(BsonDocument.class)).thenReturn(changeStreamIterable); - final JsonNode initialState = mongodbDebeziumStateUtil.constructInitialDebeziumState(mongoClient, + final JsonNode initialState = mongoDbDebeziumStateUtil.constructInitialDebeziumState(mongoClient, database, replicaSet); assertNotNull(initialState); assertEquals(1, initialState.size()); final BsonTimestamp timestamp = ResumeTokens.getTimestamp(resumeTokenDocument); final JsonNode offsetState = initialState.fields().next().getValue(); - assertEquals(resumeToken, offsetState.get(MongodbDebeziumConstants.OffsetState.VALUE_RESUME_TOKEN).asText()); - assertEquals(timestamp.getTime(), offsetState.get(MongodbDebeziumConstants.OffsetState.VALUE_SECONDS).asInt()); - assertEquals(timestamp.getInc(), offsetState.get(MongodbDebeziumConstants.OffsetState.VALUE_INCREMENT).asInt()); - assertEquals("null", offsetState.get(MongodbDebeziumConstants.OffsetState.VALUE_TRANSACTION_ID).asText()); + assertEquals(resumeToken, offsetState.get(MongoDbDebeziumConstants.OffsetState.VALUE_RESUME_TOKEN).asText()); + assertEquals(timestamp.getTime(), offsetState.get(MongoDbDebeziumConstants.OffsetState.VALUE_SECONDS).asInt()); + assertEquals(timestamp.getInc(), offsetState.get(MongoDbDebeziumConstants.OffsetState.VALUE_INCREMENT).asInt()); + assertEquals("null", offsetState.get(MongoDbDebeziumConstants.OffsetState.VALUE_TRANSACTION_ID).asText()); } }