Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Backport 2.x] Searchbackpressure Service Reader and UTs added #495

Merged
merged 1 commit into from
Oct 5, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,13 @@ public enum ReaderMetrics implements MeasurementSet {
"FaultDetectionMetricsEmitterExecutionTime",
"millis",
StatsType.LATENCIES,
Statistics.SUM);
Statistics.SUM),
SEARCH_BACK_PRESSURE_METRICS_EMITTER_EXECUTION_TIME(
"SearchBackPressureMetricsEmitterExecutionTime",
"millis",
StatsType.LATENCIES,
Statistics.SUM),
;

/** What we want to appear as the metric name. */
private String name;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -749,6 +749,136 @@ public static void emitGarbageCollectionInfo(
ReaderMetrics.GC_INFO_EMITTER_EXECUTION_TIME, mFinalT - mCurrT);
}

public static void emitSearchBackPressureMetrics(
MetricsDB metricsDB,
SearchBackPressureMetricsSnapShot searchBackPressureMetricsSnapShot) {
long mCurrT = System.currentTimeMillis();
Result<Record> searchbp_records = searchBackPressureMetricsSnapShot.fetchAll();

// String SEARCHBP_MODE_DIM = "searchbp_mode";
String SEARCHBP_TYPE_DIM =
AllMetrics.SearchBackPressureStatsValue.SEARCHBP_TYPE_DIM.toString();
String SEARCHBP_TABLE_NAME =
AllMetrics.SearchBackPressureStatsValue.SEARCHBP_TABLE_NAME.toString();

List<String> dims =
new ArrayList<String>() {
{
this.add(SEARCHBP_TYPE_DIM);
}
};

// stats type in sqlitedb is similar to:
// stats_type_name | sum | avg | min | max
List<String> stats_types =
new ArrayList<String>() {
{
// Shard/Task Stats Cancellation Count
// searchbp_shard_stats_cancellationCount|0.0|0.0|0.0|0.0
this.add(
AllMetrics.SearchBackPressureStatsValue
.SEARCHBP_SHARD_STATS_CANCELLATIONCOUNT
.toString());
// searchbp_task_stats_cancellationCount|0.0|0.0|0.0|0.0
this.add(
AllMetrics.SearchBackPressureStatsValue
.SEARCHBP_TASK_STATS_CANCELLATIONCOUNT
.toString());
// Shard Stats Resource Heap / CPU Usage
// searchbp_shard_stats_resource_heap_usage_cancellationCount|0.0|0.0|0.0|0.0
this.add(
AllMetrics.SearchBackPressureStatsValue
.SEARCHBP_SHARD_STATS_RESOURCE_HEAP_USAGE_CANCELLATIONCOUNT
.toString());
// searchbp_shard_stats_resource_heap_usage_currentMax|0.0|0.0|0.0|0.0
this.add(
AllMetrics.SearchBackPressureStatsValue
.SEARCHBP_SHARD_STATS_RESOURCE_HEAP_USAGE_CURRENTMAX
.toString());
// searchbp_shard_stats_resource_heap_usage_rollingAvg|0.0|0.0|0.0|0.0
this.add(
AllMetrics.SearchBackPressureStatsValue
.SEARCHBP_SHARD_STATS_RESOURCE_HEAP_USAGE_ROLLINGAVG
.toString());
// searchbp_shard_stats_resource_cpu_usage_cancellationCount|0.0|0.0|0.0|0.0
this.add(
AllMetrics.SearchBackPressureStatsValue
.SEARCHBP_SHARD_STATS_RESOURCE_CPU_USAGE_CANCELLATIONCOUNT
.toString());
// searchbp_shard_stats_resource_cpu_usage_currentMax|0.0|0.0|0.0|0.0
this.add(
AllMetrics.SearchBackPressureStatsValue
.SEARCHBP_SHARD_STATS_RESOURCE_CPU_USAGE_CURRENTMAX
.toString());
// searchbp_shard_stats_resource_cpu_usage_currentAvg|0.0|0.0|0.0|0.0
this.add(
AllMetrics.SearchBackPressureStatsValue
.SEARCHBP_SHARD_STATS_RESOURCE_CPU_USAGE_CURRENTAVG
.toString());
// Task Stats Resource Heap / CPU Usage
// searchbp_task_stats_resource_heap_usage_cancellationCount|0.0|0.0|0.0|0.0
this.add(
AllMetrics.SearchBackPressureStatsValue
.SEARCHBP_TASK_STATS_RESOURCE_HEAP_USAGE_CANCELLATIONCOUNT
.toString());
// searchbp_task_stats_resource_heap_usage_currentMax|0.0|0.0|0.0|0.0
this.add(
AllMetrics.SearchBackPressureStatsValue
.SEARCHBP_TASK_STATS_RESOURCE_HEAP_USAGE_CURRENTMAX
.toString());
// searchbp_task_stats_resource_heap_usage_rollingAvg|0.0|0.0|0.0|0.0
this.add(
AllMetrics.SearchBackPressureStatsValue
.SEARCHBP_TASK_STATS_RESOURCE_HEAP_USAGE_ROLLINGAVG
.toString());
// searchbp_task_stats_resource_cpu_usage_cancellationCount|0.0|0.0|0.0|0.0
this.add(
AllMetrics.SearchBackPressureStatsValue
.SEARCHBP_TASK_STATS_RESOURCE_CPU_USAGE_CANCELLATIONCOUNT
.toString());
// searchbp_task_stats_resource_cpu_usage_currentMax|0.0|0.0|0.0|0.0
this.add(
AllMetrics.SearchBackPressureStatsValue
.SEARCHBP_TASK_STATS_RESOURCE_CPU_USAGE_CURRENTMAX
.toString());
// searchbp_task_stats_resource_cpu_usage_currentAvg|0.0|0.0|0.0|0.0
this.add(
AllMetrics.SearchBackPressureStatsValue
.SEARCHBP_TASK_STATS_RESOURCE_CPU_USAGE_CURRENTAVG
.toString());
}
};

metricsDB.createMetric(new Metric<>(SEARCHBP_TABLE_NAME, 0d), dims);

BatchBindStep handle = metricsDB.startBatchPut(new Metric<>(SEARCHBP_TABLE_NAME, 0d), dims);

for (Record record : searchbp_records) {
for (String stats_type : stats_types) {
Optional<Object> tmpStatsObj = Optional.ofNullable(record.get(stats_type));

handle.bind(
stats_type,
// the rest are agg fields: sum, avg, min, max which don't make sense for
// searchbackpressure
tmpStatsObj.map(o -> Long.parseLong(o.toString())).orElse(0L),
tmpStatsObj.map(o -> Long.parseLong(o.toString())).orElse(0L),
tmpStatsObj.map(o -> Long.parseLong(o.toString())).orElse(0L),
tmpStatsObj.map(o -> Long.parseLong(o.toString())).orElse(0L));
}
}

handle.execute();

long mFinalT = System.currentTimeMillis();
LOG.debug(
"Total time taken for writing Search Back Pressure info into metricsDB: {}",
mFinalT - mCurrT);
ServiceMetrics.READER_METRICS_AGGREGATOR.updateStat(
ReaderMetrics.SEARCH_BACK_PRESSURE_METRICS_EMITTER_EXECUTION_TIME,
mFinalT - mCurrT);
}

public static void emitAdmissionControlMetrics(
MetricsDB metricsDB, AdmissionControlSnapshot snapshot) {
long mCurrT = System.currentTimeMillis();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ public class ReaderMetricsProcessor implements Runnable {
clusterManagerThrottlingMetricsMap;
private NavigableMap<Long, ShardStateMetricsSnapshot> shardStateMetricsMap;
private NavigableMap<Long, AdmissionControlSnapshot> admissionControlMetricsMap;
private NavigableMap<Long, SearchBackPressureMetricsSnapShot> searchBackPressureMetricsMap;

private static final int MAX_DATABASES = 2;
private static final int OS_SNAPSHOTS = 4;
Expand All @@ -81,6 +82,7 @@ public class ReaderMetricsProcessor implements Runnable {
private static final int GC_INFO_SNAPSHOTS = 4;
private static final int CLUSTER_MANAGER_THROTTLING_SNAPSHOTS = 2;
private static final int AC_SNAPSHOTS = 2;
private static final int SEARCH_BP_SNAPSHOTS = 4;
private final String rootLocation;

private final AppContext appContext;
Expand Down Expand Up @@ -125,6 +127,8 @@ public ReaderMetricsProcessor(
gcInfoMap = new TreeMap<>();
clusterManagerThrottlingMetricsMap = new TreeMap<>();
admissionControlMetricsMap = new TreeMap<>();
searchBackPressureMetricsMap = new TreeMap<>();

this.rootLocation = rootLocation;
this.configOverridesApplier = new ConfigOverridesApplier();

Expand Down Expand Up @@ -268,6 +272,7 @@ public void trimOldSnapshots() throws Exception {
trimMap(gcInfoMap, GC_INFO_SNAPSHOTS);
trimMap(clusterManagerThrottlingMetricsMap, CLUSTER_MANAGER_THROTTLING_SNAPSHOTS);
trimMap(admissionControlMetricsMap, AC_SNAPSHOTS);
trimMap(searchBackPressureMetricsMap, SEARCH_BP_SNAPSHOTS);

for (NavigableMap<Long, MemoryDBSnapshot> snap : nodeMetricsMap.values()) {
// do the same thing as OS_SNAPSHOTS. Eventually MemoryDBSnapshot
Expand Down Expand Up @@ -397,6 +402,7 @@ private void emitMetrics(long currWindowStartTime) throws Exception {
emitAdmissionControlMetrics(prevWindowStartTime, metricsDB);
emitClusterManagerMetrics(prevWindowStartTime, metricsDB);
emitClusterManagerThrottlingMetrics(prevWindowStartTime, metricsDB);
emitSearchBackPressureMetrics(prevWindowStartTime, metricsDB);

metricsDB.commit();
metricsDBMap.put(prevWindowStartTime, metricsDB);
Expand Down Expand Up @@ -594,6 +600,19 @@ private void emitClusterManagerThrottlingMetrics(
}
}

private void emitSearchBackPressureMetrics(long prevWindowStartTime, MetricsDB metricsDB)
throws Exception {
if (searchBackPressureMetricsMap.containsKey(prevWindowStartTime)) {
SearchBackPressureMetricsSnapShot prevSearchBPSnapShot =
searchBackPressureMetricsMap.get(prevWindowStartTime);
MetricsEmitter.emitSearchBackPressureMetrics(metricsDB, prevSearchBPSnapShot);
} else {
LOG.debug(
"Search Back Pressure snapshot does not exist for the previous window. "
+ "Not emitting metrics.");
}
}

/**
* OS, Request, Http and cluster_manager first aligns the currentTimeStamp with a 5 second
* interval. In the current format, a file (previously a directory) is written every 5 seconds.
Expand Down Expand Up @@ -679,6 +698,9 @@ is ready so it starts to read that file (go back two windows and
EventProcessor admissionControlProcessor =
AdmissionControlProcessor.build(
currWindowStartTime, conn, admissionControlMetricsMap);
EventProcessor searchBackPressureMetricsProcessor =
SearchBackPressureMetricsProcessor.buildSearchBackPressureMetricsProcessor(
currWindowStartTime, conn, searchBackPressureMetricsMap);

// The event dispatcher dispatches events to each of the registered event processors.
// In addition to event processing each processor has an initialize/finalize function that
Expand All @@ -702,6 +724,7 @@ is ready so it starts to read that file (go back two windows and
eventDispatcher.registerEventProcessor(faultDetectionProcessor);
eventDispatcher.registerEventProcessor(garbageCollectorInfoProcessor);
eventDispatcher.registerEventProcessor(admissionControlProcessor);
eventDispatcher.registerEventProcessor(searchBackPressureMetricsProcessor);

eventDispatcher.initializeProcessing(
currWindowStartTime, currWindowStartTime + MetricsConfiguration.SAMPLING_INTERVAL);
Expand Down
Loading
Loading