Skip to content

Commit

Permalink
Feature/metered thread pool (#199)
Browse files Browse the repository at this point in the history
* Metered thread pools

Co-authored-by: Patryk Gutenplan <[email protected]>
Co-authored-by: Patryk Żmigrodzki <[email protected]>
  • Loading branch information
3 people authored Oct 30, 2020
1 parent 8c87aa5 commit 7aa1030
Show file tree
Hide file tree
Showing 2 changed files with 90 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import io.envoyproxy.controlplane.server.callback.SnapshotCollectingCallback
import io.grpc.Server
import io.grpc.netty.NettyServerBuilder
import io.micrometer.core.instrument.MeterRegistry
import io.micrometer.core.instrument.binder.jvm.ExecutorServiceMetrics
import io.netty.channel.nio.NioEventLoopGroup
import pl.allegro.tech.servicemesh.envoycontrol.groups.Group
import pl.allegro.tech.servicemesh.envoycontrol.groups.GroupChangeWatcher
Expand Down Expand Up @@ -38,7 +39,9 @@ import reactor.core.Disposable
import reactor.core.publisher.Flux
import reactor.core.scheduler.Schedulers
import java.time.Clock
import java.util.concurrent.BlockingQueue
import java.util.concurrent.Executor
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors
import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.ThreadFactory
Expand Down Expand Up @@ -97,21 +100,19 @@ class ControlPlane private constructor(

fun build(changes: Flux<MultiClusterState>): ControlPlane {
if (grpcServerExecutor == null) {
grpcServerExecutor = ThreadPoolExecutor(
grpcServerExecutor = newMeteredThreadPoolExecutor(
properties.server.serverPoolSize,
properties.server.serverPoolSize,
properties.server.serverPoolKeepAlive.toMillis(), TimeUnit.MILLISECONDS,
properties.server.serverPoolKeepAlive.toMillis(),
LinkedBlockingQueue<Runnable>(),
ThreadNamingThreadFactory("grpc-server-worker")
"grpc-server-worker"
)
}

if (nioEventLoopExecutor == null) {
// unbounded executor - netty will only use configured number of threads
// (by nioEventLoopThreadCount property or default netty value: <number of CPUs> * 2)
nioEventLoopExecutor = Executors.newCachedThreadPool(
ThreadNamingThreadFactory("grpc-worker-event-loop")
)
nioEventLoopExecutor = newMeteredCachedThreadPool("grpc-worker-event-loop")
}

if (executorGroup == null) {
Expand All @@ -122,19 +123,19 @@ class ControlPlane private constructor(
// executor group is invalid, because it may lead to sending XDS responses out of order for
// given DiscoveryRequestStreamObserver. We should switch to multiple, single-threaded
// ThreadPoolExecutors. More info in linked task.
val executor = Executors.newFixedThreadPool(
properties.server.executorGroup.parallelPoolSize,
ThreadNamingThreadFactory("discovery-responses-executor")
val executor = newMeteredFixedThreadPool(
"discovery-responses-executor",
properties.server.executorGroup.parallelPoolSize
)
ExecutorGroup { executor }
}
}
}

if (globalSnapshotExecutor == null) {
globalSnapshotExecutor = Executors.newFixedThreadPool(
properties.server.globalSnapshotUpdatePoolSize,
ThreadNamingThreadFactory("snapshot-update")
globalSnapshotExecutor = newMeteredFixedThreadPool(
"snapshot-update",
properties.server.globalSnapshotUpdatePoolSize
)
}

Expand All @@ -145,9 +146,9 @@ class ControlPlane private constructor(
ExecutorType.PARALLEL -> ParallelScheduler(
scheduler = Schedulers.fromExecutor(
groupSnapshotParallelExecutorSupplier()
?: Executors.newFixedThreadPool(
groupSnapshotProperties.parallelPoolSize,
ThreadNamingThreadFactory("group-snapshot")
?: newMeteredFixedThreadPool(
"group-snapshot",
groupSnapshotProperties.parallelPoolSize
)
),
parallelism = groupSnapshotProperties.parallelPoolSize
Expand Down Expand Up @@ -280,6 +281,42 @@ class ControlPlane private constructor(
override fun newThread(r: Runnable) = Thread(r, "$threadNamePrefix-${counter.getAndIncrement()}")
}

/**
 * Builds a [ThreadPoolExecutor] whose worker threads are created by a
 * [ThreadNamingThreadFactory] using [poolExecutorName] as the prefix, and binds
 * executor metrics for it under the same name before handing it back.
 *
 * @param corePoolSize core pool size forwarded to the [ThreadPoolExecutor] constructor
 * @param maximumPoolSize maximum pool size forwarded to the [ThreadPoolExecutor] constructor
 * @param keepAliveTimeMillis keep-alive time, interpreted in milliseconds
 * @param workQueue queue that holds tasks submitted to the executor
 * @param poolExecutorName thread name prefix and metrics name of the pool
 * @return the newly created, already metered executor
 */
private fun newMeteredThreadPoolExecutor(
    corePoolSize: Int,
    maximumPoolSize: Int,
    keepAliveTimeMillis: Long,
    workQueue: BlockingQueue<Runnable>,
    poolExecutorName: String
): ThreadPoolExecutor {
    val namedThreads = ThreadNamingThreadFactory(poolExecutorName)
    return ThreadPoolExecutor(
        corePoolSize,
        maximumPoolSize,
        keepAliveTimeMillis,
        TimeUnit.MILLISECONDS,
        workQueue,
        namedThreads
    ).also { pool -> meterExecutor(pool, poolExecutorName) }
}

/**
 * Creates a fixed-size thread pool of [poolSize] threads named after
 * [executorServiceName] and binds executor metrics for it under that name.
 *
 * @return the newly created, already metered executor service
 */
private fun newMeteredFixedThreadPool(executorServiceName: String, poolSize: Int): ExecutorService =
    Executors.newFixedThreadPool(poolSize, ThreadNamingThreadFactory(executorServiceName))
        .also { pool -> meterExecutor(pool, executorServiceName) }

/**
 * Creates a cached thread pool whose threads are named after
 * [executorServiceName] and binds executor metrics for it under that name.
 *
 * @return the newly created, already metered executor service
 */
private fun newMeteredCachedThreadPool(executorServiceName: String): ExecutorService =
    Executors.newCachedThreadPool(ThreadNamingThreadFactory(executorServiceName))
        .also { pool -> meterExecutor(pool, executorServiceName) }

/**
 * Binds [ExecutorServiceMetrics] for [executor] to the control plane's meter registry.
 *
 * @param executor executor service to instrument
 * @param executorServiceName name used both to identify the executor and to prefix its meters
 */
private fun meterExecutor(executor: ExecutorService, executorServiceName: String) {
    // The same name is deliberately passed twice: as the executor service name and as the
    // metric prefix. No additional tags are attached.
    val executorMetrics = ExecutorServiceMetrics(executor, executorServiceName, executorServiceName, emptySet())
    executorMetrics.bindTo(meterRegistry)
}

private fun grpcServer(
config: ServerProperties,
discoveryServer: DiscoveryServer,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package pl.allegro.tech.servicemesh.envoycontrol.metrics

import io.micrometer.core.instrument.simple.SimpleMeterRegistry
import org.assertj.core.api.Assertions.assertThat
import org.junit.jupiter.api.Test
import pl.allegro.tech.servicemesh.envoycontrol.ControlPlane
import pl.allegro.tech.servicemesh.envoycontrol.EnvoyControlProperties
import pl.allegro.tech.servicemesh.envoycontrol.server.ExecutorType
import reactor.core.publisher.Flux

/**
 * Verifies that the executors created by default in [ControlPlane] have their
 * metrics bound to the supplied meter registry.
 */
class ThreadPoolMetricTest {

    @Test
    fun `should bind metrics for default executors`() {
        // given
        val meterRegistry = SimpleMeterRegistry()
        val envoyControlProperties = EnvoyControlProperties().also {
            it.server.executorGroup.type = ExecutorType.PARALLEL
            it.server.groupSnapshotUpdateScheduler.type = ExecutorType.PARALLEL
        }

        val controlPlane = ControlPlane.builder(envoyControlProperties, meterRegistry).build(Flux.empty())

        // when
        controlPlane.start()

        // Close the control plane even when an assertion fails, so a red test does not
        // leave the started control plane and its thread pools open for the rest of the run.
        try {
            // then
            val allMeterNames = meterRegistry.meters.map { it.id.name }
            val meteredExecutorNames = listOf(
                "grpc-server-worker",
                "grpc-worker-event-loop",
                "snapshot-update",
                "group-snapshot"
            )
            val requiredMeterNames = meteredExecutorNames.flatMap {
                listOf("$it.executor.completed", "$it.executor.active", "$it.executor.queued", "$it.executor.pool.size")
            }

            assertThat(allMeterNames).containsAll(requiredMeterNames)
        } finally {
            // and
            controlPlane.close()
        }
    }
}

0 comments on commit 7aa1030

Please sign in to comment.