Skip to content

Commit

Permalink
[WASS] : Create a transient error emitter (#39445)
Browse files Browse the repository at this point in the history
  • Loading branch information
akashkulk authored Jun 13, 2024
1 parent 8443138 commit 0759c84
Show file tree
Hide file tree
Showing 5 changed files with 55 additions and 2 deletions.
1 change: 1 addition & 0 deletions airbyte-cdk/java/airbyte-cdk/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,7 @@ corresponds to that version.

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:-----------------------------------------------------------|:---------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 0.38.1 | 2024-06-13 | [\#39445](https://github.com/airbytehq/airbyte/pull/39445) | Sources: More CDK changes to handle big initial snapshots. |
| 0.38.0 | 2024-06-11 | [\#39405](https://github.com/airbytehq/airbyte/pull/39405) | Sources: Debezium properties manager interface changed to accept a list of streams to scope to |
| 0.37.1 | 2024-06-10 | [\#38075](https://github.com/airbytehq/airbyte/pull/38075) | Destinations: Track stream statuses in async framework |
| 0.37.0 | 2024-06-10 | [\#38121](https://github.com/airbytehq/airbyte/pull/38121) | Destinations: Set default namespace via CatalogParser |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import io.airbyte.protocol.models.v0.AirbyteAnalyticsTraceMessage
object DbAnalyticsUtils {
const val CDC_CURSOR_INVALID_KEY: String = "db-sources-cdc-cursor-invalid"
const val DATA_TYPES_SERIALIZATION_ERROR_KEY = "db-sources-data-serialization-error"
const val CDC_SNAPSHOT_FORCE_SHUTDOWN_KEY = "db-sources-snapshot-force-shutdown"

@JvmStatic
fun cdcCursorInvalidMessage(): AirbyteAnalyticsTraceMessage {
Expand All @@ -25,4 +26,11 @@ object DbAnalyticsUtils {
.withType(DATA_TYPES_SERIALIZATION_ERROR_KEY)
.withValue("1")
}

@JvmStatic
fun cdcSnapshotForceShutdownMessage(): AirbyteAnalyticsTraceMessage {
return AirbyteAnalyticsTraceMessage()
.withType(CDC_SNAPSHOT_FORCE_SHUTDOWN_KEY)
.withValue("1")
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ object AirbyteTraceMessageUtility {
outputRecordCollector.accept(message)
}

private fun makeErrorTraceAirbyteMessage(
fun makeErrorTraceAirbyteMessage(
e: Throwable,
displayMessage: String?,
failureType: AirbyteErrorTraceMessage.FailureType
Expand Down
Original file line number Diff line number Diff line change
@@ -1 +1 @@
version=0.38.0
version=0.38.1
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.cdk.integrations.source.relationaldb.streamstatus

import io.airbyte.cdk.integrations.base.AirbyteTraceMessageUtility.makeErrorTraceAirbyteMessage
import io.airbyte.commons.util.AutoCloseableIterator
import io.airbyte.protocol.models.v0.AirbyteErrorTraceMessage
import io.airbyte.protocol.models.v0.AirbyteMessage
import org.slf4j.Logger
import org.slf4j.LoggerFactory

class TransientErrorTraceEmitterIterator(private val e: Throwable) :
AutoCloseableIterator<AirbyteMessage?> {
private var emitted = false

override fun hasNext(): Boolean {
return !emitted
}

override fun next(): AirbyteMessage {
emitted = true
return makeErrorTraceAirbyteMessage(
e,
e.message,
AirbyteErrorTraceMessage.FailureType.TRANSIENT_ERROR
)
}

@Throws(Exception::class)
override fun close() {
// no-op
}

override fun remove() {
// no-op
}

companion object {
private val LOGGER: Logger =
LoggerFactory.getLogger(TransientErrorTraceEmitterIterator::class.java)
}
}

0 comments on commit 0759c84

Please sign in to comment.