Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adds the listener for resource utilization metrics #687

Merged
merged 8 commits into from
Jul 23, 2024
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,9 @@
import org.opensearch.performanceanalyzer.http_action.whoami.TransportWhoAmIAction;
import org.opensearch.performanceanalyzer.http_action.whoami.WhoAmIAction;
import org.opensearch.performanceanalyzer.listener.PerformanceAnalyzerSearchListener;
import org.opensearch.performanceanalyzer.listener.RTFPerformanceAnalyzerSearchListener;
import org.opensearch.performanceanalyzer.transport.PerformanceAnalyzerTransportInterceptor;
import org.opensearch.performanceanalyzer.transport.RTFPerformanceAnalyzerTransportInterceptor;
import org.opensearch.performanceanalyzer.util.Utils;
import org.opensearch.performanceanalyzer.writer.EventLogQueueProcessor;
import org.opensearch.plugins.ActionPlugin;
Expand Down Expand Up @@ -302,7 +304,10 @@
public void onIndexModule(IndexModule indexModule) {
PerformanceAnalyzerSearchListener performanceanalyzerSearchListener =
new PerformanceAnalyzerSearchListener(performanceAnalyzerController);
RTFPerformanceAnalyzerSearchListener rtfPerformanceAnalyzerSearchListener =

Check warning on line 307 in src/main/java/org/opensearch/performanceanalyzer/PerformanceAnalyzerPlugin.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/performanceanalyzer/PerformanceAnalyzerPlugin.java#L307

Added line #L307 was not covered by tests
new RTFPerformanceAnalyzerSearchListener(performanceAnalyzerController);
indexModule.addSearchOperationListener(performanceanalyzerSearchListener);
Gaganjuneja marked this conversation as resolved.
Show resolved Hide resolved
indexModule.addSearchOperationListener(rtfPerformanceAnalyzerSearchListener);

Check warning on line 310 in src/main/java/org/opensearch/performanceanalyzer/PerformanceAnalyzerPlugin.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/performanceanalyzer/PerformanceAnalyzerPlugin.java#L310

Added line #L310 was not covered by tests
}

// follower check, leader check
Expand Down Expand Up @@ -330,8 +335,9 @@
@Override
public List<TransportInterceptor> getTransportInterceptors(
NamedWriteableRegistry namedWriteableRegistry, ThreadContext threadContext) {
return singletonList(
new PerformanceAnalyzerTransportInterceptor(performanceAnalyzerController));
return Arrays.asList(
new PerformanceAnalyzerTransportInterceptor(performanceAnalyzerController),
new RTFPerformanceAnalyzerTransportInterceptor(performanceAnalyzerController));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -383,4 +383,13 @@

return disabledCollectorsList.contains(collectorName);
}

/**
* Collectors Setting value.
*
* @return collectorsSettingValue
*/
public int getCollectorsSettingValue() {
Gaganjuneja marked this conversation as resolved.
Show resolved Hide resolved
return collectorsSettingValue;

Check warning on line 393 in src/main/java/org/opensearch/performanceanalyzer/config/PerformanceAnalyzerController.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/performanceanalyzer/config/PerformanceAnalyzerController.java#L393

Added line #L393 was not covered by tests
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import static org.opensearch.performanceanalyzer.commons.stats.metrics.StatExceptionCode.OPENSEARCH_REQUEST_INTERCEPTOR_ERROR;

import com.google.common.annotations.VisibleForTesting;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.index.shard.SearchOperationListener;
Expand All @@ -16,6 +17,7 @@
import org.opensearch.performanceanalyzer.commons.metrics.MetricsProcessor;
import org.opensearch.performanceanalyzer.commons.metrics.PerformanceAnalyzerMetrics;
import org.opensearch.performanceanalyzer.commons.util.ThreadIDUtil;
import org.opensearch.performanceanalyzer.commons.util.Util;
import org.opensearch.performanceanalyzer.config.PerformanceAnalyzerController;
import org.opensearch.search.internal.SearchContext;

Expand All @@ -36,8 +38,16 @@ public String toString() {
return PerformanceAnalyzerSearchListener.class.getSimpleName();
}

private SearchListener getSearchListener() {
return controller.isPerformanceAnalyzerEnabled() ? this : NO_OP_SEARCH_LISTENER;
@VisibleForTesting
SearchListener getSearchListener() {
return isSearchListenerEnabled() ? this : NO_OP_SEARCH_LISTENER;
}

private boolean isSearchListenerEnabled() {
Gaganjuneja marked this conversation as resolved.
Show resolved Hide resolved
return controller.isPerformanceAnalyzerEnabled()
&& (controller.getCollectorsSettingValue() == Util.CollectorMode.DUAL.getValue()
|| controller.getCollectorsSettingValue()
== Util.CollectorMode.RCA.getValue());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,316 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.performanceanalyzer.listener;

import static org.opensearch.performanceanalyzer.commons.stats.metrics.StatExceptionCode.OPENSEARCH_REQUEST_INTERCEPTOR_ERROR;

import com.google.common.annotations.VisibleForTesting;
import java.util.HashMap;
import java.util.Map;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.core.action.NotifyOnceListener;
import org.opensearch.index.shard.SearchOperationListener;
import org.opensearch.performanceanalyzer.OpenSearchResources;
import org.opensearch.performanceanalyzer.commons.collectors.StatsCollector;
import org.opensearch.performanceanalyzer.commons.metrics.RTFMetrics;
import org.opensearch.performanceanalyzer.commons.util.Util;
import org.opensearch.performanceanalyzer.config.PerformanceAnalyzerController;
import org.opensearch.performanceanalyzer.util.Utils;
import org.opensearch.search.internal.SearchContext;
import org.opensearch.tasks.Task;
import org.opensearch.telemetry.metrics.Histogram;
import org.opensearch.telemetry.metrics.MetricsRegistry;
import org.opensearch.telemetry.metrics.tags.Tags;

/**
* {@link SearchOperationListener} to capture the resource utilization of a shard search operation.
* This will be getting the resource tracking information from the {@link
* org.opensearch.tasks.TaskResourceTrackingService}.
*/
public class RTFPerformanceAnalyzerSearchListener
implements SearchOperationListener, SearchListener {

private static final Logger LOG =
LogManager.getLogger(RTFPerformanceAnalyzerSearchListener.class);
private static final String OPERATION_SHARD_FETCH = "shard_fetch";
private static final String OPERATION_SHARD_QUERY = "shard_query";
public static final String QUERY_START_TIME = "query_start_time";
public static final String FETCH_START_TIME = "fetch_start_time";
public static final String QUERY_TASK_ID = "query_task_id";
private final ThreadLocal<Map<String, Long>> threadLocal;
private static final SearchListener NO_OP_SEARCH_LISTENER = new NoOpSearchListener();

private final PerformanceAnalyzerController controller;
private final Histogram cpuUtilizationHistogram;
private final Histogram heapUsedHistogram;
private final int numProcessors;

public RTFPerformanceAnalyzerSearchListener(final PerformanceAnalyzerController controller) {
this.controller = controller;
this.cpuUtilizationHistogram = createCPUUtilizationHistogram();
heapUsedHistogram = createHeapUsedHistogram();
Gaganjuneja marked this conversation as resolved.
Show resolved Hide resolved
this.threadLocal = ThreadLocal.withInitial(() -> new HashMap<String, Long>());
this.numProcessors = Runtime.getRuntime().availableProcessors();
}

private Histogram createCPUUtilizationHistogram() {
MetricsRegistry metricsRegistry = OpenSearchResources.INSTANCE.getMetricsRegistry();
Gaganjuneja marked this conversation as resolved.
Show resolved Hide resolved
if (metricsRegistry != null) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the registry is null it does not make sense to create this listener, Do we need to throw exception instead of returning null?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No. We just want to stay silent.

return metricsRegistry.createHistogram(
RTFMetrics.OSMetrics.CPU_UTILIZATION.toString(),
"CPU Utilization per shard for an operation",
RTFMetrics.MetricUnits.RATE.toString());
} else {
LOG.debug("MetricsRegistry is null");
return null;

Check warning on line 69 in src/main/java/org/opensearch/performanceanalyzer/listener/RTFPerformanceAnalyzerSearchListener.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/performanceanalyzer/listener/RTFPerformanceAnalyzerSearchListener.java#L68-L69

Added lines #L68 - L69 were not covered by tests
}
}

private Histogram createHeapUsedHistogram() {
MetricsRegistry metricsRegistry = OpenSearchResources.INSTANCE.getMetricsRegistry();
if (metricsRegistry != null) {
return metricsRegistry.createHistogram(
RTFMetrics.OSMetrics.HEAP_ALLOCATED.toString(),
"Heap used per shard for an operation",
RTFMetrics.MetricUnits.BYTE.toString());
} else {
LOG.debug("MetricsRegistry is null");
return null;

Check warning on line 82 in src/main/java/org/opensearch/performanceanalyzer/listener/RTFPerformanceAnalyzerSearchListener.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/performanceanalyzer/listener/RTFPerformanceAnalyzerSearchListener.java#L81-L82

Added lines #L81 - L82 were not covered by tests
}
}

@Override
public String toString() {
return RTFPerformanceAnalyzerSearchListener.class.getSimpleName();
}

@VisibleForTesting
SearchListener getSearchListener() {
return isSearchListenerEnabled() ? this : NO_OP_SEARCH_LISTENER;
}

private boolean isSearchListenerEnabled() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

isSearchListenerEnabled seems to be a mandatory method for any SearchListeners implementation. Should we add it to the super class and all the subclasses needs to write the logic of this method.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can do that. Didn't want to change a lot the existing stuff as anyways the plan is to bring this into core at some point in time.

LOG.debug(
Gaganjuneja marked this conversation as resolved.
Show resolved Hide resolved
"Controller enable status {}, CollectorMode value {}",
controller.isPerformanceAnalyzerEnabled(),
controller.getCollectorsSettingValue());
return OpenSearchResources.INSTANCE.getMetricsRegistry() != null
&& controller.isPerformanceAnalyzerEnabled()
&& (controller.getCollectorsSettingValue() == Util.CollectorMode.DUAL.getValue()
|| controller.getCollectorsSettingValue()
== Util.CollectorMode.TELEMETRY.getValue());
}

@Override
public void onPreQueryPhase(SearchContext searchContext) {
try {
getSearchListener().preQueryPhase(searchContext);
} catch (Exception ex) {
LOG.error(ex);
StatsCollector.instance().logException(OPENSEARCH_REQUEST_INTERCEPTOR_ERROR);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why are we throwing only OPENSEARCH_REQUEST_INTERCEPTOR_ERROR from all the Phases?
Is this correct or do we need to throw different exceptions from different Phases.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is just to add a stat that there is some issue with the interceptor.

}
}

Check warning on line 116 in src/main/java/org/opensearch/performanceanalyzer/listener/RTFPerformanceAnalyzerSearchListener.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/performanceanalyzer/listener/RTFPerformanceAnalyzerSearchListener.java#L111-L116

Added lines #L111 - L116 were not covered by tests

@Override
public void onQueryPhase(SearchContext searchContext, long tookInNanos) {
try {
getSearchListener().queryPhase(searchContext, tookInNanos);
} catch (Exception ex) {
LOG.error(ex);
StatsCollector.instance().logException(OPENSEARCH_REQUEST_INTERCEPTOR_ERROR);
}
}

Check warning on line 126 in src/main/java/org/opensearch/performanceanalyzer/listener/RTFPerformanceAnalyzerSearchListener.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/performanceanalyzer/listener/RTFPerformanceAnalyzerSearchListener.java#L121-L126

Added lines #L121 - L126 were not covered by tests

@Override
public void onFailedQueryPhase(SearchContext searchContext) {
try {
getSearchListener().failedQueryPhase(searchContext);
} catch (Exception ex) {
LOG.error(ex);
StatsCollector.instance().logException(OPENSEARCH_REQUEST_INTERCEPTOR_ERROR);
}
}

Check warning on line 136 in src/main/java/org/opensearch/performanceanalyzer/listener/RTFPerformanceAnalyzerSearchListener.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/performanceanalyzer/listener/RTFPerformanceAnalyzerSearchListener.java#L131-L136

Added lines #L131 - L136 were not covered by tests

@Override
public void onPreFetchPhase(SearchContext searchContext) {
try {
getSearchListener().preFetchPhase(searchContext);
} catch (Exception ex) {
LOG.error(ex);
StatsCollector.instance().logException(OPENSEARCH_REQUEST_INTERCEPTOR_ERROR);
}
}

Check warning on line 146 in src/main/java/org/opensearch/performanceanalyzer/listener/RTFPerformanceAnalyzerSearchListener.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/performanceanalyzer/listener/RTFPerformanceAnalyzerSearchListener.java#L141-L146

Added lines #L141 - L146 were not covered by tests

@Override
public void onFetchPhase(SearchContext searchContext, long tookInNanos) {
try {
getSearchListener().fetchPhase(searchContext, tookInNanos);
} catch (Exception ex) {
LOG.error(ex);
StatsCollector.instance().logException(OPENSEARCH_REQUEST_INTERCEPTOR_ERROR);
}
}

Check warning on line 156 in src/main/java/org/opensearch/performanceanalyzer/listener/RTFPerformanceAnalyzerSearchListener.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/performanceanalyzer/listener/RTFPerformanceAnalyzerSearchListener.java#L151-L156

Added lines #L151 - L156 were not covered by tests

@Override
public void onFailedFetchPhase(SearchContext searchContext) {
try {
getSearchListener().failedFetchPhase(searchContext);
} catch (Exception ex) {
LOG.error(ex);
StatsCollector.instance().logException(OPENSEARCH_REQUEST_INTERCEPTOR_ERROR);
}
}

Check warning on line 166 in src/main/java/org/opensearch/performanceanalyzer/listener/RTFPerformanceAnalyzerSearchListener.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/performanceanalyzer/listener/RTFPerformanceAnalyzerSearchListener.java#L161-L166

Added lines #L161 - L166 were not covered by tests

@Override
public void preQueryPhase(SearchContext searchContext) {
threadLocal.get().put(QUERY_START_TIME, System.nanoTime());
threadLocal.get().put(QUERY_TASK_ID, searchContext.getTask().getId());
}

@Override
public void queryPhase(SearchContext searchContext, long tookInNanos) {
long queryStartTime = threadLocal.get().getOrDefault(QUERY_START_TIME, 0l);
long queryTime = (System.nanoTime() - queryStartTime);
Gaganjuneja marked this conversation as resolved.
Show resolved Hide resolved
addResourceTrackingCompletionListener(
searchContext, queryStartTime, queryTime, OPERATION_SHARD_QUERY, false);
}

@Override
public void failedQueryPhase(SearchContext searchContext) {
long queryStartTime = threadLocal.get().getOrDefault(QUERY_START_TIME, 0l);
long queryTime = (System.nanoTime() - queryStartTime);
addResourceTrackingCompletionListener(
searchContext, queryStartTime, queryTime, OPERATION_SHARD_QUERY, true);
}

@Override
public void preFetchPhase(SearchContext searchContext) {
threadLocal.get().put(FETCH_START_TIME, System.nanoTime());
}

@Override
public void fetchPhase(SearchContext searchContext, long tookInNanos) {
long fetchStartTime = threadLocal.get().getOrDefault(FETCH_START_TIME, 0l);
long fetchTime = (System.nanoTime() - fetchStartTime);
addResourceTrackingCompletionListenerForFetchPhase(
searchContext, fetchStartTime, fetchTime, OPERATION_SHARD_FETCH, false);
}

@Override
public void failedFetchPhase(SearchContext searchContext) {
long fetchStartTime = threadLocal.get().getOrDefault(FETCH_START_TIME, 0l);
long fetchTime = (System.nanoTime() - fetchStartTime);
addResourceTrackingCompletionListenerForFetchPhase(
searchContext, fetchStartTime, fetchTime, OPERATION_SHARD_FETCH, true);
}

private void addResourceTrackingCompletionListener(
SearchContext searchContext,
long startTime,
long queryTime,
String operation,
boolean isFailed) {
addCompletionListener(searchContext, startTime, queryTime, operation, isFailed);
}

private void addResourceTrackingCompletionListenerForFetchPhase(
SearchContext searchContext,
long fetchStartTime,
long fetchTime,
String operation,
boolean isFailed) {
long overallStartTime = fetchStartTime;
long queryTaskId = threadLocal.get().getOrDefault(QUERY_TASK_ID, 0l);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the practice that is being followed for any Id that is being fetcher from threadLocal. Is it 0 by default?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Handling null would be very tricky.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can use -1 instead of 0.

/**
* There are scenarios where both query and fetch pahses run in the same task for an
* optimization. Adding a special handling for that case to divide the CPU usage between
* these 2 operations by their runTime.
*/
if (queryTaskId == searchContext.getTask().getId()) {
overallStartTime = threadLocal.get().getOrDefault(QUERY_START_TIME, 0l);
}
addCompletionListener(searchContext, overallStartTime, fetchTime, operation, isFailed);
}

private void addCompletionListener(
SearchContext searchContext,
long overallStartTime,
long operationTime,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we call these as SearchOperationStartTime and PhaseTookTime.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is common method for fetch and search,

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As these are being used in the context of search we can change operations to phase and change variable names.

overallStartTime -> startTime
operationTime -> phaseTookTime
operation -> phase

String operation,
boolean isFailed) {
searchContext
.getTask()
.addResourceTrackingCompletionListener(
createListener(
searchContext,
overallStartTime,
operationTime,
operation,
isFailed));
}

@VisibleForTesting
NotifyOnceListener<Task> createListener(
SearchContext searchContext,
long overallStartTime,
long totalOperationTime,
String operation,
boolean isFailed) {
return new NotifyOnceListener<Task>() {
@Override
protected void innerOnResponse(Task task) {
LOG.debug("Updating the counter for task {}", task.getId());
/**
* There are scenarios where cpuUsageTime consists of the total of CPU of multiple
* operations. In that case we are computing the cpuShareFactor by dividing the
* particular operationTime and the total time till this calculation happen from the
* overall start time.
*/
long totalTime = System.nanoTime() - overallStartTime;
double operationShareFactor = computeShareFactor(totalOperationTime, totalTime);
cpuUtilizationHistogram.record(
Utils.calculateCPUUtilization(
numProcessors,
totalTime,
task.getTotalResourceStats().getCpuTimeInNanos(),
operationShareFactor),
createTags());
heapUsedHistogram.record(
Math.max(
0,
task.getTotalResourceStats().getMemoryInBytes()
* operationShareFactor),
createTags());
}

private Tags createTags() {
return Tags.create()
.addTag(
RTFMetrics.CommonDimension.INDEX_NAME.toString(),
searchContext.request().shardId().getIndex().getName())
.addTag(
RTFMetrics.CommonDimension.INDEX_UUID.toString(),
searchContext.request().shardId().getIndex().getUUID())
.addTag(
RTFMetrics.CommonDimension.SHARD_ID.toString(),
searchContext.request().shardId().getId())
.addTag(RTFMetrics.CommonDimension.OPERATION.toString(), operation)
.addTag(RTFMetrics.CommonDimension.FAILED.toString(), isFailed);
}

@Override
protected void innerOnFailure(Exception e) {
LOG.error("Error is executing the the listener", e);
}

Check warning on line 308 in src/main/java/org/opensearch/performanceanalyzer/listener/RTFPerformanceAnalyzerSearchListener.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/performanceanalyzer/listener/RTFPerformanceAnalyzerSearchListener.java#L307-L308

Added lines #L307 - L308 were not covered by tests
};
}

@VisibleForTesting
static double computeShareFactor(long totalOperationTime, long totalTime) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we add a javadoc explaining what are we trying to calculate and what exactly this method provides.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have added it from where this is being called.

return Math.min(1, ((double) totalOperationTime) / Math.max(1.0, totalTime));
}
}
Loading
Loading