Skip to content

Commit

Permalink
bulk-cdk-toolkit-extract-cdc: tweak DebeziumOperations interface (#48011
Browse files Browse the repository at this point in the history
)
  • Loading branch information
postamar authored Oct 31, 2024
1 parent 08de45a commit 711dfaa
Show file tree
Hide file tree
Showing 6 changed files with 91 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,14 @@ class CdcPartitionReader<T : Comparable<T>>(
val thread = Thread(engine, "debezium-engine")
thread.setUncaughtExceptionHandler { _, e: Throwable -> engineException.set(e) }
thread.start()
withContext(Dispatchers.IO) { thread.join() }
try {
withContext(Dispatchers.IO) { thread.join() }
} catch (e: Throwable) {
// This catches any exceptions thrown by join()
// but also by the kotlin coroutine dispatcher, like TimeoutCancellationException.
engineException.compareAndSet(null, e)
}
// Print a nice log message and re-throw any exception.
val exception: Throwable? = engineException.get()
val summary: Map<String, Any?> =
mapOf(
Expand Down Expand Up @@ -138,49 +145,64 @@ class CdcPartitionReader<T : Comparable<T>>(
val debeziumRecordValue: DebeziumRecordValue? =
event.value()?.let { DebeziumRecordValue(Jsons.readTree(it)) }
// Process records, ignoring heartbeats which are only used for completion checks.
val isRecord: Boolean
if (debeziumRecordValue == null) {
isRecord = false
numTombstones.incrementAndGet()
} else if (debeziumRecordValue.isHeartbeat) {
isRecord = false
numHeartbeats.incrementAndGet()
} else {
isRecord = true
val eventType: EventType = run {
if (debeziumRecordValue == null) return@run EventType.TOMBSTONE
if (debeziumRecordValue.isHeartbeat) return@run EventType.HEARTBEAT
val debeziumRecordKey = DebeziumRecordKey(Jsons.readTree(event.key()))
val deserializedRecord: DeserializedRecord =
readerOps.deserialize(debeziumRecordKey, debeziumRecordValue)
val streamRecordConsumer: StreamRecordConsumer? =
?: return@run EventType.RECORD_DISCARDED_BY_DESERIALIZE
val streamRecordConsumer: StreamRecordConsumer =
streamRecordConsumers[deserializedRecord.streamID]
if (streamRecordConsumer == null) {
numDiscardedRecords.incrementAndGet()
} else {
streamRecordConsumer.accept(deserializedRecord.data, deserializedRecord.changes)
numEmittedRecords.incrementAndGet()
}
?: return@run EventType.RECORD_DISCARDED_BY_STREAM_ID
streamRecordConsumer.accept(deserializedRecord.data, deserializedRecord.changes)
return@run EventType.RECORD_EMITTED
}
// Update counters.
when (eventType) {
EventType.TOMBSTONE -> numTombstones
EventType.HEARTBEAT -> numHeartbeats
EventType.RECORD_DISCARDED_BY_DESERIALIZE,
EventType.RECORD_DISCARDED_BY_STREAM_ID -> numDiscardedRecords
EventType.RECORD_EMITTED -> numEmittedRecords
}.incrementAndGet()
// Look for reasons to close down the engine.
val closeReason: CloseReason? = run {
val closeReason: CloseReason = run {
if (input.isSynthetic && eventType != EventType.HEARTBEAT) {
// Special case where the engine started with a synthetic offset:
// don't even consider closing the engine unless handling a heartbeat event.
// For some databases, such as Oracle, Debezium actually needs to snapshot the
// schema in order to collect the database schema history and there's no point
// in interrupting it until the snapshot is done.
return
}
if (!coroutineContext.isActive) {
return@run CloseReason.TIMEOUT
}
val currentPosition: T? = position(sourceRecord) ?: position(debeziumRecordValue)
if (currentPosition == null || currentPosition < upperBound) {
return@run null
return
}
// Close because the current event is past the sync upper bound.
if (isRecord) {
CloseReason.RECORD_REACHED_TARGET_POSITION
} else {
CloseReason.HEARTBEAT_OR_TOMBSTONE_REACHED_TARGET_POSITION
when (eventType) {
EventType.TOMBSTONE,
EventType.HEARTBEAT ->
CloseReason.HEARTBEAT_OR_TOMBSTONE_REACHED_TARGET_POSITION
EventType.RECORD_EMITTED,
EventType.RECORD_DISCARDED_BY_DESERIALIZE,
EventType.RECORD_DISCARDED_BY_STREAM_ID ->
CloseReason.RECORD_REACHED_TARGET_POSITION
}
}
// Idempotent engine shutdown.
if (closeReason != null && closeReasonReference.compareAndSet(null, closeReason)) {
log.info { "Shutting down Debezium engine: ${closeReason.message}." }
// TODO : send close analytics message
Thread({ engine.close() }, "debezium-close").start()
// At this point, if we haven't returned already, we want to close down the engine.
if (!closeReasonReference.compareAndSet(null, closeReason)) {
// An earlier event has already triggered closing down the engine, do nothing.
return
}
// At this point, if we haven't returned already, we need to close down the engine.
log.info { "Shutting down Debezium engine: ${closeReason.message}." }
// TODO : send close analytics message
Thread({ engine.close() }, "debezium-close").start()
}

private fun position(sourceRecord: SourceRecord?): T? {
Expand All @@ -204,6 +226,14 @@ class CdcPartitionReader<T : Comparable<T>>(
}
}

private enum class EventType {
TOMBSTONE,
HEARTBEAT,
RECORD_DISCARDED_BY_DESERIALIZE,
RECORD_DISCARDED_BY_STREAM_ID,
RECORD_EMITTED,
}

inner class CompletionCallback : DebeziumEngine.CompletionCallback {
override fun handle(success: Boolean, message: String?, error: Throwable?) {
if (success) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ class CdcPartitionsCreator<T : Comparable<T>>(
val feedBootstrap: GlobalFeedBootstrap,
val creatorOps: CdcPartitionsCreatorDebeziumOperations<T>,
val readerOps: CdcPartitionReaderDebeziumOperations<T>,
val lowerBoundReference: AtomicReference<T>,
val upperBoundReference: AtomicReference<T>,
) : PartitionsCreator {
private val log = KotlinLogging.logger {}
Expand Down Expand Up @@ -72,20 +73,30 @@ class CdcPartitionsCreator<T : Comparable<T>>(
upperBound,
input
)
val lowerBound: T = creatorOps.position(input.state.offset)
val lowerBoundInPreviousRound: T? = lowerBoundReference.getAndSet(lowerBound)
if (input.isSynthetic) {
// Handle synthetic offset edge-case, which always needs to run.
// Debezium needs to run to generate the full state, which might include schema history.
log.info { "Current offset is synthetic." }
return listOf(partitionReader)
}
val lowerBound: T = creatorOps.position(input.state.offset)
if (upperBound <= lowerBound) {
// Handle completion due to reaching the WAL position upper bound.
log.info {
"Current position '$lowerBound' equals or exceeds target position '$upperBound'."
}
globalLockResource.markCdcAsComplete()
return listOf()
return emptyList()
}
if (lowerBoundInPreviousRound != null && lowerBound <= lowerBoundInPreviousRound) {
// Handle completion due to stalling.
log.info {
"Current position '$lowerBound' has not increased in the last round, " +
"prior to which is was '$lowerBoundInPreviousRound'."
}
globalLockResource.markCdcAsComplete()
return emptyList()
}
// Handle common case.
log.info { "Current position '$lowerBound' does not exceed target position '$upperBound'." }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,13 @@ class CdcPartitionsCreatorFactory<T : Comparable<T>>(
val debeziumOps: DebeziumOperations<T>,
) : PartitionsCreatorFactory {

/**
* [AtomicReference] to a WAL position lower bound value shared by all [CdcPartitionsCreator]s.
* This value is updated by the [CdcPartitionsCreator] based on the incumbent state and is used
* to detect stalls.
*/
private val lowerBoundReference = AtomicReference<T>()

/**
* [AtomicReference] to a WAL position upper bound value shared by all [CdcPartitionsCreator]s.
* This value is set exactly once by the first [CdcPartitionsCreator].
Expand All @@ -39,6 +46,7 @@ class CdcPartitionsCreatorFactory<T : Comparable<T>>(
feedBootstrap,
debeziumOps,
debeziumOps,
lowerBoundReference,
upperBoundReference,
)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,10 @@ value class DebeziumRecordKey(val wrapped: JsonNode) {
@JvmInline
value class DebeziumRecordValue(val wrapped: JsonNode) {

/** True if this is a Debezium heartbeat event. These aren't passed to [DebeziumConsumer]. */
/**
* True if this is a Debezium heartbeat event, or the equivalent thereof. In any case, such
* events are only used for their position value and for triggering timeouts.
*/
val isHeartbeat: Boolean
get() = source.isNull

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,12 @@ interface CdcPartitionsCreatorDebeziumOperations<T : Comparable<T>> {

interface CdcPartitionReaderDebeziumOperations<T : Comparable<T>> {

/** Transforms a [DebeziumRecordValue] into a [DeserializedRecord]. */
fun deserialize(key: DebeziumRecordKey, value: DebeziumRecordValue): DeserializedRecord
/**
* Transforms a [DebeziumRecordKey] and a [DebeziumRecordValue] into a [DeserializedRecord].
*
* Returning null means that the event should be treated like a heartbeat.
*/
fun deserialize(key: DebeziumRecordKey, value: DebeziumRecordValue): DeserializedRecord?

/** Maps a [DebeziumState] to an [OpaqueStateValue]. */
fun serialize(debeziumState: DebeziumState): OpaqueStateValue
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ class CdcPartitionsCreatorTest {

val global = Global(listOf(stream))

val lowerBoundReference = AtomicReference<CreatorPosition>(null)
val upperBoundReference = AtomicReference<CreatorPosition>(null)

val creator: CdcPartitionsCreator<CreatorPosition>
Expand All @@ -68,6 +69,7 @@ class CdcPartitionsCreatorTest {
globalFeedBootstrap,
creatorOps,
readerOps,
lowerBoundReference,
upperBoundReference,
)

Expand Down

0 comments on commit 711dfaa

Please sign in to comment.