Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feature request: make executor rescheduling configurable #4188

Open
wants to merge 2 commits into
base: release/4.2.x
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ private CollectdReporter(MetricRegistry registry,
String username, String password,
SecurityLevel securityLevel, Sanitize sanitize) {
super(registry, REPORTER_NAME, filter, rateUnit, durationUnit, executor, shutdownExecutorOnStop,
disabledMetricAttributes);
disabledMetricAttributes, null);
this.hostName = (hostname != null) ? hostname : resolveHostName();
this.sender = sender;
this.clock = clock;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ private ConsoleReporter(MetricRegistry registry,
ScheduledExecutorService executor,
boolean shutdownExecutorOnStop,
Set<MetricAttribute> disabledMetricAttributes) {
super(registry, "console-reporter", filter, rateUnit, durationUnit, executor, shutdownExecutorOnStop, disabledMetricAttributes);
super(registry, "console-reporter", filter, rateUnit, durationUnit, executor, shutdownExecutorOnStop, disabledMetricAttributes, null);
this.output = output;
this.locale = locale;
this.clock = clock;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package com.codahale.metrics;


import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

@FunctionalInterface
public interface GetScheduledFuture<T> {
ScheduledFuture<T> schedule(Long delay, Long period, TimeUnit unit, Runnable runnable, ScheduledExecutorService executor);
}
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@ public Thread newThread(Runnable r) {
private final long rateFactor;
private final String rateUnit;

private final GetScheduledFuture getScheduledFuture;

/**
* Creates a new {@link ScheduledReporter} instance.
*
Expand Down Expand Up @@ -120,7 +122,7 @@ protected ScheduledReporter(MetricRegistry registry,
TimeUnit durationUnit,
ScheduledExecutorService executor,
boolean shutdownExecutorOnStop) {
this(registry, name, filter, rateUnit, durationUnit, executor, shutdownExecutorOnStop, Collections.emptySet());
this(registry, name, filter, rateUnit, durationUnit, executor, shutdownExecutorOnStop, Collections.emptySet(), null);
}

protected ScheduledReporter(MetricRegistry registry,
Expand All @@ -130,7 +132,8 @@ protected ScheduledReporter(MetricRegistry registry,
TimeUnit durationUnit,
ScheduledExecutorService executor,
boolean shutdownExecutorOnStop,
Set<MetricAttribute> disabledMetricAttributes) {
Set<MetricAttribute> disabledMetricAttributes,
GetScheduledFuture getScheduledFuture) {

if (registry == null) {
throw new NullPointerException("registry == null");
Expand All @@ -146,6 +149,8 @@ protected ScheduledReporter(MetricRegistry registry,
this.durationUnit = durationUnit.toString().toLowerCase(Locale.US);
this.disabledMetricAttributes = disabledMetricAttributes != null ? disabledMetricAttributes :
Collections.emptySet();
this.getScheduledFuture = getScheduledFuture == null ? this::getScheduledFuture : getScheduledFuture;

}

/**
Expand All @@ -167,20 +172,10 @@ synchronized void start(long initialDelay, long period, TimeUnit unit, Runnable
throw new IllegalArgumentException("Reporter already started");
}

this.scheduledFuture = getScheduledFuture(initialDelay, period, unit, runnable);
this.scheduledFuture = this.getScheduledFuture.schedule(initialDelay, period, unit, runnable, this.executor);
}


/**
* Schedule the task, and return a future.
*
* @deprecated Use {@link #getScheduledFuture(long, long, TimeUnit, Runnable, ScheduledExecutorService)} instead.
*/
@SuppressWarnings("DeprecatedIsStillUsed")
@Deprecated
protected ScheduledFuture<?> getScheduledFuture(long initialDelay, long period, TimeUnit unit, Runnable runnable) {
return getScheduledFuture(initialDelay, period, unit, runnable, this.executor);
}
Comment on lines -174 to -183

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am aware this is a breaking change. Unfortunately, I did not find a way to make this function still relevant while passing the functional interface
Additionally, I think it is unlikely someone is overriding this method because it can make sense only with an executor.

What do you think?
Something obvious I missed in keeping this method for retro compatibility, while still making it relevant?


/**
* Schedule the task, and return a future.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,7 @@ private Slf4jReporter(MetricRegistry registry,
boolean shutdownExecutorOnStop,
Set<MetricAttribute> disabledMetricAttributes) {
super(registry, "logger-reporter", filter, rateUnit, durationUnit, executor, shutdownExecutorOnStop,
disabledMetricAttributes);
disabledMetricAttributes, null);
this.loggerProxy = loggerProxy;
this.marker = marker;
this.prefix = prefix;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import org.junit.Before;
import org.junit.Test;

import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.CountDownLatch;
Expand Down Expand Up @@ -46,6 +47,8 @@ public class ScheduledReporterTest {
private final ScheduledReporter reporterWithCustomMockExecutor = new DummyReporter(registry, "example", MetricFilter.ALL, TimeUnit.SECONDS, TimeUnit.MILLISECONDS, mockExecutor);
private final ScheduledReporter reporterWithCustomExecutor = new DummyReporter(registry, "example", MetricFilter.ALL, TimeUnit.SECONDS, TimeUnit.MILLISECONDS, customExecutor);
private final DummyReporter reporterWithExternallyManagedExecutor = new DummyReporter(registry, "example", MetricFilter.ALL, TimeUnit.SECONDS, TimeUnit.MILLISECONDS, externalExecutor, false);

private final ScheduledReporter reporterWithFixedRate = new DummyReporter(registry, "example", MetricFilter.ALL, TimeUnit.SECONDS, TimeUnit.MILLISECONDS, mockExecutor, false, null, (initialDelay, period, unit, runnable, executor) -> executor.scheduleAtFixedRate(runnable, initialDelay, period, unit));
private final ScheduledReporter[] reporters = new ScheduledReporter[] {reporter, reporterWithCustomExecutor, reporterWithExternallyManagedExecutor};

@Before
Expand Down Expand Up @@ -120,6 +123,15 @@ public void shouldStartWithSpecifiedInitialDelay() throws Exception {
);
}

@Test
public void shouldUseFixedRateWhenSpecified() throws Exception {
reporterWithFixedRate.start(200, TimeUnit.MILLISECONDS);

verify(mockExecutor).scheduleAtFixedRate(
any(Runnable.class), eq(200L), eq(200L), eq(TimeUnit.MILLISECONDS)
);
}

@Test
public void shouldAutoCreateExecutorWhenItNull() throws Exception {
CountDownLatch latch = new CountDownLatch(2);
Expand Down Expand Up @@ -274,6 +286,10 @@ private static class DummyReporter extends ScheduledReporter {
super(registry, name, filter, rateUnit, durationUnit, executor, shutdownExecutorOnStop);
}

DummyReporter(MetricRegistry registry, String name, MetricFilter filter, TimeUnit rateUnit, TimeUnit durationUnit, ScheduledExecutorService executor, boolean shutdownExecutorOnStop, Set<MetricAttribute> disabledMetricAttributes, GetScheduledFuture getScheduledFuture) {
super(registry, name, filter, rateUnit, durationUnit, executor, shutdownExecutorOnStop, disabledMetricAttributes, getScheduledFuture);
}

@Override
@SuppressWarnings("rawtypes")
public void report(SortedMap<String, Gauge> gauges, SortedMap<String, Counter> counters, SortedMap<String, Histogram> histograms, SortedMap<String, Meter> meters, SortedMap<String, Timer> timers) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,17 +1,6 @@
package com.codahale.metrics.graphite;

import com.codahale.metrics.Clock;
import com.codahale.metrics.Counter;
import com.codahale.metrics.Gauge;
import com.codahale.metrics.Histogram;
import com.codahale.metrics.Meter;
import com.codahale.metrics.Metered;
import com.codahale.metrics.MetricAttribute;
import com.codahale.metrics.MetricFilter;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.ScheduledReporter;
import com.codahale.metrics.Snapshot;
import com.codahale.metrics.Timer;
import com.codahale.metrics.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -75,6 +64,8 @@ public static class Builder {
private boolean addMetricAttributesAsTags;
private DoubleFunction<String> floatingPointFormatter;

private GetScheduledFuture<?> getScheduledFuture;

private Builder(MetricRegistry registry) {
this.registry = registry;
this.clock = Clock.defaultClock();
Expand All @@ -87,6 +78,7 @@ private Builder(MetricRegistry registry) {
this.disabledMetricAttributes = Collections.emptySet();
this.addMetricAttributesAsTags = false;
this.floatingPointFormatter = DEFAULT_FP_FORMATTER;
this.getScheduledFuture = null;
}

/**
Expand Down Expand Up @@ -211,6 +203,18 @@ public Builder withFloatingPointFormatter(DoubleFunction<String> floatingPointFo
return this;
}

/**
* Use custom schedule logic.
* By default, logic is scheduledWithFixedDelay
*
* @param getScheduledFuture a method to schedule next update
* @return {@code this}
*/
public Builder withScheduledFuture(GetScheduledFuture<?> getScheduledFuture) {
this.getScheduledFuture = getScheduledFuture;
return this;
}

/**
* Builds a {@link GraphiteReporter} with the given properties, sending metrics using the
* given {@link GraphiteSender}.
Expand Down Expand Up @@ -243,7 +247,8 @@ public GraphiteReporter build(GraphiteSender graphite) {
shutdownExecutorOnStop,
disabledMetricAttributes,
addMetricAttributesAsTags,
floatingPointFormatter);
floatingPointFormatter,
getScheduledFuture);
}
}

Expand Down Expand Up @@ -318,7 +323,7 @@ protected GraphiteReporter(MetricRegistry registry,
Set<MetricAttribute> disabledMetricAttributes,
boolean addMetricAttributesAsTags) {
this(registry, graphite, clock, prefix, rateUnit, durationUnit, filter, executor, shutdownExecutorOnStop,
disabledMetricAttributes, addMetricAttributesAsTags, DEFAULT_FP_FORMATTER);
disabledMetricAttributes, addMetricAttributesAsTags, DEFAULT_FP_FORMATTER, null);
}

/**
Expand All @@ -338,6 +343,7 @@ protected GraphiteReporter(MetricRegistry registry,
* @param disabledMetricAttributes do not report specific metric attributes
* @param addMetricAttributesAsTags if true, then add metric attributes as tags instead of suffixes
* @param floatingPointFormatter custom floating point formatter
* @param getScheduledFuture custom scheduling logic (may be null).
*/
protected GraphiteReporter(MetricRegistry registry,
GraphiteSender graphite,
Expand All @@ -350,9 +356,11 @@ protected GraphiteReporter(MetricRegistry registry,
boolean shutdownExecutorOnStop,
Set<MetricAttribute> disabledMetricAttributes,
boolean addMetricAttributesAsTags,
DoubleFunction<String> floatingPointFormatter) {
DoubleFunction<String> floatingPointFormatter,
GetScheduledFuture<?> getScheduledFuture
) {
super(registry, "graphite-reporter", filter, rateUnit, durationUnit, executor, shutdownExecutorOnStop,
disabledMetricAttributes);
disabledMetricAttributes, getScheduledFuture);
this.graphite = graphite;
this.clock = clock;
this.prefix = prefix;
Expand Down