From fe77c7d7285d4b0820740289f32f73056367fa1a Mon Sep 17 00:00:00 2001 From: Johnny Schmidt Date: Sat, 5 Oct 2024 13:39:18 -0700 Subject: [PATCH] =?UTF-8?q?Bulk=20Load=20CDK=20Stream=20Incomplete=20Prep?= =?UTF-8?q?=20Refactor:=20Memory=20manager=20provide=E2=80=A6=20(#46386)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../cdk/message/DestinationMessageQueue.kt | 19 +++----- .../io/airbyte/cdk/state/MemoryManager.kt | 44 ++++++++++++++++--- .../io/airbyte/cdk/util/CoroutineUtils.kt | 12 +++++ .../io/airbyte/cdk/state/MemoryManagerTest.kt | 4 +- spotbugs-exclude-filter-file.xml | 3 ++ 5 files changed, 62 insertions(+), 20 deletions(-) diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/message/DestinationMessageQueue.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/message/DestinationMessageQueue.kt index 694d1d29e1bc..1ee527b25795 100644 --- a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/message/DestinationMessageQueue.kt +++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/message/DestinationMessageQueue.kt @@ -13,7 +13,6 @@ import jakarta.inject.Singleton import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.atomic.AtomicLong import java.util.concurrent.locks.ReentrantLock -import kotlin.concurrent.withLock import kotlinx.coroutines.runBlocking /** @@ -60,7 +59,8 @@ class DestinationMessageQueue( ConcurrentHashMap() private val totalQueueSizeBytes = AtomicLong(0L) - private val maxQueueSizeBytes: Long + private val reservedMemory: MemoryManager.Reservation + private val reservedMemoryManager: MemoryManager private val memoryLock = ReentrantLock() private val memoryLockCondition = memoryLock.newCondition() @@ -69,23 +69,16 @@ class DestinationMessageQueue( val adjustedRatio = config.maxMessageQueueMemoryUsageRatio / (1.0 + config.estimatedRecordMemoryOverheadRatio) - maxQueueSizeBytes = runBlocking { memoryManager.reserveRatio(adjustedRatio) } + reservedMemory = runBlocking { memoryManager.reserveRatio(adjustedRatio) } + reservedMemoryManager = reservedMemory.getReservationManager() } override suspend fun acquireQueueBytesBlocking(bytes: Long) { - memoryLock.withLock { - while (totalQueueSizeBytes.get() + bytes > maxQueueSizeBytes) { - memoryLockCondition.await() - } - totalQueueSizeBytes.addAndGet(bytes) - } + reservedMemoryManager.reserveBlocking(bytes) } override suspend fun releaseQueueBytes(bytes: Long) { - memoryLock.withLock { - totalQueueSizeBytes.addAndGet(-bytes) - memoryLockCondition.signalAll() - } + reservedMemoryManager.release(bytes) } override suspend fun getChannel( diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/state/MemoryManager.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/state/MemoryManager.kt index f5511f58fcc1..1babd2c449b3 100644 --- a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/state/MemoryManager.kt +++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/state/MemoryManager.kt @@ -4,8 +4,10 @@ package io.airbyte.cdk.state +import io.airbyte.cdk.util.CloseableCoroutine import io.micronaut.context.annotation.Secondary import jakarta.inject.Singleton +import java.util.concurrent.atomic.AtomicBoolean import java.util.concurrent.atomic.AtomicLong import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.sync.Mutex @@ -19,17 +21,47 @@ import kotlinx.coroutines.sync.withLock * TODO: Some degree of logging/monitoring around how accurate we're actually being? */ @Singleton -class MemoryManager(private val availableMemoryProvider: AvailableMemoryProvider) { - private val totalMemoryBytes: Long = availableMemoryProvider.availableMemoryBytes +class MemoryManager(availableMemoryProvider: AvailableMemoryProvider) { + // This is slightly awkward, but Micronaut only injects the primary constructor + constructor( + availableMemory: Long + ) : this( + object : AvailableMemoryProvider { + override val availableMemoryBytes: Long = availableMemory + } + ) + + private val totalMemoryBytes = availableMemoryProvider.availableMemoryBytes private var usedMemoryBytes = AtomicLong(0L) private val mutex = Mutex() private val syncChannel = Channel(Channel.UNLIMITED) + /** + * Releasable reservation of memory. For large blocks (ie, from [reserveRatio], provides a + * submanager that can be used to manage allocating the reservation). + */ + inner class Reservation(val bytes: Long) : CloseableCoroutine { + private var released = AtomicBoolean(false) + + suspend fun release() { + if (!released.compareAndSet(false, true)) { + return + } + release(bytes) + } + + fun getReservationManager(): MemoryManager = MemoryManager(bytes) + + override suspend fun close() { + release() + } + } + val remainingMemoryBytes: Long get() = totalMemoryBytes - usedMemoryBytes.get() /* Attempt to reserve memory. If enough memory is not available, waits until it is, then reserves. */ - suspend fun reserveBlocking(memoryBytes: Long) { + suspend fun reserveBlocking(memoryBytes: Long): Reservation { if (memoryBytes > totalMemoryBytes) { throw IllegalArgumentException( "Requested ${memoryBytes}b memory exceeds ${totalMemoryBytes}b total" @@ -41,13 +73,15 @@ class MemoryManager(private val availableMemoryProvider: AvailableMemoryProvider syncChannel.receive() } usedMemoryBytes.addAndGet(memoryBytes) + + return Reservation(memoryBytes) } } - suspend fun reserveRatio(ratio: Double): Long { + suspend fun reserveRatio(ratio: Double): Reservation { val estimatedSize = (totalMemoryBytes.toDouble() * ratio).toLong() reserveBlocking(estimatedSize) - return estimatedSize + return Reservation(estimatedSize) } suspend fun release(memoryBytes: Long) { diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/util/CoroutineUtils.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/util/CoroutineUtils.kt index fc497653f36d..522efcf9eed2 100644 --- a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/util/CoroutineUtils.kt +++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/util/CoroutineUtils.kt @@ -11,3 +11,15 @@ fun Flow.takeUntilInclusive(predicate: (T) -> Boolean): Flow = transfo emit(value) !predicate(value) } + +interface CloseableCoroutine { + suspend fun close() +} + +suspend fun T.use(block: suspend (T) -> Unit) { + try { + block(this) + } finally { + close() + } +} diff --git a/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/state/MemoryManagerTest.kt b/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/state/MemoryManagerTest.kt index 5bc28a27cda1..556301830abd 100644 --- a/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/state/MemoryManagerTest.kt +++ b/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/state/MemoryManagerTest.kt @@ -17,11 +17,11 @@ import kotlinx.coroutines.withTimeout import org.junit.jupiter.api.Assertions import org.junit.jupiter.api.Test -@MicronautTest +@MicronautTest(environments = ["MemoryManagerTest"]) class MemoryManagerTest { @Singleton @Replaces(MemoryManager::class) - @Requires(env = ["test"]) + @Requires(env = ["MemoryManagerTest"]) class MockAvailableMemoryProvider : AvailableMemoryProvider { override val availableMemoryBytes: Long = 1000 } diff --git a/spotbugs-exclude-filter-file.xml b/spotbugs-exclude-filter-file.xml index e1b34ccfb57e..6b825caa6c30 100644 --- a/spotbugs-exclude-filter-file.xml +++ b/spotbugs-exclude-filter-file.xml @@ -19,4 +19,7 @@ + + +