Skip to content

Commit

Permalink
Enabled logging for MySQL CDC Incremental sync
Browse files Browse the repository at this point in the history
  • Loading branch information
nguyenaiden committed Aug 21, 2023
1 parent 3676485 commit 82756c9
Show file tree
Hide file tree
Showing 4 changed files with 93 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

package io.airbyte.integrations.debezium.internals.mysql;

import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.commons.json.Jsons;
import io.airbyte.db.jdbc.JdbcDatabase;
import io.airbyte.integrations.debezium.CdcTargetPosition;
import io.airbyte.integrations.debezium.internals.ChangeEventWithMetadata;
Expand Down Expand Up @@ -102,6 +104,50 @@ public boolean isHeartbeatSupported() {
return true;
}

@Override
public boolean isRecordBehindOffset(final Map<String, String> offset, final ChangeEventWithMetadata event) {
if (offset.size() != 1) {
return false;
}

final String eventFileName = event.eventValueAsJson().get("source").get("file").asText();
final long eventPosition = event.eventValueAsJson().get("source").get("pos").asLong();

final JsonNode offsetJson = Jsons.deserialize((String) offset.values().toArray()[0]);

final String offsetFileName = offsetJson.get("file").asText();
final long offsetPosition = offsetJson.get("pos").asLong();
if (eventFileName.compareTo(offsetFileName) != 0) {
return eventFileName.compareTo(offsetFileName) > 0;
}

return eventPosition > offsetPosition;
}

@Override
public boolean isSameOffset(final Map<String, String> offsetA, final Map<String, String> offsetB) {
if (offsetA == null || offsetA.size() != 1) {
return false;
}
if (offsetB == null || offsetB.size() != 1) {
return false;
}

final JsonNode offsetJsonA = Jsons.deserialize((String) offsetA.values().toArray()[0]);
final String offsetAFileName = offsetJsonA.get("file").asText();
final long offsetAPosition = offsetJsonA.get("pos").asLong();

final JsonNode offsetJsonB = Jsons.deserialize((String) offsetB.values().toArray()[0]);
final String offsetBFileName = offsetJsonB.get("file").asText();
final long offsetBPosition = offsetJsonB.get("pos").asLong();

if (offsetAFileName.compareTo(offsetBFileName) != 0) {
return false;
}

return offsetAPosition == offsetBPosition;
}

@Override
public MySqlCdcPosition extractPositionFromHeartbeatOffset(final Map<String, ?> sourceOffset) {
return new MySqlCdcPosition(sourceOffset.get("file").toString(), (Long) sourceOffset.get("pos"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,11 @@ public MySqlCdcStateHandler(final StateManager stateManager) {
this.stateManager = stateManager;
}

@Override
public boolean isCdcCheckpointEnabled() {
return true;
}

@Override
public AirbyteMessage saveState(final Map<String, String> offset, final String dbHistory) {
final Map<String, Object> state = new HashMap<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@
public class MySqlSource extends AbstractJdbcSource<MysqlType> implements Source {

private static final Logger LOGGER = LoggerFactory.getLogger(MySqlSource.class);
private static final int INTERMEDIATE_STATE_EMISSION_FREQUENCY = 10_000;
private static final int INTERMEDIATE_STATE_EMISSION_FREQUENCY = 1;
public static final String NULL_CURSOR_VALUE_WITH_SCHEMA_QUERY =
"""
SELECT (EXISTS (SELECT * from `%s`.`%s` where `%s` IS NULL LIMIT 1)) AS %s
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,47 @@ protected void syncShouldHandlePurgedLogsGracefully() throws Exception {
"Expected 46 records to be replicated in the second sync.");
}

/**
* This test verifies that multiple states are sent during the CDC process based on number of records.
* We can ensure that more than one `STATE` type of message is sent, but we are not able to assert
* the exact number of messages sent as depends on Debezium.
*
* @throws Exception Exception happening in the test.
*/
@Test
protected void verifyCheckpointStatesByRecords() throws Exception {
// We require a huge amount of records, otherwise Debezium will notify directly the last offset.
final int recordsToCreate = 20000;

final AutoCloseableIterator<AirbyteMessage> firstBatchIterator = getSource()
.read(getConfig(), CONFIGURED_CATALOG, null);
final List<AirbyteMessage> dataFromFirstBatch = AutoCloseableIterators
.toListAndClose(firstBatchIterator);
final List<AirbyteStateMessage> stateMessages = extractStateMessages(dataFromFirstBatch);

// As first `read` operation is from snapshot, it would generate only one state message at the end
// of the process.
assertExpectedStateMessages(stateMessages);

for (int recordsCreated = 0; recordsCreated < recordsToCreate; recordsCreated++) {
final JsonNode record =
Jsons.jsonNode(ImmutableMap
.of(COL_ID, 200 + recordsCreated, COL_MAKE_ID, 1, COL_MODEL,
"F-" + recordsCreated));
writeModelRecord(record);
}

final JsonNode stateAfterFirstSync = Jsons.jsonNode(Collections.singletonList(stateMessages.get(stateMessages.size() - 1)));
final AutoCloseableIterator<AirbyteMessage> secondBatchIterator = getSource()
.read(getConfig(), CONFIGURED_CATALOG, stateAfterFirstSync);
final List<AirbyteMessage> dataFromSecondBatch = AutoCloseableIterators
.toListAndClose(secondBatchIterator);
assertEquals(recordsToCreate, extractRecordMessages(dataFromSecondBatch).size());
final List<AirbyteStateMessage> stateMessagesCDC = extractStateMessages(dataFromSecondBatch);
assertTrue(stateMessagesCDC.size() > 1, "Generated only the final state.");
assertEquals(stateMessagesCDC.size(), stateMessagesCDC.stream().distinct().count(), "There are duplicated states.");
}

protected void assertStateForSyncShouldHandlePurgedLogsGracefully(final List<AirbyteStateMessage> stateMessages, final int syncNumber) {
assertExpectedStateMessages(stateMessages);
}
Expand Down

0 comments on commit 82756c9

Please sign in to comment.