Skip to content

Commit

Permalink
Merge pull request #117 from perftool-incubator/get-metric-chunked
Browse files Browse the repository at this point in the history
Process N ndjson chunks at a time
  • Loading branch information
atheurer authored Aug 7, 2024
2 parents f68412f + fbed299 commit d23b68c
Showing 1 changed file with 39 additions and 22 deletions.
61 changes: 39 additions & 22 deletions queries/cdmq/cdm.js
Original file line number Diff line number Diff line change
Expand Up @@ -1752,7 +1752,9 @@ exports.getMetricGroupsFromBreakout = getMetricGroupsFromBreakout;
// then there are enough metric_data documents to compute the results.
getMetricDataFromIdsSets = function (url, sets, metricGroupIdsByLabelSets) {
//console.log("metricGroupIdsByLabelSets:\n" + JSON.stringify(metricGroupIdsByLabelSets, null, 2));
var ndjson = '';
var maxndjson = 8 * 48; // Must be a multiple of 8
var ndjson = [];
var ndjsonLineNum = 0;
for (var idx = 0; idx < metricGroupIdsByLabelSets.length; idx++) {
Object.keys(metricGroupIdsByLabelSets[idx])
.sort()
Expand All @@ -1779,6 +1781,10 @@ getMetricDataFromIdsSets = function (url, sets, metricGroupIdsByLabelSets) {
// The resolution determines how many times we compute a value, each value for a
// different "slice" in the original begin-to-end time domain.
while (true) {
ndIdx = Math.floor(ndjsonLineNum / maxndjson);
if (typeof ndjson[ndIdx] == 'undefined') {
ndjson[ndIdx] = '';
}
// Calculating a single value representing an average for thisBegin - thisEnd
// relies on an [weighted average] aggregation, plus a few other queries. An
// alternative method would involve querying all documents for the original
Expand Down Expand Up @@ -1815,8 +1821,10 @@ getMetricDataFromIdsSets = function (url, sets, metricGroupIdsByLabelSets) {
reqjson += '}';
var index = JSON.parse(indexjson);
var req = JSON.parse(reqjson);
ndjson += JSON.stringify(index) + '\n';
ndjson += JSON.stringify(req) + '\n';
ndjson[ndIdx] += JSON.stringify(index) + '\n';
ndjsonLineNum++;
ndjson[ndIdx] += JSON.stringify(req) + '\n';
ndjsonLineNum++;
// This second request is for the total weight of the previous weighted average request.
// We need this because we are going to recompute the weighted average by adding
// a few more documents that are partially outside the time domain.
Expand All @@ -1840,8 +1848,10 @@ getMetricDataFromIdsSets = function (url, sets, metricGroupIdsByLabelSets) {
reqjson += '}\n';
index = JSON.parse(indexjson);
req = JSON.parse(reqjson);
ndjson += JSON.stringify(index) + '\n';
ndjson += JSON.stringify(req) + '\n';
ndjson[ndIdx] += JSON.stringify(index) + '\n';
ndjsonLineNum++;
ndjson[ndIdx] += JSON.stringify(req) + '\n';
ndjsonLineNum++;
// This third request is for documents that had its begin during or before the time range, but
// its end was after the time range.
indexjson = '{"index": "' + getIndexBaseName() + 'metric_data' + '" }\n';
Expand All @@ -1859,8 +1869,10 @@ getMetricDataFromIdsSets = function (url, sets, metricGroupIdsByLabelSets) {
reqjson += '}';
index = JSON.parse(indexjson);
req = JSON.parse(reqjson);
ndjson += JSON.stringify(index) + '\n';
ndjson += JSON.stringify(req) + '\n';
ndjson[ndIdx] += JSON.stringify(index) + '\n';
ndjsonLineNum++;
ndjson[ndIdx] += JSON.stringify(req) + '\n';
ndjsonLineNum++;
// This fourth request is for documents that had its begin before the time range, but
// its end was during or after the time range
var indexjson = '{"index": "' + getIndexBaseName() + 'metric_data' + '" }\n';
Expand All @@ -1879,8 +1891,10 @@ getMetricDataFromIdsSets = function (url, sets, metricGroupIdsByLabelSets) {
reqjson += '}\n';
index = JSON.parse(indexjson);
req = JSON.parse(reqjson);
ndjson += JSON.stringify(index) + '\n'; //ensures JSON is exactly 1 line
ndjson += JSON.stringify(req) + '\n'; //ensures JSON is exactly 1 line
ndjson[ndIdx] += JSON.stringify(index) + '\n'; //ensures JSON is exactly 1 line
ndjsonLineNum++;
ndjson[ndIdx] += JSON.stringify(req) + '\n'; //ensures JSON is exactly 1 line
ndjsonLineNum++;

// Cycle through every "slice" of the time domain, adding the requests for the entire time domain
thisBegin = thisEnd + 1;
Expand All @@ -1895,10 +1909,13 @@ getMetricDataFromIdsSets = function (url, sets, metricGroupIdsByLabelSets) {
});
}

var resp = esRequest(url, 'metric_data/_msearch', ndjson);
var data = JSON.parse(resp.getBody());
var elements = data.responses.length;

var responses = [];
for (idx = 0; idx < ndjson.length; idx++) {
var resp = esRequest(url, 'metric_data/_msearch', ndjson[idx]);
var data = JSON.parse(resp.getBody());
responses.push(...data['responses']);
}
var elements = responses.length;
var valueSets = [];
var count = 0;
for (var idx = 0; idx < metricGroupIdsByLabelSets.length; idx++) {
Expand All @@ -1918,7 +1935,7 @@ getMetricDataFromIdsSets = function (url, sets, metricGroupIdsByLabelSets) {
var thisBegin = begin;
var thisEnd = begin + duration;
var subCount = 0;
//var elements = data.responses.length / metricGroupIdsByLabelSets.length;
//var elements = responses.length / metricGroupIdsByLabelSets.length;
var numMetricIds = metricIds.length;
while (true) {
var timeWindowDuration = thisEnd - thisBegin + 1;
Expand All @@ -1928,14 +1945,14 @@ getMetricDataFromIdsSets = function (url, sets, metricGroupIdsByLabelSets) {
var aggWeight;
var aggAvgTimesWeight;
var newWeight;
aggAvg = data.responses[count].aggregations.metric_avg.value; //$$resp_ref{'responses'}[$count]{'aggregations'}{'metric_avg'}{'value'};
aggAvg = responses[count].aggregations.metric_avg.value; //$$resp_ref{'responses'}[$count]{'aggregations'}{'metric_avg'}{'value'};
if (typeof aggAvg != 'undefined') {
// We have the weighted average for documents that don't overlap the time range,
// but we need to combine that with the documents that are partially outside
// the time range. We need to know the total weight from the documents we
// just finished in order to add the new documents and recompute the new weighted
// average.
aggWeight = data.responses[count + 1].aggregations.total_weight.value;
aggWeight = responses[count + 1].aggregations.total_weight.value;
aggAvgTimesWeight = aggAvg * aggWeight;
} else {
// It is possible that the aggregation returned no results because all of the documents
Expand Down Expand Up @@ -1968,20 +1985,20 @@ getMetricDataFromIdsSets = function (url, sets, metricGroupIdsByLabelSets) {
var k;
for (k = 2; k < 4; k++) {
//for my $j (@{ $$resp_ref{'responses'}[$count + $k]{'hits'}{'hits'} }) {
if (data.responses[count + k].hits.total.value !== data.responses[count + k].hits.hits.length) {
if (responses[count + k].hits.total.value !== responses[count + k].hits.hits.length) {
console.log(
'WARNING! getMetricDataFromIdsSets() data.responses[' +
'WARNING! getMetricDataFromIdsSets() responses[' +
(count + k) +
'].hits.total.value (' +
data.responses[count + k].hits.total.value +
') and data.responses[' +
responses[count + k].hits.total.value +
') and responses[' +
(count + k) +
'].hits.hits.length (' +
data.responses[count + k].hits.hits.length +
responses[count + k].hits.hits.length +
') are not equal, which means the retured data is probably incomplete'
);
}
data.responses[count + k].hits.hits.forEach((element) => {
responses[count + k].hits.hits.forEach((element) => {
//for my $key (keys %{ $$j{'_source'}{'metric_data'} }) {
partialDocs[element._id] = {};
Object.keys(element._source.metric_data).forEach((key) => {
Expand Down

0 comments on commit d23b68c

Please sign in to comment.