Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

InstrumentedExecutorService registered metrics cleanup after shutdown #3202

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
17 commits
Select commit Hold shift + click to select a range
8969e48
InstrumentedExecutorService registered metrics cleanup on shutdown
the-thing Feb 22, 2023
af9cf23
Merge branch 'release/4.2.x' into 2920/remove_gauges_on_shutdown
the-thing Feb 26, 2023
988091c
InstrumentedExecutorService registered metrics cleanup on shutdown
the-thing Feb 26, 2023
7f92939
Merge branch 'release/4.2.x' into 2920/remove_gauges_on_shutdown
the-thing Mar 7, 2023
14ed511
Merge branch 'release/4.2.x' into 2920/remove_gauges_on_shutdown
the-thing Mar 31, 2023
8aa23ca
Merge branch 'release/4.2.x' into 2920/remove_gauges_on_shutdown
the-thing Apr 30, 2023
1426825
Merge branch 'release/4.2.x' into 2920/remove_gauges_on_shutdown
the-thing May 14, 2023
fb645d4
Merge branch 'release/4.2.x' into 2920/remove_gauges_on_shutdown
the-thing Jun 8, 2023
10e33a6
Merge branch 'release/4.2.x' into 2920/remove_gauges_on_shutdown
the-thing Aug 2, 2023
3ea266e
Merge branch 'release/4.2.x' into 2920/remove_gauges_on_shutdown
the-thing Sep 8, 2023
177d6d2
Merge branch 'release/4.2.x' into 2920/remove_gauges_on_shutdown
joschi Sep 22, 2023
4f4461e
Merge branch 'release/4.2.x' into 2920/remove_gauges_on_shutdown
the-thing Nov 2, 2023
5e59a35
Merge branch 'release/4.2.x' into 2920/remove_gauges_on_shutdown
the-thing Jan 10, 2024
44357c1
Merge branch 'release/4.2.x' into 2920/remove_gauges_on_shutdown
the-thing Apr 22, 2024
f5389a5
Merge branch 'release/4.2.x' into 2920/remove_gauges_on_shutdown
the-thing Jun 13, 2024
4f68a6e
Merge branch 'release/4.2.x' into 2920/remove_gauges_on_shutdown
the-thing Jun 21, 2024
00c3028
Merge branch 'release/4.2.x' into 2920/remove_gauges_on_shutdown
the-thing Sep 25, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ public class InstrumentedExecutorService implements ExecutorService {
private static final AtomicLong NAME_COUNTER = new AtomicLong();

private final ExecutorService delegate;
private final MetricRegistry registry;
private final String name;
private final Meter submitted;
private final Counter running;
private final Meter completed;
Expand All @@ -50,12 +52,18 @@ public InstrumentedExecutorService(ExecutorService delegate, MetricRegistry regi
*/
public InstrumentedExecutorService(ExecutorService delegate, MetricRegistry registry, String name) {
this.delegate = delegate;
this.registry = registry;
this.name = name;
this.submitted = registry.meter(MetricRegistry.name(name, "submitted"));
this.running = registry.counter(MetricRegistry.name(name, "running"));
this.completed = registry.meter(MetricRegistry.name(name, "completed"));
this.idle = registry.timer(MetricRegistry.name(name, "idle"));
this.duration = registry.timer(MetricRegistry.name(name, "duration"));

registerInternalMetrics();
}

private void registerInternalMetrics() {
if (delegate instanceof ThreadPoolExecutor) {
ThreadPoolExecutor executor = (ThreadPoolExecutor) delegate;
registry.registerGauge(MetricRegistry.name(name, "pool.size"),
Expand Down Expand Up @@ -86,6 +94,23 @@ public InstrumentedExecutorService(ExecutorService delegate, MetricRegistry regi
}
}

private void removeInternalMetrics() {
if (delegate instanceof ThreadPoolExecutor) {
registry.remove(MetricRegistry.name(name, "pool.size"));
registry.remove(MetricRegistry.name(name, "pool.core"));
registry.remove(MetricRegistry.name(name, "pool.max"));
registry.remove(MetricRegistry.name(name, "tasks.active"));
registry.remove(MetricRegistry.name(name, "tasks.completed"));
registry.remove(MetricRegistry.name(name, "tasks.queued"));
registry.remove(MetricRegistry.name(name, "tasks.capacity"));
} else if (delegate instanceof ForkJoinPool) {
registry.remove(MetricRegistry.name(name, "tasks.stolen"));
registry.remove(MetricRegistry.name(name, "tasks.queued"));
registry.remove(MetricRegistry.name(name, "threads.active"));
registry.remove(MetricRegistry.name(name, "threads.running"));
}
}

/**
* {@inheritDoc}
*/
Expand Down Expand Up @@ -173,11 +198,14 @@ private <T> Collection<? extends Callable<T>> instrument(Collection<? extends Ca
@Override
public void shutdown() {
delegate.shutdown();
removeInternalMetrics();
}

@Override
public List<Runnable> shutdownNow() {
return delegate.shutdownNow();
List<Runnable> remainingTasks = delegate.shutdownNow();
removeInternalMetrics();
return remainingTasks;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
Expand Down Expand Up @@ -167,9 +166,22 @@ public void reportsTasksInformationForThreadPoolExecutor() throws Exception {
assertThat(poolSize.getValue()).isEqualTo(1);
}

@Test
public void removesMetricsAfterShutdownForThreadPoolExecutor() {
executor = new ThreadPoolExecutor(4, 16,
0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(32));
instrumentedExecutorService = new InstrumentedExecutorService(executor, registry, "stp");

assertThat(registry.getMetrics()).containsKeys("stp.pool.size", "stp.pool.core", "stp.pool.max", "stp.tasks.active", "stp.tasks.completed", "stp.tasks.queued", "stp.tasks.capacity");

instrumentedExecutorService.shutdown();

assertThat(registry.getMetrics()).doesNotContainKeys("stp.pool.size", "stp.pool.core", "stp.pool.max", "stp.tasks.active", "stp.tasks.completed", "stp.tasks.queued", "stp.tasks.capacity");
}

@Test
@SuppressWarnings("unchecked")
public void reportsTasksInformationForForkJoinPool() throws Exception {
public void reportsTasksInformationForForkJoinPool() {
executor = Executors.newWorkStealingPool(4);
instrumentedExecutorService = new InstrumentedExecutorService(executor, registry, "fjp");
submitted = registry.meter("fjp.submitted");
Expand Down Expand Up @@ -215,4 +227,16 @@ public void reportsTasksInformationForForkJoinPool() throws Exception {
assertThat(idle.getCount()).isEqualTo(1);
assertThat(idle.getSnapshot().size()).isEqualTo(1);
}

@Test
public void removesMetricsAfterShutdownForForkJoinPool() {
executor = Executors.newWorkStealingPool(4);
instrumentedExecutorService = new InstrumentedExecutorService(executor, registry, "sfjp");

assertThat(registry.getMetrics()).containsKeys("sfjp.tasks.stolen", "sfjp.tasks.queued", "sfjp.threads.active", "sfjp.threads.running");

instrumentedExecutorService.shutdown();

assertThat(registry.getMetrics()).doesNotContainKeys("sfjp.tasks.stolen", "sfjp.tasks.queued", "sfjp.threads.active", "sfjp.threads.running");
}
}