From 711dfaaf45d490d8d73ed0dfd860888a1fb11c4d Mon Sep 17 00:00:00 2001 From: Marius Posta Date: Wed, 30 Oct 2024 20:16:33 -0700 Subject: [PATCH] bulk-cdk-toolkit-extract-cdc: tweak DebeziumOperations interface (#48011) --- .../cdk/read/cdc/CdcPartitionReader.kt | 86 +++++++++++++------ .../cdk/read/cdc/CdcPartitionsCreator.kt | 15 +++- .../read/cdc/CdcPartitionsCreatorFactory.kt | 8 ++ .../io/airbyte/cdk/read/cdc/Debezium.kt | 5 +- .../cdk/read/cdc/DebeziumOperations.kt | 8 +- .../cdk/read/cdc/CdcPartitionsCreatorTest.kt | 2 + 6 files changed, 91 insertions(+), 33 deletions(-) diff --git a/airbyte-cdk/bulk/toolkits/extract-cdc/src/main/kotlin/io/airbyte/cdk/read/cdc/CdcPartitionReader.kt b/airbyte-cdk/bulk/toolkits/extract-cdc/src/main/kotlin/io/airbyte/cdk/read/cdc/CdcPartitionReader.kt index a2896a4bf7af..61d579c3aa24 100644 --- a/airbyte-cdk/bulk/toolkits/extract-cdc/src/main/kotlin/io/airbyte/cdk/read/cdc/CdcPartitionReader.kt +++ b/airbyte-cdk/bulk/toolkits/extract-cdc/src/main/kotlin/io/airbyte/cdk/read/cdc/CdcPartitionReader.kt @@ -88,7 +88,14 @@ class CdcPartitionReader>( val thread = Thread(engine, "debezium-engine") thread.setUncaughtExceptionHandler { _, e: Throwable -> engineException.set(e) } thread.start() - withContext(Dispatchers.IO) { thread.join() } + try { + withContext(Dispatchers.IO) { thread.join() } + } catch (e: Throwable) { + // This catches any exceptions thrown by join() + // but also by the kotlin coroutine dispatcher, like TimeoutCancellationException. + engineException.compareAndSet(null, e) + } + // Print a nice log message and re-throw any exception. val exception: Throwable? = engineException.get() val summary: Map = mapOf( @@ -138,49 +145,64 @@ class CdcPartitionReader>( val debeziumRecordValue: DebeziumRecordValue? = event.value()?.let { DebeziumRecordValue(Jsons.readTree(it)) } // Process records, ignoring heartbeats which are only used for completion checks. - val isRecord: Boolean - if (debeziumRecordValue == null) { - isRecord = false - numTombstones.incrementAndGet() - } else if (debeziumRecordValue.isHeartbeat) { - isRecord = false - numHeartbeats.incrementAndGet() - } else { - isRecord = true + val eventType: EventType = run { + if (debeziumRecordValue == null) return@run EventType.TOMBSTONE + if (debeziumRecordValue.isHeartbeat) return@run EventType.HEARTBEAT val debeziumRecordKey = DebeziumRecordKey(Jsons.readTree(event.key())) val deserializedRecord: DeserializedRecord = readerOps.deserialize(debeziumRecordKey, debeziumRecordValue) - val streamRecordConsumer: StreamRecordConsumer? = + ?: return@run EventType.RECORD_DISCARDED_BY_DESERIALIZE + val streamRecordConsumer: StreamRecordConsumer = streamRecordConsumers[deserializedRecord.streamID] - if (streamRecordConsumer == null) { - numDiscardedRecords.incrementAndGet() - } else { - streamRecordConsumer.accept(deserializedRecord.data, deserializedRecord.changes) - numEmittedRecords.incrementAndGet() - } + ?: return@run EventType.RECORD_DISCARDED_BY_STREAM_ID + streamRecordConsumer.accept(deserializedRecord.data, deserializedRecord.changes) + return@run EventType.RECORD_EMITTED } + // Update counters. + when (eventType) { + EventType.TOMBSTONE -> numTombstones + EventType.HEARTBEAT -> numHeartbeats + EventType.RECORD_DISCARDED_BY_DESERIALIZE, + EventType.RECORD_DISCARDED_BY_STREAM_ID -> numDiscardedRecords + EventType.RECORD_EMITTED -> numEmittedRecords + }.incrementAndGet() // Look for reasons to close down the engine. - val closeReason: CloseReason? = run { + val closeReason: CloseReason = run { + if (input.isSynthetic && eventType != EventType.HEARTBEAT) { + // Special case where the engine started with a synthetic offset: + // don't even consider closing the engine unless handling a heartbeat event. + // For some databases, such as Oracle, Debezium actually needs to snapshot the + // schema in order to collect the database schema history and there's no point + // in interrupting it until the snapshot is done. + return + } if (!coroutineContext.isActive) { return@run CloseReason.TIMEOUT } val currentPosition: T? = position(sourceRecord) ?: position(debeziumRecordValue) if (currentPosition == null || currentPosition < upperBound) { - return@run null + return } // Close because the current event is past the sync upper bound. - if (isRecord) { - CloseReason.RECORD_REACHED_TARGET_POSITION - } else { - CloseReason.HEARTBEAT_OR_TOMBSTONE_REACHED_TARGET_POSITION + when (eventType) { + EventType.TOMBSTONE, + EventType.HEARTBEAT -> + CloseReason.HEARTBEAT_OR_TOMBSTONE_REACHED_TARGET_POSITION + EventType.RECORD_EMITTED, + EventType.RECORD_DISCARDED_BY_DESERIALIZE, + EventType.RECORD_DISCARDED_BY_STREAM_ID -> + CloseReason.RECORD_REACHED_TARGET_POSITION } } - // Idempotent engine shutdown. - if (closeReason != null && closeReasonReference.compareAndSet(null, closeReason)) { - log.info { "Shutting down Debezium engine: ${closeReason.message}." } - // TODO : send close analytics message - Thread({ engine.close() }, "debezium-close").start() + // At this point, if we haven't returned already, we want to close down the engine. + if (!closeReasonReference.compareAndSet(null, closeReason)) { + // An earlier event has already triggered closing down the engine, do nothing. + return } + // At this point, if we haven't returned already, we need to close down the engine. + log.info { "Shutting down Debezium engine: ${closeReason.message}." } + // TODO : send close analytics message + Thread({ engine.close() }, "debezium-close").start() } private fun position(sourceRecord: SourceRecord?): T? { @@ -204,6 +226,14 @@ class CdcPartitionReader>( } } + private enum class EventType { + TOMBSTONE, + HEARTBEAT, + RECORD_DISCARDED_BY_DESERIALIZE, + RECORD_DISCARDED_BY_STREAM_ID, + RECORD_EMITTED, + } + inner class CompletionCallback : DebeziumEngine.CompletionCallback { override fun handle(success: Boolean, message: String?, error: Throwable?) { if (success) { diff --git a/airbyte-cdk/bulk/toolkits/extract-cdc/src/main/kotlin/io/airbyte/cdk/read/cdc/CdcPartitionsCreator.kt b/airbyte-cdk/bulk/toolkits/extract-cdc/src/main/kotlin/io/airbyte/cdk/read/cdc/CdcPartitionsCreator.kt index 4ec20217d07b..0806d68e3d43 100644 --- a/airbyte-cdk/bulk/toolkits/extract-cdc/src/main/kotlin/io/airbyte/cdk/read/cdc/CdcPartitionsCreator.kt +++ b/airbyte-cdk/bulk/toolkits/extract-cdc/src/main/kotlin/io/airbyte/cdk/read/cdc/CdcPartitionsCreator.kt @@ -20,6 +20,7 @@ class CdcPartitionsCreator>( val feedBootstrap: GlobalFeedBootstrap, val creatorOps: CdcPartitionsCreatorDebeziumOperations, val readerOps: CdcPartitionReaderDebeziumOperations, + val lowerBoundReference: AtomicReference, val upperBoundReference: AtomicReference, ) : PartitionsCreator { private val log = KotlinLogging.logger {} @@ -72,20 +73,30 @@ class CdcPartitionsCreator>( upperBound, input ) + val lowerBound: T = creatorOps.position(input.state.offset) + val lowerBoundInPreviousRound: T? = lowerBoundReference.getAndSet(lowerBound) if (input.isSynthetic) { // Handle synthetic offset edge-case, which always needs to run. // Debezium needs to run to generate the full state, which might include schema history. log.info { "Current offset is synthetic." } return listOf(partitionReader) } - val lowerBound: T = creatorOps.position(input.state.offset) if (upperBound <= lowerBound) { // Handle completion due to reaching the WAL position upper bound. log.info { "Current position '$lowerBound' equals or exceeds target position '$upperBound'." } globalLockResource.markCdcAsComplete() - return listOf() + return emptyList() + } + if (lowerBoundInPreviousRound != null && lowerBound <= lowerBoundInPreviousRound) { + // Handle completion due to stalling. + log.info { + "Current position '$lowerBound' has not increased in the last round, " + + "prior to which is was '$lowerBoundInPreviousRound'." + } + globalLockResource.markCdcAsComplete() + return emptyList() } // Handle common case. log.info { "Current position '$lowerBound' does not exceed target position '$upperBound'." } diff --git a/airbyte-cdk/bulk/toolkits/extract-cdc/src/main/kotlin/io/airbyte/cdk/read/cdc/CdcPartitionsCreatorFactory.kt b/airbyte-cdk/bulk/toolkits/extract-cdc/src/main/kotlin/io/airbyte/cdk/read/cdc/CdcPartitionsCreatorFactory.kt index c7aa758be6bf..728bb637713e 100644 --- a/airbyte-cdk/bulk/toolkits/extract-cdc/src/main/kotlin/io/airbyte/cdk/read/cdc/CdcPartitionsCreatorFactory.kt +++ b/airbyte-cdk/bulk/toolkits/extract-cdc/src/main/kotlin/io/airbyte/cdk/read/cdc/CdcPartitionsCreatorFactory.kt @@ -22,6 +22,13 @@ class CdcPartitionsCreatorFactory>( val debeziumOps: DebeziumOperations, ) : PartitionsCreatorFactory { + /** + * [AtomicReference] to a WAL position lower bound value shared by all [CdcPartitionsCreator]s. + * This value is updated by the [CdcPartitionsCreator] based on the incumbent state and is used + * to detect stalls. + */ + private val lowerBoundReference = AtomicReference() + /** * [AtomicReference] to a WAL position upper bound value shared by all [CdcPartitionsCreator]s. * This value is set exactly once by the first [CdcPartitionsCreator]. @@ -39,6 +46,7 @@ class CdcPartitionsCreatorFactory>( feedBootstrap, debeziumOps, debeziumOps, + lowerBoundReference, upperBoundReference, ) } diff --git a/airbyte-cdk/bulk/toolkits/extract-cdc/src/main/kotlin/io/airbyte/cdk/read/cdc/Debezium.kt b/airbyte-cdk/bulk/toolkits/extract-cdc/src/main/kotlin/io/airbyte/cdk/read/cdc/Debezium.kt index e1693a0d0fc0..dded33e05666 100644 --- a/airbyte-cdk/bulk/toolkits/extract-cdc/src/main/kotlin/io/airbyte/cdk/read/cdc/Debezium.kt +++ b/airbyte-cdk/bulk/toolkits/extract-cdc/src/main/kotlin/io/airbyte/cdk/read/cdc/Debezium.kt @@ -25,7 +25,10 @@ value class DebeziumRecordKey(val wrapped: JsonNode) { @JvmInline value class DebeziumRecordValue(val wrapped: JsonNode) { - /** True if this is a Debezium heartbeat event. These aren't passed to [DebeziumConsumer]. */ + /** + * True if this is a Debezium heartbeat event, or the equivalent thereof. In any case, such + * events are only used for their position value and for triggering timeouts. + */ val isHeartbeat: Boolean get() = source.isNull diff --git a/airbyte-cdk/bulk/toolkits/extract-cdc/src/main/kotlin/io/airbyte/cdk/read/cdc/DebeziumOperations.kt b/airbyte-cdk/bulk/toolkits/extract-cdc/src/main/kotlin/io/airbyte/cdk/read/cdc/DebeziumOperations.kt index 990924bcfcdb..a6126fb73fb7 100644 --- a/airbyte-cdk/bulk/toolkits/extract-cdc/src/main/kotlin/io/airbyte/cdk/read/cdc/DebeziumOperations.kt +++ b/airbyte-cdk/bulk/toolkits/extract-cdc/src/main/kotlin/io/airbyte/cdk/read/cdc/DebeziumOperations.kt @@ -30,8 +30,12 @@ interface CdcPartitionsCreatorDebeziumOperations> { interface CdcPartitionReaderDebeziumOperations> { - /** Transforms a [DebeziumRecordValue] into a [DeserializedRecord]. */ - fun deserialize(key: DebeziumRecordKey, value: DebeziumRecordValue): DeserializedRecord + /** + * Transforms a [DebeziumRecordKey] and a [DebeziumRecordValue] into a [DeserializedRecord]. + * + * Returning null means that the event should be treated like a heartbeat. + */ + fun deserialize(key: DebeziumRecordKey, value: DebeziumRecordValue): DeserializedRecord? /** Maps a [DebeziumState] to an [OpaqueStateValue]. */ fun serialize(debeziumState: DebeziumState): OpaqueStateValue diff --git a/airbyte-cdk/bulk/toolkits/extract-cdc/src/test/kotlin/io/airbyte/cdk/read/cdc/CdcPartitionsCreatorTest.kt b/airbyte-cdk/bulk/toolkits/extract-cdc/src/test/kotlin/io/airbyte/cdk/read/cdc/CdcPartitionsCreatorTest.kt index e339e92d84d7..c96d6dbd8a57 100644 --- a/airbyte-cdk/bulk/toolkits/extract-cdc/src/test/kotlin/io/airbyte/cdk/read/cdc/CdcPartitionsCreatorTest.kt +++ b/airbyte-cdk/bulk/toolkits/extract-cdc/src/test/kotlin/io/airbyte/cdk/read/cdc/CdcPartitionsCreatorTest.kt @@ -58,6 +58,7 @@ class CdcPartitionsCreatorTest { val global = Global(listOf(stream)) + val lowerBoundReference = AtomicReference(null) val upperBoundReference = AtomicReference(null) val creator: CdcPartitionsCreator @@ -68,6 +69,7 @@ class CdcPartitionsCreatorTest { globalFeedBootstrap, creatorOps, readerOps, + lowerBoundReference, upperBoundReference, )