Skip to content

Commit

Permalink
filter thread level metrics based on name patterns in PA
Browse files Browse the repository at this point in the history
Signed-off-by: Chenyang Ji <[email protected]>
  • Loading branch information
ansjcy committed Oct 5, 2023
1 parent b8fcc04 commit d43ef7e
Show file tree
Hide file tree
Showing 3 changed files with 96 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -310,12 +310,15 @@ private static void parseThreadInfo(final ThreadInfo info) {

static void runThreadDump(String pid, String[] args) {
String currentThreadName = Thread.currentThread().getName();
assert currentThreadName.startsWith(
ScheduledMetricCollectorsExecutor.COLLECTOR_THREAD_POOL_NAME)
|| currentThreadName.equals(
ScheduledMetricCollectorsExecutor.class.getSimpleName())
: String.format(
"Thread dump called from a non os collector thread: %s", currentThreadName);
if (currentThreadName.startsWith(
ScheduledMetricCollectorsExecutor.COLLECTOR_THREAD_POOL_NAME)
|| currentThreadName.equals(
ScheduledMetricCollectorsExecutor.class.getSimpleName())) {
LOGGER.error(
String.format(
"Thread dump called from a non os collector thread: %s",
currentThreadName));
}
jTidNameMap.clear();
oldNativeTidMap.putAll(nativeTidMap);
nativeTidMap.clear();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,22 @@

import java.io.File;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.regex.Pattern;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.logging.log4j.util.Supplier;
import org.opensearch.performanceanalyzer.commons.collectors.StatsCollector;
import org.opensearch.performanceanalyzer.commons.config.ConfigStatus;
import org.opensearch.performanceanalyzer.commons.jvm.ThreadList;
import org.opensearch.performanceanalyzer.commons.jvm.ThreadList.ThreadState;
import org.opensearch.performanceanalyzer.commons.metrics.MetricsConfiguration;
import org.opensearch.performanceanalyzer.commons.stats.metrics.StatExceptionCode;

public class OSGlobals {
private static long scClkTck;
Expand All @@ -27,11 +36,43 @@ public class OSGlobals {
private static List<String> tids = new ArrayList<>();
private static long lastUpdated = -1;

private static Map<Long, ThreadState> threadStates;

// To allow changing the value of the allowedList dynamically in the future (through an API),
// we use a static list to store the allowlist and denylist patterns.
public static List<Pattern> AllowedThreadNamePatterns =
new ArrayList<>(
Arrays.asList(
// GC pattern
Pattern.compile(".*(GC|CMS|Parallel).*"),
// all opensearch patterns
Pattern.compile(".*opensearch.*"),
// pa collectors
Pattern.compile(".*pa-collectors.*"),
// opendistro job sweeper
Pattern.compile(".*opendistro_job_sweeper.*")));

public static List<Pattern> DeniedThreadNamePatterns =
new ArrayList<>(
Arrays.asList(
// Ignore all ForkJoinPool workers threads since this is a known thread
// leak issue
Pattern.compile("ForkJoinPool-\\d+-worker-\\d+"),
Pattern.compile("ForkJoinPool-\\d+")));

// Collect data for threads not in allowlist or denylist, until it reaches
// otherThreadsMetricsLimit threshold
private static final int otherThreadsMetricsLimit = 500;
// Total number of threads collected should be smaller than totalThreadsMetricsLimit to avoid
// performance impact on the system.
private static final int totalThreadsMetricsLimit = 1000;

static {
try {
pid = new File("/proc/self").getCanonicalFile().getName();
getScClkTckFromConfig();
enumTids();
updateThreadStates();
lastUpdated = System.currentTimeMillis();
} catch (Exception e) {
LOGGER.error(
Expand Down Expand Up @@ -67,20 +108,56 @@ private static void getScClkTckFromConfig() throws Exception {
}
}

private static void updateThreadStates() {
threadStates = ThreadList.getNativeTidMap(false);
}

private static boolean hasMatchedPattern(String threadName, List<Pattern> patterns) {
if (threadName == null) {
return false;
}
for (Pattern pattern : patterns) {
if (pattern.matcher(threadName).matches()) {
return true;
}
}
return false;
}

private static void enumTids() {
tids.clear();
tids.add(pid);

File self = new File("/proc/self/task");
File[] filesList = self.listFiles();
if (filesList != null) {
for (File f : filesList) {
if (f.isDirectory()) {
String tid = f.getName();
tids.add(tid);
// Maintaining a set to keep track of metrics not in either allowlist and denylist.
Set<String> otherThreadsMetrics = new HashSet<>();
updateThreadStates();
for (Map.Entry<Long, ThreadState> threadStateEntry : threadStates.entrySet()) {
Long tid = threadStateEntry.getKey();
ThreadState threadState = threadStateEntry.getValue();
// add this thread if the thread name is in allowlist
if (hasMatchedPattern(threadState.threadName, AllowedThreadNamePatterns)) {
tids.add(tid.toString());
}
// skip this thread if the thread name is in denylist
else if (hasMatchedPattern(threadState.threadName, DeniedThreadNamePatterns)) {
continue;
}
// if the thread name is neither in allowlist nor denylist, add
// this thread until certain threshold is reached.
else {
if (otherThreadsMetrics.size() < otherThreadsMetricsLimit) {
otherThreadsMetrics.add(threadState.threadName);
tids.add(tid.toString());
}
}
}
if (otherThreadsMetrics.size() >= otherThreadsMetricsLimit) {
StatsCollector.instance()
.logException(StatExceptionCode.OTHER_THREAD_METRICS_NUMBER_EXCEEDS_THRESHOLD);
}
if (tids.size() >= totalThreadsMetricsLimit) {
StatsCollector.instance()
.logException(StatExceptionCode.TOTAL_THREAD_METRICS_NUMBER_EXCEEDS_THRESHOLD);
}
}

static synchronized List<String> getTids() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,8 @@ public enum StatExceptionCode {
ELECTION_TERM_COLLECTOR_ERROR("ElectionTermCollectorError"),
SHARD_INDEXING_PRESSURE_COLLECTOR_ERROR("ShardIndexingPressureMetricsCollectorError"),
NODESTATS_COLLECTION_ERROR("NodeStatsCollectionError"),
TOTAL_THREAD_METRICS_NUMBER_EXCEEDS_THRESHOLD("TotalThreadMetricsNumberExceedsThreshold"),
OTHER_THREAD_METRICS_NUMBER_EXCEEDS_THRESHOLD("OtherThreadMetricsNumberExceedsThreshold"),

/** Below tracks Reader specific Errors. */
READER_ERROR_PA_DISABLE_SUCCESS("ReaderErrorPADisableSuccess"),
Expand Down

0 comments on commit d43ef7e

Please sign in to comment.