Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adds the listener for resource utilization metrics #687

Merged
merged 8 commits into from
Jul 23, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,9 @@
import org.opensearch.performanceanalyzer.http_action.whoami.TransportWhoAmIAction;
import org.opensearch.performanceanalyzer.http_action.whoami.WhoAmIAction;
import org.opensearch.performanceanalyzer.listener.PerformanceAnalyzerSearchListener;
import org.opensearch.performanceanalyzer.listener.RTFPerformanceAnalyzerSearchListener;
import org.opensearch.performanceanalyzer.transport.PerformanceAnalyzerTransportInterceptor;
import org.opensearch.performanceanalyzer.transport.RTFPerformanceAnalyzerTransportInterceptor;
import org.opensearch.performanceanalyzer.util.Utils;
import org.opensearch.performanceanalyzer.writer.EventLogQueueProcessor;
import org.opensearch.plugins.ActionPlugin;
Expand Down Expand Up @@ -302,7 +304,10 @@ public List<ActionFilter> getActionFilters() {
public void onIndexModule(IndexModule indexModule) {
PerformanceAnalyzerSearchListener performanceanalyzerSearchListener =
new PerformanceAnalyzerSearchListener(performanceAnalyzerController);
RTFPerformanceAnalyzerSearchListener rtfPerformanceAnalyzerSearchListener =
new RTFPerformanceAnalyzerSearchListener(performanceAnalyzerController);
indexModule.addSearchOperationListener(performanceanalyzerSearchListener);
Gaganjuneja marked this conversation as resolved.
Show resolved Hide resolved
indexModule.addSearchOperationListener(rtfPerformanceAnalyzerSearchListener);
}

// follower check, leader check
Expand Down Expand Up @@ -330,8 +335,9 @@ public void onDiscovery(Discovery discovery) {
@Override
public List<TransportInterceptor> getTransportInterceptors(
NamedWriteableRegistry namedWriteableRegistry, ThreadContext threadContext) {
return singletonList(
new PerformanceAnalyzerTransportInterceptor(performanceAnalyzerController));
return Arrays.asList(
new PerformanceAnalyzerTransportInterceptor(performanceAnalyzerController),
new RTFPerformanceAnalyzerTransportInterceptor(performanceAnalyzerController));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -383,4 +383,13 @@ public boolean isCollectorDisabled(

return disabledCollectorsList.contains(collectorName);
}

/**
* Collectors Setting value.
*
* @return collectorsSettingValue
*/
public int getCollectorsSettingValue() {
Gaganjuneja marked this conversation as resolved.
Show resolved Hide resolved
return collectorsSettingValue;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import static org.opensearch.performanceanalyzer.commons.stats.metrics.StatExceptionCode.OPENSEARCH_REQUEST_INTERCEPTOR_ERROR;

import com.google.common.annotations.VisibleForTesting;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.index.shard.SearchOperationListener;
Expand All @@ -16,6 +17,7 @@
import org.opensearch.performanceanalyzer.commons.metrics.MetricsProcessor;
import org.opensearch.performanceanalyzer.commons.metrics.PerformanceAnalyzerMetrics;
import org.opensearch.performanceanalyzer.commons.util.ThreadIDUtil;
import org.opensearch.performanceanalyzer.commons.util.Util;
import org.opensearch.performanceanalyzer.config.PerformanceAnalyzerController;
import org.opensearch.search.internal.SearchContext;

Expand All @@ -36,8 +38,16 @@ public String toString() {
return PerformanceAnalyzerSearchListener.class.getSimpleName();
}

private SearchListener getSearchListener() {
return controller.isPerformanceAnalyzerEnabled() ? this : NO_OP_SEARCH_LISTENER;
@VisibleForTesting
SearchListener getSearchListener() {
return isSearchListenerEnabled() ? this : NO_OP_SEARCH_LISTENER;
}

private boolean isSearchListenerEnabled() {
Gaganjuneja marked this conversation as resolved.
Show resolved Hide resolved
return controller.isPerformanceAnalyzerEnabled()
&& (controller.getCollectorsSettingValue() == Util.CollectorMode.DUAL.getValue()
|| controller.getCollectorsSettingValue()
== Util.CollectorMode.RCA.getValue());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,255 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.performanceanalyzer.listener;

import static org.opensearch.performanceanalyzer.commons.stats.metrics.StatExceptionCode.OPENSEARCH_REQUEST_INTERCEPTOR_ERROR;

import com.google.common.annotations.VisibleForTesting;
import java.util.HashMap;
import java.util.Map;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.core.action.NotifyOnceListener;
import org.opensearch.index.shard.SearchOperationListener;
import org.opensearch.performanceanalyzer.OpenSearchResources;
import org.opensearch.performanceanalyzer.commons.collectors.StatsCollector;
import org.opensearch.performanceanalyzer.commons.metrics.RTFMetrics;
import org.opensearch.performanceanalyzer.commons.util.Util;
import org.opensearch.performanceanalyzer.config.PerformanceAnalyzerController;
import org.opensearch.performanceanalyzer.util.Utils;
import org.opensearch.search.internal.SearchContext;
import org.opensearch.tasks.Task;
import org.opensearch.telemetry.metrics.Histogram;
import org.opensearch.telemetry.metrics.MetricsRegistry;
import org.opensearch.telemetry.metrics.tags.Tags;

/**
 * {@link SearchOperationListener} that captures the resource utilization of a shard search
 * operation. The resource tracking information is obtained from the {@link
 * org.opensearch.tasks.TaskResourceTrackingService}.
 */
public class RTFPerformanceAnalyzerSearchListener
implements SearchOperationListener, SearchListener {

private static final Logger LOG =
LogManager.getLogger(RTFPerformanceAnalyzerSearchListener.class);
private static final String OPERATION_SHARD_FETCH = "shard_fetch";
private static final String OPERATION_SHARD_QUERY = "shard_query";
public static final String QUERY_START_TIME = "query_start_time";
public static final String FETCH_START_TIME = "fetch_start_time";
private final ThreadLocal<Map<String, Long>> threadLocal;
private static final SearchListener NO_OP_SEARCH_LISTENER = new NoOpSearchListener();

private final PerformanceAnalyzerController controller;
private final Histogram cpuUtilizationHistogram;
private final Histogram heapUsedHistogram;
private final int numProcessors;

public RTFPerformanceAnalyzerSearchListener(final PerformanceAnalyzerController controller) {
this.controller = controller;
this.cpuUtilizationHistogram = createCPUUtilizationHistogram();
heapUsedHistogram = createHeapUsedHistogram();
Gaganjuneja marked this conversation as resolved.
Show resolved Hide resolved
this.threadLocal = ThreadLocal.withInitial(() -> new HashMap<String, Long>());
this.numProcessors = Runtime.getRuntime().availableProcessors();
}

private Histogram createCPUUtilizationHistogram() {
MetricsRegistry metricsRegistry = OpenSearchResources.INSTANCE.getMetricsRegistry();
Gaganjuneja marked this conversation as resolved.
Show resolved Hide resolved
if (metricsRegistry != null) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the registry is null, it does not make sense to create this listener. Do we need to throw an exception instead of returning null?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No. We just want to stay silent.

return metricsRegistry.createHistogram(
RTFMetrics.OSMetrics.CPU_UTILIZATION.toString(),
"CPU Utilization per shard for an operation",
RTFMetrics.MetricUnits.RATE.toString());
} else {
LOG.debug("MetricsRegistry is null");
return null;
}
}

/**
 * Builds the histogram that receives the heap usage of shard search operations.
 *
 * @return the histogram created through the metrics registry, or {@code null} when the
 *     registry is not available
 */
private Histogram createHeapUsedHistogram() {
    final MetricsRegistry registry = OpenSearchResources.INSTANCE.getMetricsRegistry();
    if (registry == null) {
        // Stay silent (debug only) when telemetry is not wired in.
        LOG.debug("MetricsRegistry is null");
        return null;
    }
    final String metricName = RTFMetrics.HeapValue.HEAP_USED.toString();
    final String description = "Heap used per shard for an operation";
    final String unit = RTFMetrics.MetricUnits.BYTE.toString();
    return registry.createHistogram(metricName, description, unit);
}

/**
 * @return the simple class name of this listener
 */
@Override
public String toString() {
    final String listenerName = RTFPerformanceAnalyzerSearchListener.class.getSimpleName();
    return listenerName;
}

/**
 * Selects the listener that should handle the current search phase callback.
 *
 * @return this instance when the listener is enabled, otherwise the shared no-op listener
 */
@VisibleForTesting
SearchListener getSearchListener() {
    if (isSearchListenerEnabled()) {
        return this;
    }
    return NO_OP_SEARCH_LISTENER;
}

private boolean isSearchListenerEnabled() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

isSearchListenerEnabled seems to be a mandatory method for any SearchListener implementation. Should we add it to the superclass so that all subclasses have to implement the logic of this method?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can do that. I didn't want to change the existing code much, since the plan is to bring this into core at some point anyway.

LOG.debug(
Gaganjuneja marked this conversation as resolved.
Show resolved Hide resolved
"Controller enable status {}, CollectorMode value {}",
controller.isPerformanceAnalyzerEnabled(),
controller.getCollectorsSettingValue());
return OpenSearchResources.INSTANCE.getMetricsRegistry() != null
&& controller.isPerformanceAnalyzerEnabled()
&& (controller.getCollectorsSettingValue() == Util.CollectorMode.DUAL.getValue()
|| controller.getCollectorsSettingValue()
== Util.CollectorMode.TELEMETRY.getValue());
}

@Override
public void onPreQueryPhase(SearchContext searchContext) {
try {
getSearchListener().preQueryPhase(searchContext);
} catch (Exception ex) {
LOG.error(ex);
StatsCollector.instance().logException(OPENSEARCH_REQUEST_INTERCEPTOR_ERROR);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why are we throwing only OPENSEARCH_REQUEST_INTERCEPTOR_ERROR from all the Phases?
Is this correct or do we need to throw different exceptions from different Phases.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is just to add a stat that there is some issue with the interceptor.

}
}

/**
 * Query-phase completion hook. Forwards to the listener chosen by {@link
 * #getSearchListener()}; any {@link Exception} is logged and counted as {@code
 * OPENSEARCH_REQUEST_INTERCEPTOR_ERROR} instead of being rethrown.
 */
@Override
public void onQueryPhase(SearchContext searchContext, long tookInNanos) {
    try {
        final SearchListener delegate = getSearchListener();
        delegate.queryPhase(searchContext, tookInNanos);
    } catch (Exception e) {
        LOG.error(e);
        StatsCollector.instance().logException(OPENSEARCH_REQUEST_INTERCEPTOR_ERROR);
    }
}

/**
 * Failed-query-phase hook. Forwards to the listener chosen by {@link #getSearchListener()};
 * any {@link Exception} is logged and counted as {@code
 * OPENSEARCH_REQUEST_INTERCEPTOR_ERROR} instead of being rethrown.
 */
@Override
public void onFailedQueryPhase(SearchContext searchContext) {
    try {
        final SearchListener delegate = getSearchListener();
        delegate.failedQueryPhase(searchContext);
    } catch (Exception e) {
        LOG.error(e);
        StatsCollector.instance().logException(OPENSEARCH_REQUEST_INTERCEPTOR_ERROR);
    }
}

/**
 * Pre-fetch-phase hook. Forwards to the listener chosen by {@link #getSearchListener()}; any
 * {@link Exception} is logged and counted as {@code OPENSEARCH_REQUEST_INTERCEPTOR_ERROR}
 * instead of being rethrown.
 */
@Override
public void onPreFetchPhase(SearchContext searchContext) {
    try {
        final SearchListener delegate = getSearchListener();
        delegate.preFetchPhase(searchContext);
    } catch (Exception e) {
        LOG.error(e);
        StatsCollector.instance().logException(OPENSEARCH_REQUEST_INTERCEPTOR_ERROR);
    }
}

/**
 * Fetch-phase completion hook. Forwards to the listener chosen by {@link
 * #getSearchListener()}; any {@link Exception} is logged and counted as {@code
 * OPENSEARCH_REQUEST_INTERCEPTOR_ERROR} instead of being rethrown.
 */
@Override
public void onFetchPhase(SearchContext searchContext, long tookInNanos) {
    try {
        final SearchListener delegate = getSearchListener();
        delegate.fetchPhase(searchContext, tookInNanos);
    } catch (Exception e) {
        LOG.error(e);
        StatsCollector.instance().logException(OPENSEARCH_REQUEST_INTERCEPTOR_ERROR);
    }
}

/**
 * Failed-fetch-phase hook. Forwards to the listener chosen by {@link #getSearchListener()};
 * any {@link Exception} is logged and counted as {@code
 * OPENSEARCH_REQUEST_INTERCEPTOR_ERROR} instead of being rethrown.
 */
@Override
public void onFailedFetchPhase(SearchContext searchContext) {
    try {
        final SearchListener delegate = getSearchListener();
        delegate.failedFetchPhase(searchContext);
    } catch (Exception e) {
        LOG.error(e);
        StatsCollector.instance().logException(OPENSEARCH_REQUEST_INTERCEPTOR_ERROR);
    }
}

/**
 * Stores the current {@link System#nanoTime()} reading under {@code QUERY_START_TIME} in the
 * calling thread's map, so the query phase callbacks can compute the elapsed time.
 */
@Override
public void preQueryPhase(SearchContext searchContext) {
    final Map<String, Long> phaseStartTimes = threadLocal.get();
    phaseStartTimes.put(QUERY_START_TIME, System.nanoTime());
}

/**
 * Registers a resource tracking completion listener for a successful shard query phase.
 *
 * <p>The start time is looked up in the calling thread's map; when none was stored, {@code 0}
 * is used as the start time.
 */
@Override
public void queryPhase(SearchContext searchContext, long tookInNanos) {
    final Map<String, Long> phaseStartTimes = threadLocal.get();
    final long startedAtNanos = phaseStartTimes.getOrDefault(QUERY_START_TIME, 0L);
    final boolean failed = false;
    addResourceTrackingCompletionListener(
            searchContext, startedAtNanos, OPERATION_SHARD_QUERY, failed);
}

/**
 * Registers a resource tracking completion listener for a failed shard query phase.
 *
 * <p>The start time is looked up in the calling thread's map; when none was stored, {@code 0}
 * is used as the start time.
 */
@Override
public void failedQueryPhase(SearchContext searchContext) {
    final Map<String, Long> phaseStartTimes = threadLocal.get();
    final long startedAtNanos = phaseStartTimes.getOrDefault(QUERY_START_TIME, 0L);
    final boolean failed = true;
    addResourceTrackingCompletionListener(
            searchContext, startedAtNanos, OPERATION_SHARD_QUERY, failed);
}

/**
 * Stores the current {@link System#nanoTime()} reading under {@code FETCH_START_TIME} in the
 * calling thread's map, so the fetch phase callbacks can compute the elapsed time.
 */
@Override
public void preFetchPhase(SearchContext searchContext) {
    final Map<String, Long> phaseStartTimes = threadLocal.get();
    phaseStartTimes.put(FETCH_START_TIME, System.nanoTime());
}

/**
 * Registers a resource tracking completion listener for a successful shard fetch phase.
 *
 * <p>The start time is looked up in the calling thread's map; when none was stored, {@code 0}
 * is used as the start time.
 */
@Override
public void fetchPhase(SearchContext searchContext, long tookInNanos) {
    final Map<String, Long> phaseStartTimes = threadLocal.get();
    final long startedAtNanos = phaseStartTimes.getOrDefault(FETCH_START_TIME, 0L);
    final boolean failed = false;
    addResourceTrackingCompletionListener(
            searchContext, startedAtNanos, OPERATION_SHARD_FETCH, failed);
}

/**
 * Registers a resource tracking completion listener for a failed shard fetch phase.
 *
 * <p>The start time is looked up in the calling thread's map; when none was stored, {@code 0}
 * is used as the start time.
 */
@Override
public void failedFetchPhase(SearchContext searchContext) {
    final Map<String, Long> phaseStartTimes = threadLocal.get();
    final long startedAtNanos = phaseStartTimes.getOrDefault(FETCH_START_TIME, 0L);
    final boolean failed = true;
    addResourceTrackingCompletionListener(
            searchContext, startedAtNanos, OPERATION_SHARD_FETCH, failed);
}

/**
 * Attaches a completion listener to the task of the given search context.
 *
 * @param searchContext context whose task receives the listener
 * @param startTime {@link System#nanoTime()} reading taken when the phase started
 * @param operation operation name passed on to the listener
 * @param isFailed whether the phase failed
 */
private void addResourceTrackingCompletionListener(
        SearchContext searchContext, long startTime, String operation, boolean isFailed) {
    final Task shardTask = searchContext.getTask();
    // Elapsed time is measured at registration, not when the listener eventually fires.
    final long elapsedNanos = System.nanoTime() - startTime;
    final NotifyOnceListener<Task> completionListener =
            createListener(searchContext, elapsedNanos, operation, isFailed);
    shardTask.addResourceTrackingCompletionListener(completionListener);
}

/**
 * Creates the listener that publishes a task's resource usage once it is notified.
 *
 * <p>On a successful notification the listener records CPU utilization and heap usage into
 * the two histograms, tagged with index name, index UUID, shard id, operation and the failure
 * flag. A histogram that is {@code null} (the factory methods return {@code null} when no
 * metrics registry is available) is skipped, so the callback does not fail with a {@link
 * NullPointerException}.
 *
 * @param searchContext context of the shard search request; its shard id supplies the tags
 * @param totalTime elapsed time of the phase, passed to the CPU utilization calculation
 * @param operation operation name added as a tag
 * @param isFailed failure flag added as a tag
 * @return the listener to register on the task
 */
@VisibleForTesting
NotifyOnceListener<Task> createListener(
        SearchContext searchContext, long totalTime, String operation, boolean isFailed) {
    return new NotifyOnceListener<Task>() {
        @Override
        protected void innerOnResponse(Task task) {
            LOG.debug("Updating the counter for task {}", task.getId());
            if (cpuUtilizationHistogram == null && heapUsedHistogram == null) {
                // Nothing to record into; avoid building tags for no reason.
                return;
            }
            // Both measurements carry the same dimensions, so build the tags once.
            final Tags tags = createTags();
            if (cpuUtilizationHistogram != null) {
                cpuUtilizationHistogram.record(
                        Utils.calculateCPUUtilization(
                                numProcessors,
                                totalTime,
                                task.getTotalResourceStats().getCpuTimeInNanos()),
                        tags);
            }
            if (heapUsedHistogram != null) {
                // Clamp to zero so a negative reading is never published.
                heapUsedHistogram.record(
                        Math.max(0, task.getTotalResourceStats().getMemoryInBytes()), tags);
            }
        }

        private Tags createTags() {
            return Tags.create()
                    .addTag(
                            RTFMetrics.CommonDimension.INDEX_NAME.toString(),
                            searchContext.request().shardId().getIndex().getName())
                    .addTag(
                            RTFMetrics.CommonDimension.INDEX_UUID.toString(),
                            searchContext.request().shardId().getIndex().getUUID())
                    .addTag(
                            RTFMetrics.CommonDimension.SHARD_ID.toString(),
                            searchContext.request().shardId().getId())
                    .addTag(RTFMetrics.CommonDimension.OPERATION.toString(), operation)
                    .addTag(RTFMetrics.CommonDimension.FAILED.toString(), isFailed);
        }

        @Override
        protected void innerOnFailure(Exception e) {
            LOG.error("Error in executing the listener", e);
        }
    };
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import org.opensearch.action.bulk.BulkShardRequest;
import org.opensearch.action.support.replication.TransportReplicationAction.ConcreteShardRequest;
import org.opensearch.performanceanalyzer.commons.collectors.StatsCollector;
import org.opensearch.performanceanalyzer.commons.util.Util;
import org.opensearch.performanceanalyzer.config.PerformanceAnalyzerController;
import org.opensearch.tasks.Task;
import org.opensearch.transport.TransportChannel;
Expand Down Expand Up @@ -45,7 +46,7 @@ public void messageReceived(T request, TransportChannel channel, Task task) thro

@VisibleForTesting
TransportChannel getChannel(T request, TransportChannel channel, Task task) {
if (!controller.isPerformanceAnalyzerEnabled()) {
if (!isCollectorEnabled()) {
return channel;
}

Expand All @@ -56,6 +57,13 @@ TransportChannel getChannel(T request, TransportChannel channel, Task task) {
}
}

/**
 * Tells whether this interceptor should collect metrics.
 *
 * <p>Collection is on when Performance Analyzer is enabled and the collector mode is either
 * {@code DUAL} or {@code RCA}. The mode is read once into a local so both comparisons see the
 * same value even if the setting is changed while this method runs.
 *
 * @return {@code true} when the interceptor should wrap the channel
 */
private boolean isCollectorEnabled() {
    if (!controller.isPerformanceAnalyzerEnabled()) {
        return false;
    }
    final int collectorMode = controller.getCollectorsSettingValue();
    return collectorMode == Util.CollectorMode.DUAL.getValue()
            || collectorMode == Util.CollectorMode.RCA.getValue();
}

private TransportChannel getShardBulkChannel(T request, TransportChannel channel, Task task) {
String className = request.getClass().getName();
boolean bPrimary = false;
Expand Down
Loading
Loading