From 7aa10308bb568f1ecf340337f75ccd54df981b4f Mon Sep 17 00:00:00 2001 From: Patryk Gutenplan Date: Fri, 30 Oct 2020 10:09:50 +0100 Subject: [PATCH] Feature/metered thread pool (#199) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Metered thread pools Co-authored-by: Patryk Gutenplan Co-authored-by: Patryk Żmigrodzki --- .../servicemesh/envoycontrol/ControlPlane.kt | 67 ++++++++++++++----- .../metrics/ThreadPoolMetricTest.kt | 38 +++++++++++ 2 files changed, 90 insertions(+), 15 deletions(-) create mode 100644 envoy-control-core/src/test/kotlin/pl/allegro/tech/servicemesh/envoycontrol/metrics/ThreadPoolMetricTest.kt diff --git a/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/ControlPlane.kt b/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/ControlPlane.kt index dc1033e50..4502d10d1 100644 --- a/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/ControlPlane.kt +++ b/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/ControlPlane.kt @@ -9,6 +9,7 @@ import io.envoyproxy.controlplane.server.callback.SnapshotCollectingCallback import io.grpc.Server import io.grpc.netty.NettyServerBuilder import io.micrometer.core.instrument.MeterRegistry +import io.micrometer.core.instrument.binder.jvm.ExecutorServiceMetrics import io.netty.channel.nio.NioEventLoopGroup import pl.allegro.tech.servicemesh.envoycontrol.groups.Group import pl.allegro.tech.servicemesh.envoycontrol.groups.GroupChangeWatcher @@ -38,7 +39,9 @@ import reactor.core.Disposable import reactor.core.publisher.Flux import reactor.core.scheduler.Schedulers import java.time.Clock +import java.util.concurrent.BlockingQueue import java.util.concurrent.Executor +import java.util.concurrent.ExecutorService import java.util.concurrent.Executors import java.util.concurrent.LinkedBlockingQueue import java.util.concurrent.ThreadFactory @@ -97,21 +100,19 @@ class ControlPlane 
private constructor( fun build(changes: Flux): ControlPlane { if (grpcServerExecutor == null) { - grpcServerExecutor = ThreadPoolExecutor( + grpcServerExecutor = newMeteredThreadPoolExecutor( properties.server.serverPoolSize, properties.server.serverPoolSize, - properties.server.serverPoolKeepAlive.toMillis(), TimeUnit.MILLISECONDS, + properties.server.serverPoolKeepAlive.toMillis(), LinkedBlockingQueue(), - ThreadNamingThreadFactory("grpc-server-worker") + "grpc-server-worker" ) } if (nioEventLoopExecutor == null) { // unbounded executor - netty will only use configured number of threads // (by nioEventLoopThreadCount property or default netty value: * 2) - nioEventLoopExecutor = Executors.newCachedThreadPool( - ThreadNamingThreadFactory("grpc-worker-event-loop") - ) + nioEventLoopExecutor = newMeteredCachedThreadPool("grpc-worker-event-loop") } if (executorGroup == null) { @@ -122,9 +123,9 @@ class ControlPlane private constructor( // executor group is invalid, because it may lead to sending XDS responses out of order for // given DiscoveryRequestStreamObserver. We should switch to multiple, single-threaded // ThreadPoolExecutors. More info in linked task. 
- val executor = Executors.newFixedThreadPool( - properties.server.executorGroup.parallelPoolSize, - ThreadNamingThreadFactory("discovery-responses-executor") + val executor = newMeteredFixedThreadPool( + "discovery-responses-executor", + properties.server.executorGroup.parallelPoolSize ) ExecutorGroup { executor } } @@ -132,9 +133,9 @@ class ControlPlane private constructor( } if (globalSnapshotExecutor == null) { - globalSnapshotExecutor = Executors.newFixedThreadPool( - properties.server.globalSnapshotUpdatePoolSize, - ThreadNamingThreadFactory("snapshot-update") + globalSnapshotExecutor = newMeteredFixedThreadPool( + "snapshot-update", + properties.server.globalSnapshotUpdatePoolSize ) } @@ -145,9 +146,9 @@ class ControlPlane private constructor( ExecutorType.PARALLEL -> ParallelScheduler( scheduler = Schedulers.fromExecutor( groupSnapshotParallelExecutorSupplier() - ?: Executors.newFixedThreadPool( - groupSnapshotProperties.parallelPoolSize, - ThreadNamingThreadFactory("group-snapshot") + ?: newMeteredFixedThreadPool( + "group-snapshot", + groupSnapshotProperties.parallelPoolSize ) ), parallelism = groupSnapshotProperties.parallelPoolSize @@ -280,6 +281,42 @@ class ControlPlane private constructor( override fun newThread(r: Runnable) = Thread(r, "$threadNamePrefix-${counter.getAndIncrement()}") } + private fun newMeteredThreadPoolExecutor( + corePoolSize: Int, + maximumPoolSize: Int, + keepAliveTimeMillis: Long, + workQueue: BlockingQueue, + poolExecutorName: String + ): ThreadPoolExecutor { + val threadPoolExecutor = ThreadPoolExecutor( + corePoolSize, + maximumPoolSize, + keepAliveTimeMillis, + TimeUnit.MILLISECONDS, + workQueue, + ThreadNamingThreadFactory(poolExecutorName) + ) + meterExecutor(threadPoolExecutor, poolExecutorName) + return threadPoolExecutor + } + + private fun newMeteredFixedThreadPool(executorServiceName: String, poolSize: Int): ExecutorService { + val executor = Executors.newFixedThreadPool(poolSize, 
ThreadNamingThreadFactory(executorServiceName)) + meterExecutor(executor, executorServiceName) + return executor + } + + private fun newMeteredCachedThreadPool(executorServiceName: String): ExecutorService { + val executor = Executors.newCachedThreadPool(ThreadNamingThreadFactory(executorServiceName)) + meterExecutor(executor, executorServiceName) + return executor + } + + private fun meterExecutor(executor: ExecutorService, executorServiceName: String) { + ExecutorServiceMetrics(executor, executorServiceName, executorServiceName, emptySet()) + .bindTo(meterRegistry) + } + private fun grpcServer( config: ServerProperties, discoveryServer: DiscoveryServer, diff --git a/envoy-control-core/src/test/kotlin/pl/allegro/tech/servicemesh/envoycontrol/metrics/ThreadPoolMetricTest.kt b/envoy-control-core/src/test/kotlin/pl/allegro/tech/servicemesh/envoycontrol/metrics/ThreadPoolMetricTest.kt new file mode 100644 index 000000000..2a91824e1 --- /dev/null +++ b/envoy-control-core/src/test/kotlin/pl/allegro/tech/servicemesh/envoycontrol/metrics/ThreadPoolMetricTest.kt @@ -0,0 +1,38 @@ +package pl.allegro.tech.servicemesh.envoycontrol.metrics + +import io.micrometer.core.instrument.simple.SimpleMeterRegistry +import org.assertj.core.api.Assertions.assertThat +import org.junit.jupiter.api.Test +import pl.allegro.tech.servicemesh.envoycontrol.ControlPlane +import pl.allegro.tech.servicemesh.envoycontrol.EnvoyControlProperties +import pl.allegro.tech.servicemesh.envoycontrol.server.ExecutorType +import reactor.core.publisher.Flux + +class ThreadPoolMetricTest { + + @Test + fun `should bind metrics for default executors`() { + // given + val meterRegistry = SimpleMeterRegistry() + val envoyControlProperties = EnvoyControlProperties().also { + it.server.executorGroup.type = ExecutorType.PARALLEL + it.server.groupSnapshotUpdateScheduler.type = ExecutorType.PARALLEL + } + + val controlPlane = ControlPlane.builder(envoyControlProperties, meterRegistry).build(Flux.empty()) + + // when + 
controlPlane.start() + + // then + val allMeterNames = meterRegistry.meters.map { it.id.name } + val requiredMeterNames = listOf("grpc-server-worker", "grpc-worker-event-loop", "snapshot-update", "group-snapshot").flatMap { + listOf("$it.executor.completed", "$it.executor.active", "$it.executor.queued", "$it.executor.pool.size") + } + + assertThat(allMeterNames).containsAll(requiredMeterNames) + + // and + controlPlane.close() + } +}