Skip to content

Commit

Permalink
Added cacheConfig Collector (#690)
Browse files Browse the repository at this point in the history
  • Loading branch information
atharvasharma61 authored Sep 3, 2024
1 parent a607422 commit 2624469
Show file tree
Hide file tree
Showing 4 changed files with 256 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
import org.opensearch.performanceanalyzer.collectors.ShardIndexingPressureMetricsCollector;
import org.opensearch.performanceanalyzer.collectors.ShardStateCollector;
import org.opensearch.performanceanalyzer.collectors.ThreadPoolMetricsCollector;
import org.opensearch.performanceanalyzer.collectors.telemetry.RTFCacheConfigMetricsCollector;
import org.opensearch.performanceanalyzer.collectors.telemetry.RTFDisksCollector;
import org.opensearch.performanceanalyzer.collectors.telemetry.RTFHeapMetricsCollector;
import org.opensearch.performanceanalyzer.collectors.telemetry.RTFNodeStatsAllShardsMetricsCollector;
Expand Down Expand Up @@ -235,6 +236,9 @@ private void scheduleTelemetryCollectors() {
scheduledMetricCollectorsExecutor.addScheduledMetricCollector(
new RTFNodeStatsAllShardsMetricsCollector(
performanceAnalyzerController, configOverridesWrapper));
scheduledMetricCollectorsExecutor.addScheduledMetricCollector(
new RTFCacheConfigMetricsCollector(
performanceAnalyzerController, configOverridesWrapper));
}

private void scheduleRcaCollectors() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,181 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.performanceanalyzer.collectors.telemetry;

import static org.opensearch.performanceanalyzer.commons.metrics.AllMetrics.CacheType.FIELD_DATA_CACHE;
import static org.opensearch.performanceanalyzer.commons.metrics.AllMetrics.CacheType.SHARD_REQUEST_CACHE;
import static org.opensearch.performanceanalyzer.commons.stats.decisionmaker.DecisionMakerConsts.CACHE_MAX_WEIGHT;

import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import java.security.AccessController;
import java.security.PrivilegedAction;
import org.apache.commons.lang3.reflect.FieldUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.common.cache.Cache;
import org.opensearch.indices.IndicesService;
import org.opensearch.performanceanalyzer.OpenSearchResources;
import org.opensearch.performanceanalyzer.commons.collectors.MetricStatus;
import org.opensearch.performanceanalyzer.commons.collectors.PerformanceAnalyzerMetricsCollector;
import org.opensearch.performanceanalyzer.commons.collectors.TelemetryCollector;
import org.opensearch.performanceanalyzer.commons.config.overrides.ConfigOverridesWrapper;
import org.opensearch.performanceanalyzer.commons.metrics.AllMetrics;
import org.opensearch.performanceanalyzer.commons.metrics.MetricsConfiguration;
import org.opensearch.performanceanalyzer.commons.metrics.RTFMetrics;
import org.opensearch.performanceanalyzer.commons.stats.metrics.StatExceptionCode;
import org.opensearch.performanceanalyzer.commons.stats.metrics.StatMetrics;
import org.opensearch.performanceanalyzer.config.PerformanceAnalyzerController;
import org.opensearch.telemetry.metrics.MetricsRegistry;
import org.opensearch.telemetry.metrics.tags.Tags;

public class RTFCacheConfigMetricsCollector extends PerformanceAnalyzerMetricsCollector
implements TelemetryCollector {
private MetricsRegistry metricsRegistry;
private static final Logger LOG = LogManager.getLogger(RTFCacheConfigMetricsCollector.class);
private PerformanceAnalyzerController performanceAnalyzerController;
private ConfigOverridesWrapper configOverridesWrapper;

public RTFCacheConfigMetricsCollector(
PerformanceAnalyzerController performanceAnalyzerController,
ConfigOverridesWrapper configOverridesWrapper) {
super(
MetricsConfiguration.CONFIG_MAP.get(RTFCacheConfigMetricsCollector.class)
.samplingInterval,
"RTFCacheConfigMetricsCollector",
StatMetrics.RTF_CACHE_CONFIG_METRICS_COLLECTOR_EXECUTION_TIME,
StatExceptionCode.RTF_CACHE_CONFIG_METRICS_COLLECTOR_ERROR);
this.performanceAnalyzerController = performanceAnalyzerController;
this.configOverridesWrapper = configOverridesWrapper;
}

@Override
public void collectMetrics(long l) {
if (performanceAnalyzerController.isCollectorDisabled(
configOverridesWrapper, getCollectorName())) {
LOG.info("RTFCacheConfigMetricsCollector is disabled. Skipping collection.");
return;
}

metricsRegistry = OpenSearchResources.INSTANCE.getMetricsRegistry();
if (metricsRegistry == null) {
LOG.error("could not get the instance of MetricsRegistry class");
return;
}

IndicesService indicesService = OpenSearchResources.INSTANCE.getIndicesService();
if (indicesService == null) {
LOG.error("could not get the instance of indicesService class");
return;
}

LOG.debug("Executing collect metrics for RTFCacheConfigMetricsCollector");
CacheMaxSizeStatus fieldDataCacheMaxSizeStatus =
AccessController.doPrivileged(
(PrivilegedAction<CacheMaxSizeStatus>)
() -> {
try {
Cache fieldDataCache =
indicesService
.getIndicesFieldDataCache()
.getCache();
long fieldDataMaxSize =
(Long)
FieldUtils.readField(
fieldDataCache,
CACHE_MAX_WEIGHT,
true);
return new CacheMaxSizeStatus(
FIELD_DATA_CACHE.toString(), fieldDataMaxSize);
} catch (Exception e) {
LOG.debug(
"Error occurred while fetching fieldDataCacheMaxSizeStatus: "
+ e.getMessage());
return null;
}
});

CacheMaxSizeStatus shardRequestCacheMaxSizeStatus =
AccessController.doPrivileged(
(PrivilegedAction<CacheMaxSizeStatus>)
() -> {
try {
Object reqCache =
FieldUtils.readField(
indicesService,
"indicesRequestCache",
true);
Object openSearchOnHeapCache =
FieldUtils.readField(reqCache, "cache", true);
Cache requestCache =
(Cache)
FieldUtils.readField(
openSearchOnHeapCache,
"cache",
true);
Long requestCacheMaxSize =
(Long)
FieldUtils.readField(
requestCache,
CACHE_MAX_WEIGHT,
true);
return new CacheMaxSizeStatus(
SHARD_REQUEST_CACHE.toString(),
requestCacheMaxSize);
} catch (Exception e) {
LOG.debug(
"Error occurred while fetching shardRequestCacheMaxSizeStatus: "
+ e.getMessage());
return null;
}
});

if (fieldDataCacheMaxSizeStatus != null
&& fieldDataCacheMaxSizeStatus.getCacheMaxSize() > 0) {
recordMetrics(fieldDataCacheMaxSizeStatus);
}

if (shardRequestCacheMaxSizeStatus != null
&& shardRequestCacheMaxSizeStatus.getCacheMaxSize() > 0) {
recordMetrics(shardRequestCacheMaxSizeStatus);
}
}

private void recordMetrics(CacheMaxSizeStatus cacheMaxSizeStatus) {
metricsRegistry.createGauge(
RTFMetrics.CacheConfigValue.Constants.CACHE_MAX_SIZE_VALUE,
"Cache Max Size metrics",
RTFMetrics.MetricUnits.BYTE.toString(),
() -> (double) cacheMaxSizeStatus.getCacheMaxSize(),
Tags.create()
.addTag(
RTFMetrics.CacheConfigDimension.Constants.TYPE_VALUE,
cacheMaxSizeStatus.getCacheType()));
}

static class CacheMaxSizeStatus extends MetricStatus {

private final String cacheType;

@JsonInclude(JsonInclude.Include.NON_NULL)
private final Long cacheMaxSize;

CacheMaxSizeStatus(String cacheType, Long cacheMaxSize) {
this.cacheType = cacheType;
this.cacheMaxSize = cacheMaxSize;
}

@JsonProperty(AllMetrics.CacheConfigDimension.Constants.TYPE_VALUE)
public String getCacheType() {
return cacheType;
}

@JsonProperty(AllMetrics.CacheConfigValue.Constants.CACHE_MAX_SIZE_VALUE)
public long getCacheMaxSize() {
return cacheMaxSize;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,7 @@
import org.opensearch.indices.IndicesService;
import org.opensearch.performanceanalyzer.OpenSearchResources;
import org.opensearch.performanceanalyzer.collectors.*;
import org.opensearch.performanceanalyzer.collectors.telemetry.RTFDisksCollector;
import org.opensearch.performanceanalyzer.collectors.telemetry.RTFHeapMetricsCollector;
import org.opensearch.performanceanalyzer.collectors.telemetry.RTFNodeStatsAllShardsMetricsCollector;
import org.opensearch.performanceanalyzer.collectors.telemetry.RTFThreadPoolMetricsCollector;
import org.opensearch.performanceanalyzer.collectors.telemetry.*;
import org.opensearch.performanceanalyzer.commons.metrics.MetricsConfiguration;
import org.opensearch.performanceanalyzer.commons.stats.ServiceMetrics;

Expand Down Expand Up @@ -54,6 +51,9 @@ public static void configureMetrics() {
MetricsConfiguration.CONFIG_MAP.put(RTFHeapMetricsCollector.class, cdefault);
MetricsConfiguration.CONFIG_MAP.put(RTFNodeStatsAllShardsMetricsCollector.class, cdefault);
MetricsConfiguration.CONFIG_MAP.put(RTFThreadPoolMetricsCollector.class, cdefault);
MetricsConfiguration.CONFIG_MAP.put(
RTFCacheConfigMetricsCollector.class,
new MetricsConfiguration.MetricConfig(60000, 0));
}

// These methods are utility functions for the Node Stat Metrics Collectors. These methods are
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.performanceanalyzer.collectors.telemetry;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyDouble;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import java.io.IOException;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
import org.opensearch.indices.IndicesService;
import org.opensearch.performanceanalyzer.OpenSearchResources;
import org.opensearch.performanceanalyzer.commons.config.overrides.ConfigOverridesWrapper;
import org.opensearch.performanceanalyzer.commons.metrics.MetricsConfiguration;
import org.opensearch.performanceanalyzer.config.PerformanceAnalyzerController;
import org.opensearch.telemetry.metrics.Histogram;
import org.opensearch.telemetry.metrics.MetricsRegistry;
import org.opensearch.test.OpenSearchSingleNodeTestCase;

public class RTFCacheConfigMetricsCollectorTests extends OpenSearchSingleNodeTestCase {
private static final String TEST_INDEX = "test";
private RTFCacheConfigMetricsCollector rtfCacheConfigMetricsCollector;
private static MetricsRegistry metricsRegistry;
private static Histogram testHistogram;
private long startTimeInMills = 1153721339;

@Before
public void init() {
MetricsConfiguration.CONFIG_MAP.put(
RTFCacheConfigMetricsCollector.class, MetricsConfiguration.cdefault);
metricsRegistry = mock(MetricsRegistry.class);
testHistogram = mock(Histogram.class);
OpenSearchResources.INSTANCE.setMetricsRegistry(metricsRegistry);
when(metricsRegistry.createHistogram(anyString(), anyString(), anyString()))
.thenReturn(testHistogram);
IndicesService indicesService = getInstanceFromNode(IndicesService.class);
OpenSearchResources.INSTANCE.setIndicesService(indicesService);
ConfigOverridesWrapper mockWrapper = mock(ConfigOverridesWrapper.class);
PerformanceAnalyzerController mockController = mock(PerformanceAnalyzerController.class);
Mockito.when(mockController.isCollectorDisabled(any(), anyString())).thenReturn(false);
rtfCacheConfigMetricsCollector =
spy(new RTFCacheConfigMetricsCollector(mockController, mockWrapper));
}

@After
public void tearDown() throws Exception {
super.tearDown();
}

@Test
public void testCollectMetrics() throws IOException {
createIndex(TEST_INDEX);
rtfCacheConfigMetricsCollector.collectMetrics(startTimeInMills);
verify(testHistogram, atLeastOnce()).record(anyDouble(), any());
}
}

0 comments on commit 2624469

Please sign in to comment.