diff --git a/airbyte-cdk/java/airbyte-cdk/README.md b/airbyte-cdk/java/airbyte-cdk/README.md index c32f5fa7b7ab..b9b17558c395 100644 --- a/airbyte-cdk/java/airbyte-cdk/README.md +++ b/airbyte-cdk/java/airbyte-cdk/README.md @@ -174,6 +174,7 @@ corresponds to that version. | Version | Date | Pull Request | Subject | |:--------|:-----------|:-----------------------------------------------------------|:---------------------------------------------------------------------------------------------------------------------------------------------------------------| +| 0.38.1 | 2024-06-13 | [\#39445](https://github.com/airbytehq/airbyte/pull/39445) | Sources: More CDK changes to handle big initial snapshots. | | 0.38.0 | 2024-06-11 | [\#39405](https://github.com/airbytehq/airbyte/pull/39405) | Sources: Debezium properties manager interface changed to accept a list of streams to scope to | | 0.37.1 | 2024-06-10 | [\#38075](https://github.com/airbytehq/airbyte/pull/38075) | Destinations: Track stream statuses in async framework | | 0.37.0 | 2024-06-10 | [\#38121](https://github.com/airbytehq/airbyte/pull/38121) | Destinations: Set default namespace via CatalogParser | diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/db/DbAnalyticsUtils.kt b/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/db/DbAnalyticsUtils.kt index ffe7897b79e2..75eda5761212 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/db/DbAnalyticsUtils.kt +++ b/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/db/DbAnalyticsUtils.kt @@ -13,6 +13,7 @@ import io.airbyte.protocol.models.v0.AirbyteAnalyticsTraceMessage object DbAnalyticsUtils { const val CDC_CURSOR_INVALID_KEY: String = "db-sources-cdc-cursor-invalid" const val DATA_TYPES_SERIALIZATION_ERROR_KEY = "db-sources-data-serialization-error" + const val CDC_SNAPSHOT_FORCE_SHUTDOWN_KEY = "db-sources-snapshot-force-shutdown" @JvmStatic fun cdcCursorInvalidMessage(): AirbyteAnalyticsTraceMessage { @@ -25,4 +26,11 @@ object DbAnalyticsUtils { .withType(DATA_TYPES_SERIALIZATION_ERROR_KEY) .withValue("1") } + + @JvmStatic + fun cdcSnapshotForceShutdownMessage(): AirbyteAnalyticsTraceMessage { + return AirbyteAnalyticsTraceMessage() + .withType(CDC_SNAPSHOT_FORCE_SHUTDOWN_KEY) + .withValue("1") + } } diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/base/AirbyteTraceMessageUtility.kt b/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/base/AirbyteTraceMessageUtility.kt index e32b6f68d517..98131c98fa0d 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/base/AirbyteTraceMessageUtility.kt +++ b/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/base/AirbyteTraceMessageUtility.kt @@ -96,7 +96,7 @@ object AirbyteTraceMessageUtility { outputRecordCollector.accept(message) } - private fun makeErrorTraceAirbyteMessage( + fun makeErrorTraceAirbyteMessage( e: Throwable, displayMessage: String?, failureType: AirbyteErrorTraceMessage.FailureType diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/main/resources/version.properties b/airbyte-cdk/java/airbyte-cdk/core/src/main/resources/version.properties index 774491a859f9..69498d318733 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/main/resources/version.properties +++ b/airbyte-cdk/java/airbyte-cdk/core/src/main/resources/version.properties @@ -1 +1 @@ -version=0.38.0 \ No newline at end of file +version=0.38.1 \ No newline at end of file diff --git a/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/relationaldb/streamstatus/TransientErrorTraceEmitterIterator.kt b/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/relationaldb/streamstatus/TransientErrorTraceEmitterIterator.kt new file mode 100644 index 000000000000..c0cf94cad743 --- /dev/null +++ b/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/relationaldb/streamstatus/TransientErrorTraceEmitterIterator.kt @@ -0,0 +1,44 @@ +/* + * Copyright (c) 2024 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.cdk.integrations.source.relationaldb.streamstatus + +import io.airbyte.cdk.integrations.base.AirbyteTraceMessageUtility.makeErrorTraceAirbyteMessage +import io.airbyte.commons.util.AutoCloseableIterator +import io.airbyte.protocol.models.v0.AirbyteErrorTraceMessage +import io.airbyte.protocol.models.v0.AirbyteMessage +import org.slf4j.Logger +import org.slf4j.LoggerFactory + +class TransientErrorTraceEmitterIterator(private val e: Throwable) : + AutoCloseableIterator { + private var emitted = false + + override fun hasNext(): Boolean { + return !emitted + } + + override fun next(): AirbyteMessage { + emitted = true + return makeErrorTraceAirbyteMessage( + e, + e.message, + AirbyteErrorTraceMessage.FailureType.TRANSIENT_ERROR + ) + } + + @Throws(Exception::class) + override fun close() { + // no-op + } + + override fun remove() { + // no-op + } + + companion object { + private val LOGGER: Logger = + LoggerFactory.getLogger(TransientErrorTraceEmitterIterator::class.java) + } +}