Skip to content

Commit

Permalink
Naming consistency
Browse files Browse the repository at this point in the history
  • Loading branch information
jdpgrailsdev committed Aug 21, 2023
1 parent 9d05fe2 commit bbdf7c8
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
/**
* A collection of constants for use with the Debezium MongoDB Connector.
*/
public class MongodbDebeziumConstants {
public class MongoDbDebeziumConstants {

/**
* Constants for Debezium Offset State storage.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@
/**
* Collection of utility methods related to the Debezium offset state.
*/
public class MongodbDebeziumStateUtil {
public class MongoDbDebeziumStateUtil {

private static final Logger LOGGER = LoggerFactory.getLogger(MongodbDebeziumStateUtil.class);
private static final Logger LOGGER = LoggerFactory.getLogger(MongoDbDebeziumStateUtil.class);

/**
* Constructs the initial Debezium offset state that will be used by the incremental CDC snapshot
Expand All @@ -42,14 +42,14 @@ public JsonNode constructInitialDebeziumState(final MongoClient mongoClient, fin
final BsonTimestamp timestamp = ResumeTokens.getTimestamp(resumeToken);

final List<Map<String, Object>> key = List.of(
Map.of(MongodbDebeziumConstants.OffsetState.KEY_REPLICA_SET, replicaSet,
MongodbDebeziumConstants.OffsetState.KEY_SERVER_ID, database));
Map.of(MongoDbDebeziumConstants.OffsetState.KEY_REPLICA_SET, replicaSet,
MongoDbDebeziumConstants.OffsetState.KEY_SERVER_ID, database));

final Map<String, Object> value = new HashMap<>();
value.put(MongodbDebeziumConstants.OffsetState.VALUE_SECONDS, timestamp.getTime());
value.put(MongodbDebeziumConstants.OffsetState.VALUE_INCREMENT, timestamp.getInc());
value.put(MongodbDebeziumConstants.OffsetState.VALUE_TRANSACTION_ID, null);
value.put(MongodbDebeziumConstants.OffsetState.VALUE_RESUME_TOKEN, resumeTokenData);
value.put(MongoDbDebeziumConstants.OffsetState.VALUE_SECONDS, timestamp.getTime());
value.put(MongoDbDebeziumConstants.OffsetState.VALUE_INCREMENT, timestamp.getInc());
value.put(MongoDbDebeziumConstants.OffsetState.VALUE_TRANSACTION_ID, null);
value.put(MongoDbDebeziumConstants.OffsetState.VALUE_RESUME_TOKEN, resumeTokenData);

final JsonNode state = Jsons.jsonNode(Map.of(key, value));
LOGGER.info("Initial Debezium state constructed: {}", state);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,13 @@
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

class MongodbDebeziumStateUtilTest {
class MongoDbDebeziumStateUtilTest {

private MongodbDebeziumStateUtil mongodbDebeziumStateUtil;
private MongoDbDebeziumStateUtil mongoDbDebeziumStateUtil;

@BeforeEach
void setup() {
mongodbDebeziumStateUtil = new MongodbDebeziumStateUtil();
mongoDbDebeziumStateUtil = new MongoDbDebeziumStateUtil();
}

@Test
Expand All @@ -47,17 +47,17 @@ void testConstructInitialDebeziumState() {
when(changeStreamIterable.cursor()).thenReturn(mongoChangeStreamCursor);
when(mongoClient.watch(BsonDocument.class)).thenReturn(changeStreamIterable);

final JsonNode initialState = mongodbDebeziumStateUtil.constructInitialDebeziumState(mongoClient,
final JsonNode initialState = mongoDbDebeziumStateUtil.constructInitialDebeziumState(mongoClient,
database, replicaSet);

assertNotNull(initialState);
assertEquals(1, initialState.size());
final BsonTimestamp timestamp = ResumeTokens.getTimestamp(resumeTokenDocument);
final JsonNode offsetState = initialState.fields().next().getValue();
assertEquals(resumeToken, offsetState.get(MongodbDebeziumConstants.OffsetState.VALUE_RESUME_TOKEN).asText());
assertEquals(timestamp.getTime(), offsetState.get(MongodbDebeziumConstants.OffsetState.VALUE_SECONDS).asInt());
assertEquals(timestamp.getInc(), offsetState.get(MongodbDebeziumConstants.OffsetState.VALUE_INCREMENT).asInt());
assertEquals("null", offsetState.get(MongodbDebeziumConstants.OffsetState.VALUE_TRANSACTION_ID).asText());
assertEquals(resumeToken, offsetState.get(MongoDbDebeziumConstants.OffsetState.VALUE_RESUME_TOKEN).asText());
assertEquals(timestamp.getTime(), offsetState.get(MongoDbDebeziumConstants.OffsetState.VALUE_SECONDS).asInt());
assertEquals(timestamp.getInc(), offsetState.get(MongoDbDebeziumConstants.OffsetState.VALUE_INCREMENT).asInt());
assertEquals("null", offsetState.get(MongoDbDebeziumConstants.OffsetState.VALUE_TRANSACTION_ID).asText());
}

}

0 comments on commit bbdf7c8

Please sign in to comment.