Skip to content

Commit

Permalink
[Db analytics] : add message for data type serialization error (#36981)
Browse files Browse the repository at this point in the history
  • Loading branch information
akashkulk authored Apr 10, 2024
1 parent c7e5d20 commit 0296c43
Show file tree
Hide file tree
Showing 8 changed files with 39 additions and 16 deletions.
1 change: 1 addition & 0 deletions airbyte-cdk/java/airbyte-cdk/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@ Maven and Gradle will automatically reference the correct (pinned) version of th

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:-----------------------------------------------------------|:---------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 0.29.13 | 2024-04-10 | [\#36981](https://github.com/airbytehq/airbyte/pull/36981) | DB sources : Emit analytics for data type serialization errors. |
| 0.29.11 | 2024-04-10 | [\#36865](https://github.com/airbytehq/airbyte/pull/36865) | Sources : Remove noisy log line. |
| 0.29.10 | 2024-04-10 | [\#36805](https://github.com/airbytehq/airbyte/pull/36805) | Destinations: Enhance CatalogParser name collision handling; add DV2 tests for long identifiers |
| 0.29.9 | 2024-04-09 | [\#36047](https://github.com/airbytehq/airbyte/pull/36047) | Destinations: CDK updates for raw-only destinations |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,17 @@ import io.airbyte.protocol.models.v0.AirbyteAnalyticsTraceMessage
*/
object DbAnalyticsUtils {
const val CDC_CURSOR_INVALID_KEY: String = "db-sources-cdc-cursor-invalid"
const val DATA_TYPES_SERIALIZATION_ERROR_KEY = "db-sources-data-serialization-error"

@JvmStatic
fun cdcCursorInvalidMessage(): AirbyteAnalyticsTraceMessage {
return AirbyteAnalyticsTraceMessage().withType(CDC_CURSOR_INVALID_KEY).withValue("1")
}

@JvmStatic
fun dataTypesSerializationErrorMessage(): AirbyteAnalyticsTraceMessage {
return AirbyteAnalyticsTraceMessage()
.withType(DATA_TYPES_SERIALIZATION_ERROR_KEY)
.withValue("1")
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,9 @@ import com.fasterxml.jackson.databind.JsonNode
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.databind.node.ObjectNode
import io.airbyte.cdk.db.DataTypeUtils
import io.airbyte.cdk.db.DbAnalyticsUtils.dataTypesSerializationErrorMessage
import io.airbyte.cdk.db.JdbcCompatibleSourceOperations
import io.airbyte.cdk.integrations.destination.jdbc.SqlOperations.Companion.LOGGER
import io.airbyte.cdk.integrations.base.AirbyteTraceMessageUtility
import io.airbyte.commons.json.Jsons
import io.airbyte.protocol.models.v0.AirbyteRecordMessageMeta
import io.airbyte.protocol.models.v0.AirbyteRecordMessageMetaChange
Expand Down Expand Up @@ -40,18 +41,25 @@ abstract class AbstractJdbcCompatibleSourceOperations<Datatype> :

for (i in 1..columnCount) {
val columnName = queryContext.metaData.getColumnName(i)
val columnTypeName = queryContext.metaData.getColumnTypeName(i)
try {
// convert to java types that will convert into reasonable json.
copyToJsonField(queryContext, i, jsonNode)
} catch (e: java.lang.Exception) {
LOGGER.info("Failed to serialize column: {}, with error {}", columnName, e.message)
LOGGER.info(
"Failed to serialize column: {}, of type {}, with error {}",
columnName,
columnTypeName,
e.message
)
AirbyteTraceMessageUtility.emitAnalyticsTrace(dataTypesSerializationErrorMessage())
metaChanges.add(
AirbyteRecordMessageMetaChange()
.withField(columnName)
.withChange(AirbyteRecordMessageMetaChange.Change.NULLED)
.withReason(
AirbyteRecordMessageMetaChange.Reason.SOURCE_SERIALIZATION_ERROR
)
AirbyteRecordMessageMetaChange.Reason.SOURCE_SERIALIZATION_ERROR,
),
)
}
}
Expand Down Expand Up @@ -166,8 +174,8 @@ abstract class AbstractJdbcCompatibleSourceOperations<Datatype> :
columnName,
DataTypeUtils.returnNullIfInvalid(
{ resultSet.getDouble(index) },
{ d: Double? -> java.lang.Double.isFinite(d!!) }
)
{ d: Double? -> java.lang.Double.isFinite(d!!) },
),
)
}

Expand All @@ -182,8 +190,8 @@ abstract class AbstractJdbcCompatibleSourceOperations<Datatype> :
columnName,
DataTypeUtils.returnNullIfInvalid(
{ resultSet.getFloat(index) },
{ f: Float? -> java.lang.Float.isFinite(f!!) }
)
{ f: Float? -> java.lang.Float.isFinite(f!!) },
),
)
}

Expand Down Expand Up @@ -226,7 +234,7 @@ abstract class AbstractJdbcCompatibleSourceOperations<Datatype> :
) {
node.put(
columnName,
DateTimeConverter.convertToTime(getObject(resultSet, index, LocalTime::class.java))
DateTimeConverter.convertToTime(getObject(resultSet, index, LocalTime::class.java)),
)
}

Expand All @@ -241,8 +249,8 @@ abstract class AbstractJdbcCompatibleSourceOperations<Datatype> :
node.put(
columnName,
DateTimeConverter.convertToTimestamp(
getObject(resultSet, index, LocalDateTime::class.java)
)
getObject(resultSet, index, LocalDateTime::class.java),
),
)
} catch (e: Exception) {
// for backward compatibility
Expand Down Expand Up @@ -450,7 +458,7 @@ abstract class AbstractJdbcCompatibleSourceOperations<Datatype> :
val localDate = timestamptz.toLocalDate()
node.put(
columnName,
resolveEra(localDate, timestamptz.format(DataTypeUtils.TIMESTAMPTZ_FORMATTER))
resolveEra(localDate, timestamptz.format(DataTypeUtils.TIMESTAMPTZ_FORMATTER)),
)
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1 +1 @@
version=0.29.11
version=0.29.13
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ java {
}

airbyteJavaConnector {
cdkVersionRequired = '0.29.11'
cdkVersionRequired = '0.29.13'
features = ['db-sources', 'datastore-postgres']
useLocalCdk = false
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ data:
connectorSubtype: database
connectorType: source
definitionId: decd338e-5647-4c0b-adf4-da0e75f5a750
dockerImageTag: 3.3.24
dockerImageTag: 3.3.25
dockerRepository: airbyte/source-postgres
documentationUrl: https://docs.airbyte.com/integrations/sources/postgres
githubIssueLabel: source-postgres
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,11 @@

package io.airbyte.integrations.source.postgres.ctid;

import static io.airbyte.cdk.db.DbAnalyticsUtils.dataTypesSerializationErrorMessage;

import com.fasterxml.jackson.databind.node.ObjectNode;
import io.airbyte.cdk.db.jdbc.AirbyteRecordData;
import io.airbyte.cdk.integrations.base.AirbyteTraceMessageUtility;
import io.airbyte.commons.json.Jsons;
import io.airbyte.integrations.source.postgres.PostgresSourceOperations;
import io.airbyte.integrations.source.postgres.cdc.PostgresCdcConnectorMetadataInjector;
Expand Down Expand Up @@ -45,6 +48,7 @@ public RowDataWithCtid recordWithCtid(final ResultSet queryContext) throws SQLEx
final List<AirbyteRecordMessageMetaChange> metaChanges = new ArrayList<>();
for (int i = 1; i <= columnCount; i++) {
final String columnName = metadata.getColumnName(i);
final String columnTypeName = metadata.getColumnTypeName(i);
try {
if (columnName.equalsIgnoreCase(CTID)) {
ctid = queryContext.getString(i);
Expand All @@ -54,7 +58,8 @@ public RowDataWithCtid recordWithCtid(final ResultSet queryContext) throws SQLEx
// convert to java types that will convert into reasonable json.
copyToJsonField(queryContext, i, jsonNode);
} catch (Exception e) {
LOGGER.info("Failed to serialize column: {}, with error {}", columnName, e.getMessage());
LOGGER.info("Failed to serialize column: {}, of type {}, with error {}", columnName, columnTypeName, e.getMessage());
AirbyteTraceMessageUtility.emitAnalyticsTrace(dataTypesSerializationErrorMessage());
metaChanges.add(
new AirbyteRecordMessageMetaChange()
.withField(columnName)
Expand Down
1 change: 1 addition & 0 deletions docs/integrations/sources/postgres.md
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,7 @@ According to Postgres [documentation](https://www.postgresql.org/docs/14/datatyp

| Version | Date | Pull Request | Subject |
|---------|------------|----------------------------------------------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 3.3.25 | 2024-04-10 | [36981](https://github.com/airbytehq/airbyte/pull/36981) | Track latest CDK |
| 3.3.24 | 2024-04-10 | [36865](https://github.com/airbytehq/airbyte/pull/36865) | Track latest CDK |
| 3.3.23 | 2024-04-02 | [36759](https://github.com/airbytehq/airbyte/pull/36759) | Track latest CDK |
| 3.3.22 | 2024-04-01 | [36739](https://github.com/airbytehq/airbyte/pull/36739) | Fix useLocalCdk flag. |
Expand Down

0 comments on commit 0296c43

Please sign in to comment.