Skip to content

Commit

Permalink
Bulk Load CDK Stream Incomplete Prep Refactor: Memory manager provide… (
Browse files Browse the repository at this point in the history
  • Loading branch information
johnny-schmidt authored Oct 5, 2024
1 parent 3f9c032 commit fe77c7d
Show file tree
Hide file tree
Showing 5 changed files with 62 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import jakarta.inject.Singleton
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicLong
import java.util.concurrent.locks.ReentrantLock
import kotlin.concurrent.withLock
import kotlinx.coroutines.runBlocking

/**
Expand Down Expand Up @@ -60,7 +59,8 @@ class DestinationMessageQueue(
ConcurrentHashMap()

private val totalQueueSizeBytes = AtomicLong(0L)
private val maxQueueSizeBytes: Long
private val reservedMemory: MemoryManager.Reservation
private val reservedMemoryManager: MemoryManager
private val memoryLock = ReentrantLock()
private val memoryLockCondition = memoryLock.newCondition()

Expand All @@ -69,23 +69,16 @@ class DestinationMessageQueue(
val adjustedRatio =
config.maxMessageQueueMemoryUsageRatio /
(1.0 + config.estimatedRecordMemoryOverheadRatio)
maxQueueSizeBytes = runBlocking { memoryManager.reserveRatio(adjustedRatio) }
reservedMemory = runBlocking { memoryManager.reserveRatio(adjustedRatio) }
reservedMemoryManager = reservedMemory.getReservationManager()
}

override suspend fun acquireQueueBytesBlocking(bytes: Long) {
memoryLock.withLock {
while (totalQueueSizeBytes.get() + bytes > maxQueueSizeBytes) {
memoryLockCondition.await()
}
totalQueueSizeBytes.addAndGet(bytes)
}
reservedMemoryManager.reserveBlocking(bytes)
}

override suspend fun releaseQueueBytes(bytes: Long) {
memoryLock.withLock {
totalQueueSizeBytes.addAndGet(-bytes)
memoryLockCondition.signalAll()
}
reservedMemoryManager.release(bytes)
}

override suspend fun getChannel(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@

package io.airbyte.cdk.state

import io.airbyte.cdk.util.CloseableCoroutine
import io.micronaut.context.annotation.Secondary
import jakarta.inject.Singleton
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.atomic.AtomicLong
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.sync.Mutex
Expand All @@ -19,17 +21,47 @@ import kotlinx.coroutines.sync.withLock
* TODO: Some degree of logging/monitoring around how accurate we're actually being?
*/
@Singleton
class MemoryManager(private val availableMemoryProvider: AvailableMemoryProvider) {
private val totalMemoryBytes: Long = availableMemoryProvider.availableMemoryBytes
class MemoryManager(availableMemoryProvider: AvailableMemoryProvider) {
// This is slightly awkward, but Micronaut only injects the primary constructor
constructor(
availableMemory: Long
) : this(
object : AvailableMemoryProvider {
override val availableMemoryBytes: Long = availableMemory
}
)

private val totalMemoryBytes = availableMemoryProvider.availableMemoryBytes
private var usedMemoryBytes = AtomicLong(0L)
private val mutex = Mutex()
private val syncChannel = Channel<Unit>(Channel.UNLIMITED)

/**
* Releasable reservation of memory. For large blocks (ie, from [reserveRatio], provides a
* submanager that can be used to manage allocating the reservation).
*/
inner class Reservation(val bytes: Long) : CloseableCoroutine {
private var released = AtomicBoolean(false)

suspend fun release() {
if (!released.compareAndSet(false, true)) {
return
}
release(bytes)
}

fun getReservationManager(): MemoryManager = MemoryManager(bytes)

override suspend fun close() {
release()
}
}

val remainingMemoryBytes: Long
get() = totalMemoryBytes - usedMemoryBytes.get()

/* Attempt to reserve memory. If enough memory is not available, waits until it is, then reserves. */
suspend fun reserveBlocking(memoryBytes: Long) {
suspend fun reserveBlocking(memoryBytes: Long): Reservation {
if (memoryBytes > totalMemoryBytes) {
throw IllegalArgumentException(
"Requested ${memoryBytes}b memory exceeds ${totalMemoryBytes}b total"
Expand All @@ -41,13 +73,15 @@ class MemoryManager(private val availableMemoryProvider: AvailableMemoryProvider
syncChannel.receive()
}
usedMemoryBytes.addAndGet(memoryBytes)

return Reservation(memoryBytes)
}
}

suspend fun reserveRatio(ratio: Double): Long {
suspend fun reserveRatio(ratio: Double): Reservation {
val estimatedSize = (totalMemoryBytes.toDouble() * ratio).toLong()
reserveBlocking(estimatedSize)
return estimatedSize
return Reservation(estimatedSize)
}

suspend fun release(memoryBytes: Long) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,15 @@ fun <T> Flow<T>.takeUntilInclusive(predicate: (T) -> Boolean): Flow<T> = transfo
emit(value)
!predicate(value)
}

interface CloseableCoroutine {
suspend fun close()
}

suspend fun <T : CloseableCoroutine> T.use(block: suspend (T) -> Unit) {
try {
block(this)
} finally {
close()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,11 @@ import kotlinx.coroutines.withTimeout
import org.junit.jupiter.api.Assertions
import org.junit.jupiter.api.Test

@MicronautTest
@MicronautTest(environments = ["MemoryManagerTest"])
class MemoryManagerTest {
@Singleton
@Replaces(MemoryManager::class)
@Requires(env = ["test"])
@Requires(env = ["MemoryManagerTest"])
class MockAvailableMemoryProvider : AvailableMemoryProvider {
override val availableMemoryBytes: Long = 1000
}
Expand Down
3 changes: 3 additions & 0 deletions spotbugs-exclude-filter-file.xml
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,7 @@
<Class name="io.airbyte.cdk.integrations.debezium.internals.AirbyteFileOffsetBackingStore" />
<Bug code="SECOBDES" />
</Match>
<Match>
<Package name="io.airbyte.cdk.util.*" />
</Match>
</FindBugsFilter>

0 comments on commit fe77c7d

Please sign in to comment.