Skip to content

Commit

Permalink
Dont run snapshot feeds in case cdc stopped for stale offset (#46876)
Browse files Browse the repository at this point in the history
  • Loading branch information
rodireich authored Oct 15, 2024
1 parent dfb6d8e commit 5e5a349
Show file tree
Hide file tree
Showing 2 changed files with 103 additions and 21 deletions.
Original file line number Diff line number Diff line change
@@ -1,15 +1,18 @@
/* Copyright (c) 2024 Airbyte, Inc., all rights reserved. */
package io.airbyte.cdk.read

import io.airbyte.cdk.ConfigErrorException
import io.airbyte.cdk.output.OutputConsumer
import io.airbyte.cdk.util.ThreadRenamingCoroutineName
import io.github.oshai.kotlinlogging.KotlinLogging
import java.time.Duration
import java.util.concurrent.ConcurrentHashMap
import kotlin.coroutines.CoroutineContext
import kotlin.time.toKotlinDuration
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.CoroutineExceptionHandler
import kotlinx.coroutines.Job
import kotlinx.coroutines.cancel
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.collectLatest
import kotlinx.coroutines.flow.update
Expand Down Expand Up @@ -66,22 +69,32 @@ class RootReader(
}
// Call listener hook.
listener(feedJobs)
// Join on all stream feeds and collect caught exceptions.
val streamExceptions: Map<Stream, Throwable?> =
feeds.filterIsInstance<Stream>().associateWith {
feedJobs[it]?.join()
exceptions[it]
}
// Join on all global feeds and collect caught exceptions.
val globalExceptions: Map<Global, Throwable?> =
feeds.filterIsInstance<Global>().associateWith {
feedJobs[it]?.join()
exceptions[it]
}

// Certain errors on the global feed cause a full stop to all stream reads
if (globalExceptions.values.filterIsInstance<ConfigErrorException>().isNotEmpty()) {
this@supervisorScope.cancel()
}

// Join on all stream feeds and collect caught exceptions.
val streamExceptions: Map<Stream, Throwable?> =
feeds.filterIsInstance<Stream>().associateWith {
try {
feedJobs[it]?.join()
exceptions[it]
} catch (_: CancellationException) {
null
}
}
// Reduce and throw any caught exceptions.
val caughtExceptions: List<Throwable> =
streamExceptions.values.mapNotNull { it } +
globalExceptions.values.mapNotNull { it }
globalExceptions.values.mapNotNull { it } +
streamExceptions.values.mapNotNull { it }
if (caughtExceptions.isNotEmpty()) {
val cause: Throwable =
caughtExceptions.reduce { acc: Throwable, exception: Throwable ->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ package io.airbyte.cdk.read
import com.fasterxml.jackson.databind.JsonNode
import com.fasterxml.jackson.databind.node.ArrayNode
import io.airbyte.cdk.ClockFactory
import io.airbyte.cdk.ConfigErrorException
import io.airbyte.cdk.StreamIdentifier
import io.airbyte.cdk.command.OpaqueStateValue
import io.airbyte.cdk.output.BufferingOutputConsumer
Expand Down Expand Up @@ -194,6 +195,51 @@ class RootReaderIntegrationTest {
Assertions.assertFalse(globalStateMessages.isEmpty())
}

@Test
fun testAllStreamsGlobalConfigError() {
val stateManager =
StateManager(
global = Global(testCases.map { it.stream }),
initialGlobalState = null,
initialStreamStates = testCases.associate { it.stream to null },
)
val testOutputConsumer = BufferingOutputConsumer(ClockFactory().fixed())
val rootReader =
RootReader(
stateManager,
slowHeartbeat,
excessiveTimeout,
testOutputConsumer,
listOf(
ConfigErrorThrowingGlobalPartitionsCreatorFactory(
Semaphore(CONSTRAINED),
*testCases.toTypedArray()
)
),
)
Assertions.assertThrows(ConfigErrorException::class.java) {
runBlocking(Dispatchers.Default) { rootReader.read() }
}
val log = KotlinLogging.logger {}
for (msg in testOutputConsumer.messages()) {
log.info { Jsons.writeValueAsString(msg) }
}
for (testCase in testCases) {
log.info { "checking stream feed for ${testCase.name}" }
val streamStateMessages: List<AirbyteStateMessage> =
testOutputConsumer.states().filter {
it.stream?.streamDescriptor?.name == testCase.name
}
Assertions.assertTrue(streamStateMessages.isEmpty())
}
log.info { "checking global feed" }
val globalStateMessages: List<AirbyteStateMessage> =
testOutputConsumer.states().filter {
it.type == AirbyteStateMessage.AirbyteStateType.GLOBAL
}
Assertions.assertTrue(globalStateMessages.isEmpty())
}

companion object {
const val CONSTRAINED = 2
}
Expand Down Expand Up @@ -571,7 +617,7 @@ class TestPartitionReader(
)
}

class TestPartitionsCreatorFactory(
open class TestPartitionsCreatorFactory(
val resource: Semaphore,
vararg val testCases: TestCase,
) : PartitionsCreatorFactory {
Expand All @@ -582,18 +628,7 @@ class TestPartitionsCreatorFactory(
feed: Feed,
): PartitionsCreator {
if (feed is Global) {
return object : PartitionsCreator {
override fun tryAcquireResources(): PartitionsCreator.TryAcquireResourcesStatus {
return PartitionsCreator.TryAcquireResourcesStatus.READY_TO_RUN
}

override suspend fun run(): List<PartitionReader> {
// Do nothing.
return emptyList()
}

override fun releaseResources() {}
}
return makeGlobalPartitionsCreator()
}
// For a stream feed, pick the CreatorCase in the corresponding TestCase
// which is the successor of the one whose corresponding state is in the StateQuerier.
Expand All @@ -613,6 +648,40 @@ class TestPartitionsCreatorFactory(
resource,
)
}

protected open fun makeGlobalPartitionsCreator(): PartitionsCreator {
return object : PartitionsCreator {
override fun tryAcquireResources(): PartitionsCreator.TryAcquireResourcesStatus {
return PartitionsCreator.TryAcquireResourcesStatus.READY_TO_RUN
}

override suspend fun run(): List<PartitionReader> {
// Do nothing.
return emptyList()
}

override fun releaseResources() {}
}
}
}

class ConfigErrorThrowingGlobalPartitionsCreatorFactory(
resource: Semaphore,
vararg testCases: TestCase,
) : TestPartitionsCreatorFactory(resource, *testCases) {
override fun makeGlobalPartitionsCreator(): PartitionsCreator {
return object : PartitionsCreator {
override fun tryAcquireResources(): PartitionsCreator.TryAcquireResourcesStatus {
return PartitionsCreator.TryAcquireResourcesStatus.READY_TO_RUN
}

override suspend fun run(): List<PartitionReader> {
throw ConfigErrorException("some config error")
}

override fun releaseResources() {}
}
}
}

/** Tests should succeed and not timeout. */
Expand Down

0 comments on commit 5e5a349

Please sign in to comment.