Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: mute collectors based on exception count #56

Draft
wants to merge 13 commits into
base: main
Choose a base branch
from
2 changes: 2 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,8 @@ compileJava {
}

test {
jvmArgs '--add-opens=java.base/java.util.concurrent=ALL-UNNAMED'

testLogging {
exceptionFormat "full"
events "skipped", "passed", "failed", "started"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,10 @@ public StringBuilder getValue() {
return value;
}

public StatExceptionCode getErrorMetric() {
return errorMetric;
}

public State getState() {
return state;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,15 @@ public void run() {
}
metricsCollectors.put(
collector, entry.getValue() + collector.getTimeInterval());
if (!collector.inProgress()) {

// exceptionCount is incremented in StatsCollector
// and reset every time StatsCollector is run (i.e. every 60 sec)
// since other collectors run every 5 sec, we can obtain an estimate
// of the exceptions they've thrown in the last ~10 runs or so.
int exceptionCount =
StatsCollector.instance()
.exceptionCount(collector.getErrorMetric());
if (!collector.inProgress() && exceptionCount < 5) {
collector.setStartTime(currentTime);
metricsCollectorsTP.execute(collector);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,16 @@ public void logException(StatExceptionCode statExceptionCode) {
incErrorCounter();
}

// returns the number of exceptions that have occurred
public int exceptionCount(StatExceptionCode statExceptionCode) {
AtomicInteger val = counters.get(statExceptionCode.toString());
if (val == null) {
return 0;
} else {
return val.get();
}
}

public void logStatsRecord(
Map<String, AtomicInteger> counterData,
Map<String, String> statsData,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ public EventLogFileHandler(EventLog eventLog, String metricsLocation) {
}

public void writeTmpFile(List<Event> dataEntries, long epoch) {
Util.invokePrivileged(() -> writeTmpFileWithPrivilege(dataEntries, epoch));
Util.invokePrivilegedAndLogError(() -> writeTmpFileWithPrivilege(dataEntries, epoch));
}

/**
Expand Down Expand Up @@ -93,7 +93,7 @@ public void writeTmpFileWithPrivilege(List<Event> dataEntries, long epoch) {
}

public void renameFromTmp(long epoch) {
Util.invokePrivileged(() -> renameFromTmpWithPrivilege(epoch));
Util.invokePrivilegedAndLogError(() -> renameFromTmpWithPrivilege(epoch));
}

public void renameFromTmpWithPrivilege(long epoch) {
Expand Down Expand Up @@ -161,7 +161,7 @@ private void readInternal(Path pathToFile, int bufferSize, EventDispatcher proce
}

public void deleteAllFiles() {
Util.invokePrivileged(this::deleteAllFilesWithPrivilege);
Util.invokePrivilegedAndLogError(this::deleteAllFilesWithPrivilege);
}

public void deleteAllFilesWithPrivilege() {
Expand Down Expand Up @@ -197,6 +197,6 @@ public void deleteFiles(List<String> filesToDelete) {
}

public void removeFilesWithPrivilege(File file) {
Util.invokePrivileged(() -> PerformanceAnalyzerMetrics.removeMetrics(file));
Util.invokePrivilegedAndLogError(() -> PerformanceAnalyzerMetrics.removeMetrics(file));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ public class Disks {
};

static {
Util.invokePrivileged(() -> listDisks());
Util.invokePrivilegedAndLogError(() -> listDisks());
oldkvTimestamp = System.currentTimeMillis();
kvTimestamp = oldkvTimestamp;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,8 @@ public enum StatExceptionCode {
CACHE_CONFIG_METRICS_COLLECTOR_ERROR("CacheConfigMetricsCollectorError"),
ADMISSION_CONTROL_COLLECTOR_ERROR("AdmissionControlCollectorError"),
CIRCUIT_BREAKER_COLLECTOR_ERROR("CircuitBreakerCollectorError"),
CLUSTER_MANAGER_SERVICE_EVENTS_METRICS_COLLECTOR_ERROR("ClusterManagerServiceEventsMetricsCollectorError"),
CLUSTER_MANAGER_SERVICE_EVENTS_METRICS_COLLECTOR_ERROR(
"ClusterManagerServiceEventsMetricsCollectorError"),
CLUSTER_MANAGER_SERVICE_METRICS_COLLECTOR_ERROR("ClusterManagerServiceMetricsCollectorError"),
CLUSTER_MANAGER_THROTTLING_COLLECTOR_ERROR("ClusterManagerThrottlingMetricsCollectorError"),
FAULT_DETECTION_COLLECTOR_ERROR("FaultDetectionMetricsCollectorError"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,17 +53,7 @@ public static void invokePrivileged(Runnable runner) {
AccessController.doPrivileged(
(PrivilegedAction<Void>)
() -> {
try {
runner.run();
} catch (Exception ex) {
LOG.debug(
(Supplier<?>)
() ->
new ParameterizedMessage(
"Privileged Invocation failed {}",
ex.toString()),
ex);
}
runner.run();
return null;
});
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.performanceanalyzer.commons.collectors;

import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import java.util.ArrayList;
import java.util.HashMap;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.opensearch.performanceanalyzer.commons.collectors.TestCollector.RunBehaviour;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;

@RunWith(PowerMockRunner.class)
@PowerMockIgnore("javax.management.*")
@PrepareForTest({StatsCollector.class})
public class ScheduledMetricCollectorsExecutorTests {
@Test
public void testSlowMuting() throws Exception {
ScheduledMetricCollectorsExecutor executor = new ScheduledMetricCollectorsExecutor();
executor.setEnabled(true);

ArrayList<RunBehaviour> bh = new ArrayList<RunBehaviour>();
bh.add(new RunBehaviour(10, 0, true));

TestCollector tc = spy(new TestCollector(10, bh));
executor.addScheduledMetricCollector(tc);

// mock StatsCollector.instance
PowerMockito.mockStatic(StatsCollector.class);
StatsCollector sc =
new StatsCollector("statsCollector", 300, new HashMap<String, String>());
when(StatsCollector.instance()).thenReturn(sc);

executor.addScheduledMetricCollector(StatsCollector.instance());

executor.start();

Thread.sleep(1000);

verify(tc).setState(PerformanceAnalyzerMetricsCollector.State.SLOW);
verify(tc).setState(PerformanceAnalyzerMetricsCollector.State.MUTED);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.performanceanalyzer.commons.collectors;


import java.util.ArrayList;
import org.opensearch.performanceanalyzer.commons.stats.metrics.StatExceptionCode;
import org.opensearch.performanceanalyzer.commons.stats.metrics.StatMetrics;

public class TestCollector extends PerformanceAnalyzerMetricsCollector {
public static class RunBehaviour {
int repetitions;
int current_reps;
int latency;
boolean shouldThrow;

RunBehaviour(int repetitions, int latency, boolean throwException) {
this.repetitions = repetitions;
this.current_reps = repetitions;
this.latency = latency;
this.shouldThrow = throwException;
}
}

ArrayList<RunBehaviour> bh;
int idx = 0;

TestCollector(int timeInterval, ArrayList<RunBehaviour> bh) {
// timeInterval is measured in milliseconds
// default timeInterval is 5000 milliseconds (5 seconds) for all collectors
// except StatsCollector, which is 60 seconds
// this collector pretents to be DisksCollector for test purposes
super(
timeInterval,
"testCollector",
StatMetrics.DISKS_COLLECTOR_EXECUTION_TIME,
StatExceptionCode.DISK_METRICS_COLLECTOR_ERROR);
this.bh = bh;
}

@Override
public void collectMetrics(long startTime) {
try {
RunBehaviour b = bh.get(idx);
if (b.latency > 0) {
Thread.sleep(b.latency);
}

if (b.current_reps > 0) {
b.current_reps--;
} else {
// reset repetitions to use for next iteration
b.current_reps = b.repetitions;
idx++;
}

if (b.shouldThrow) {
throw new RuntimeException("TestCollector exception");
}
} catch (InterruptedException e) {
e.printStackTrace();
} catch (IndexOutOfBoundsException e) {
// start from beginning
idx = 0;
}
}
}
Loading