Skip to content

Commit

Permalink
[Source-mysql] : Add meta error handling in initial load path (#37328)
Browse files Browse the repository at this point in the history
  • Loading branch information
akashkulk authored Apr 15, 2024
1 parent ca394d2 commit cb9eae3
Show file tree
Hide file tree
Showing 6 changed files with 101 additions and 16 deletions.
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-mysql/metadata.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ data:
connectorSubtype: database
connectorType: source
definitionId: 435bb9a5-7887-4809-aa58-28c27df0d7ad
dockerImageTag: 3.3.18
dockerImageTag: 3.3.19
dockerRepository: airbyte/source-mysql
documentationUrl: https://docs.airbyte.com/integrations/sources/mysql
githubIssueLabel: source-mysql
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import com.mysql.cj.result.Field;
import io.airbyte.cdk.db.SourceOperations;
import io.airbyte.cdk.db.jdbc.AbstractJdbcCompatibleSourceOperations;
import io.airbyte.cdk.db.jdbc.AirbyteRecordData;
import io.airbyte.integrations.source.mysql.initialsync.CdcMetadataInjector;
import io.airbyte.protocol.models.JsonSchemaType;
import java.sql.PreparedStatement;
Expand Down Expand Up @@ -81,13 +82,14 @@ public MySqlSourceOperations(final Optional<CdcMetadataInjector> metadataInjecto
}

@Override
public JsonNode rowToJson(final ResultSet queryContext) throws SQLException {
final ObjectNode jsonNode = (ObjectNode) super.rowToJson(queryContext);
public AirbyteRecordData convertDatabaseRowToAirbyteRecordData(final ResultSet queryContext) throws SQLException {
final AirbyteRecordData recordData = super.convertDatabaseRowToAirbyteRecordData(queryContext);
final ObjectNode jsonNode = (ObjectNode) recordData.rawRowData();
if (!metadataInjector.isPresent()) {
return jsonNode;
return recordData;
}
metadataInjector.get().inject(jsonNode);
return jsonNode;
return new AirbyteRecordData(jsonNode, recordData.meta());
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.annotations.VisibleForTesting;
import com.mysql.cj.MysqlType;
import io.airbyte.cdk.db.jdbc.AirbyteRecordData;
import io.airbyte.cdk.db.jdbc.JdbcDatabase;
import io.airbyte.cdk.integrations.debezium.DebeziumIteratorConstants;
import io.airbyte.cdk.integrations.source.relationaldb.DbSourceDiscoverUtil;
Expand All @@ -27,6 +28,7 @@
import io.airbyte.protocol.models.v0.AirbyteMessage;
import io.airbyte.protocol.models.v0.AirbyteMessage.Type;
import io.airbyte.protocol.models.v0.AirbyteRecordMessage;
import io.airbyte.protocol.models.v0.AirbyteRecordMessageMeta;
import io.airbyte.protocol.models.v0.AirbyteStream;
import io.airbyte.protocol.models.v0.CatalogHelpers;
import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog;
Expand Down Expand Up @@ -110,7 +112,7 @@ public List<AutoCloseableIterator<AirbyteMessage>> getIncrementalIterators(
}
});

final AutoCloseableIterator<JsonNode> queryStream =
final AutoCloseableIterator<AirbyteRecordData> queryStream =
new MySqlInitialLoadRecordIterator(database, sourceOperations, quoteString, initialLoadStateManager, selectedDatabaseFields, pair,
calculateChunkSize(tableSizeInfoMap.get(pair), pair), isCompositePrimaryKey(airbyteStream));
final AutoCloseableIterator<AirbyteMessage> recordIterator =
Expand Down Expand Up @@ -144,7 +146,7 @@ public static long calculateChunkSize(final TableSizeInfo tableSizeInfo, final A

// Transforms the given iterator to create an {@link AirbyteRecordMessage}
private AutoCloseableIterator<AirbyteMessage> getRecordIterator(
final AutoCloseableIterator<JsonNode> recordIterator,
final AutoCloseableIterator<AirbyteRecordData> recordIterator,
final String streamName,
final String namespace,
final long emittedAt) {
Expand All @@ -154,7 +156,12 @@ private AutoCloseableIterator<AirbyteMessage> getRecordIterator(
.withStream(streamName)
.withNamespace(namespace)
.withEmittedAt(emittedAt)
.withData(r)));
.withData(r.rawRowData())
.withMeta(isMetaChangesEmptyOrNull(r.meta()) ? null : r.meta())));
}

private boolean isMetaChangesEmptyOrNull(AirbyteRecordMessageMeta meta) {
return meta == null || meta.getChanges() == null || meta.getChanges().isEmpty();
}

// Augments the given iterator with record count logs.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,10 @@

package io.airbyte.integrations.source.mysql.initialsync;

import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.AbstractIterator;
import com.mysql.cj.MysqlType;
import io.airbyte.cdk.db.JdbcCompatibleSourceOperations;
import io.airbyte.cdk.db.jdbc.AirbyteRecordData;
import io.airbyte.cdk.db.jdbc.JdbcDatabase;
import io.airbyte.cdk.integrations.source.relationaldb.RelationalDbQueryUtils;
import io.airbyte.commons.util.AutoCloseableIterator;
Expand Down Expand Up @@ -37,8 +37,8 @@
* records processed here.
*/
@SuppressWarnings("try")
public class MySqlInitialLoadRecordIterator extends AbstractIterator<JsonNode>
implements AutoCloseableIterator<JsonNode> {
public class MySqlInitialLoadRecordIterator extends AbstractIterator<AirbyteRecordData>
implements AutoCloseableIterator<AirbyteRecordData> {

private static final Logger LOGGER = LoggerFactory.getLogger(MySqlInitialLoadRecordIterator.class);

Expand All @@ -54,7 +54,7 @@ public class MySqlInitialLoadRecordIterator extends AbstractIterator<JsonNode>
private final PrimaryKeyInfo pkInfo;
private final boolean isCompositeKeyLoad;
private int numSubqueries = 0;
private AutoCloseableIterator<JsonNode> currentIterator;
private AutoCloseableIterator<AirbyteRecordData> currentIterator;

MySqlInitialLoadRecordIterator(
final JdbcDatabase database,
Expand All @@ -78,7 +78,7 @@ public class MySqlInitialLoadRecordIterator extends AbstractIterator<JsonNode>

@CheckForNull
@Override
protected JsonNode computeNext() {
protected AirbyteRecordData computeNext() {
if (shouldBuildNextSubquery()) {
try {
// We will only issue one query for a composite key load. If we have already processed all the data
Expand All @@ -93,8 +93,8 @@ protected JsonNode computeNext() {
}

LOGGER.info("Subquery number : {}", numSubqueries);
final Stream<JsonNode> stream = database.unsafeQuery(
this::getPkPreparedStatement, sourceOperations::rowToJson);
final Stream<AirbyteRecordData> stream = database.unsafeQuery(
this::getPkPreparedStatement, sourceOperations::convertDatabaseRowToAirbyteRecordData);

currentIterator = AutoCloseableIterators.fromStream(stream, pair);
numSubqueries++;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import static org.junit.jupiter.api.Assertions.assertTrue;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
Expand All @@ -50,6 +51,10 @@
import io.airbyte.protocol.models.v0.AirbyteGlobalState;
import io.airbyte.protocol.models.v0.AirbyteMessage;
import io.airbyte.protocol.models.v0.AirbyteRecordMessage;
import io.airbyte.protocol.models.v0.AirbyteRecordMessageMeta;
import io.airbyte.protocol.models.v0.AirbyteRecordMessageMetaChange;
import io.airbyte.protocol.models.v0.AirbyteRecordMessageMetaChange.Change;
import io.airbyte.protocol.models.v0.AirbyteRecordMessageMetaChange.Reason;
import io.airbyte.protocol.models.v0.AirbyteStateMessage;
import io.airbyte.protocol.models.v0.AirbyteStateMessage.AirbyteStateType;
import io.airbyte.protocol.models.v0.AirbyteStream;
Expand All @@ -59,6 +64,7 @@
import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream;
import io.airbyte.protocol.models.v0.StreamDescriptor;
import io.airbyte.protocol.models.v0.SyncMode;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
Expand All @@ -80,6 +86,11 @@ public class CdcMysqlSourceTest extends CdcSourceTest<MySqlSource, MySQLTestData

private static final Random RANDOM = new Random();

private static final String TEST_DATE_STREAM_NAME = "TEST_DATE_TABLE";
private static final String COL_DATE_TIME = "CAR_DATE";
private static final List<JsonNode> DATE_TIME_RECORDS = ImmutableList.of(
Jsons.jsonNode(ImmutableMap.of(COL_ID, 120, COL_DATE_TIME, "'2023-00-00 20:37:47'")));

@Override
protected MySQLTestDatabase createTestDatabase() {
return MySQLTestDatabase.in(BaseImage.MYSQL_8, ContainerModifier.INVALID_TIMEZONE_CEST).withCdcPermissions();
Expand Down Expand Up @@ -734,6 +745,70 @@ public void testCompressedSchemaHistory() throws Exception {
assertEquals(recordsToCreate, extractRecordMessages(dataFromSecondBatch).size());
}

private void writeDateRecords(
final JsonNode recordJson,
final String dbName,
final String streamName,
final String idCol,
final String dateCol) {
testdb.with("INSERT INTO `%s` .`%s` (%s, %s) VALUES (%s, %s);", dbName, streamName,
idCol, dateCol,
recordJson.get(idCol).asInt(), recordJson.get(dateCol).asText());
}

@Test
public void testInvalidDatetime_metaChangesPopulated() throws Exception {
final ConfiguredAirbyteCatalog configuredCatalog = Jsons.clone(getConfiguredCatalog());

// Add a datetime stream to the catalog
testdb
.withoutStrictMode()
.with(createTableSqlFmt(), getDatabaseName(), TEST_DATE_STREAM_NAME,
columnClause(ImmutableMap.of(COL_ID, "INTEGER", COL_DATE_TIME, "DATETIME"), Optional.of(COL_ID)));

for (final JsonNode recordJson : DATE_TIME_RECORDS) {
writeDateRecords(recordJson, getDatabaseName(), TEST_DATE_STREAM_NAME, COL_ID, COL_DATE_TIME);
}

final ConfiguredAirbyteStream airbyteStream = new ConfiguredAirbyteStream()
.withStream(CatalogHelpers.createAirbyteStream(
TEST_DATE_STREAM_NAME,
getDatabaseName(),
Field.of(COL_ID, JsonSchemaType.INTEGER),
Field.of(COL_DATE_TIME, JsonSchemaType.STRING_TIMESTAMP_WITHOUT_TIMEZONE))
.withSupportedSyncModes(
Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL))
.withSourceDefinedPrimaryKey(List.of(List.of(COL_ID))));
airbyteStream.setSyncMode(SyncMode.INCREMENTAL);

final List<ConfiguredAirbyteStream> streams = new ArrayList<>();
streams.add(airbyteStream);
configuredCatalog.withStreams(streams);

final AutoCloseableIterator<AirbyteMessage> read1 = source()
.read(config(), configuredCatalog, null);
final List<AirbyteMessage> actualRecords = AutoCloseableIterators.toListAndClose(read1);

// Sync is expected to succeed with one record. However, the meta changes column should be populated
// for this record
// as it is an invalid date. As a result, this field will be omitted as Airbyte is unable to
// serialize the source value.
final Set<AirbyteRecordMessage> recordMessages = extractRecordMessages(actualRecords);
assertEquals(recordMessages.size(), 1);
final AirbyteRecordMessage invalidDateRecord = recordMessages.stream().findFirst().get();

final AirbyteRecordMessageMetaChange expectedChange =
new AirbyteRecordMessageMetaChange().withReason(Reason.SOURCE_SERIALIZATION_ERROR).withChange(
Change.NULLED).withField(COL_DATE_TIME);
final AirbyteRecordMessageMeta expectedMessageMeta = new AirbyteRecordMessageMeta().withChanges(List.of(expectedChange));
assertEquals(expectedMessageMeta, invalidDateRecord.getMeta());

ObjectMapper mapper = new ObjectMapper();
final JsonNode expectedDataWithoutCdcFields = mapper.readTree("{\"id\":120}");
removeCDCColumns((ObjectNode) invalidDateRecord.getData());
assertEquals(expectedDataWithoutCdcFields, invalidDateRecord.getData());
}

private void createTablesToIncreaseSchemaHistorySize() {
for (int i = 0; i <= 200; i++) {
final String tableName = generateRandomStringOf32Characters();
Expand Down
3 changes: 2 additions & 1 deletion docs/integrations/sources/mysql.md
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,8 @@ Any database or table encoding combination of charset and collation is supported

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:-----------------------------------------------------------|:------------------------------------------------------------------------------------------------------------------------------------------------|
| 3.3.18 | 2024-04-15 | [36919](https://github.com/airbytehq/airbyte/pull/36919) | Refactor source operations. |
| 3.3.19 | 2024-04-15 | [37328](https://github.com/airbytehq/airbyte/pull/37328) | Populate airbyte_meta.changes |
| 3.3.18 | 2024-04-15 | [37324](https://github.com/airbytehq/airbyte/pull/37324) | Refactor source operations. |
| 3.3.17 | 2024-04-10 | [36919](https://github.com/airbytehq/airbyte/pull/36919) | Fix a bug in conversion of null values. |
| 3.3.16 | 2024-04-05 | [36872](https://github.com/airbytehq/airbyte/pull/36872) | Update to connector's metadat definition. |
| 3.3.15 | 2024-04-05 | [36577](https://github.com/airbytehq/airbyte/pull/36577) | Config error will not send out system trace message |
Expand Down

0 comments on commit cb9eae3

Please sign in to comment.