From 896defecdd6675bf5d67347ee46e1ad7faa5a808 Mon Sep 17 00:00:00 2001 From: Andrew Theurer Date: Mon, 12 Aug 2024 10:01:24 -0400 Subject: [PATCH] Limit msearch -process in 16k char or less chunks -it's not obvious what the limit we are up against, but 16k seems to be well below whatever it is. --- queries/cdmq/cdm.js | 233 ++++++++++++++++++++++++-------------------- 1 file changed, 128 insertions(+), 105 deletions(-) diff --git a/queries/cdmq/cdm.js b/queries/cdmq/cdm.js index dc2ab440..5d8357d3 100644 --- a/queries/cdmq/cdm.js +++ b/queries/cdmq/cdm.js @@ -61,6 +61,52 @@ intersectAllArrays = function (a2D) { }; exports.intersectAllArrays = intersectAllArrays; + +function esJsonArrRequest(host, idx, jsonArr) { + var url = 'http://' + host + '/' + getIndexBaseName() + idx; + var max = 16384; + var idx = 0; + var req_count = 0; + var q_count = 0; + var ndjson = ""; + var responses = []; + // Process queries in chunks no larger than 'max' chars + while (idx < jsonArr.length) { + // Add two jsons/lines at a time, as the first json is the index and the second is the query + if ((ndjson.length + jsonArr[idx].length + jsonArr[idx+1].length) < max) { + q_count++; + ndjson += jsonArr[idx] + '\n' +jsonArr[idx+1] + '\n'; + idx += 2; + } else { + req_count++; + q_count = 0; + var resp = request('POST', url, + { + body: ndjson, + headers: { 'Content-Type': 'application/json' } + } + ); + var data = JSON.parse(resp.getBody()); + responses.push(...data.responses); + ndjson = ""; + } + } + if (ndjson != "") { + req_count++; + q_count = 0; + var resp = request('POST', url, + { + body: ndjson, + headers: { 'Content-Type': 'application/json' } + } + ); + var data = JSON.parse(resp.getBody()); + responses.push(...data.responses); + ndjson = ""; + } + return responses; +} + function esRequest(host, idx, q) { var url = 'http://' + host + '/' + getIndexBaseName() + idx; // The var q can be an object or a string. If you are submitting NDJSON @@ -85,7 +131,7 @@ function esRequest(host, idx, q) { mSearch = function (url, index, termKeys, values, source, aggs, size, sort) { if (typeof termKeys !== typeof []) return; if (typeof values !== typeof []) return; - var ndjson = ''; + var jsonArr = []; for (var i = 0; i < values[0].length; i++) { var req = { query: { bool: { filter: [] } } }; if (source !== '' && source !== null) { @@ -107,59 +153,59 @@ mSearch = function (url, index, termKeys, values, source, aggs, size, sort) { if (aggs !== null) { req['aggs'] = aggs; } - ndjson += '{}\n' + JSON.stringify(req) + '\n'; + jsonArr.push('{}'); + jsonArr.push(JSON.stringify(req)); } - var resp = esRequest(url, index + '/_msearch', ndjson); - var data = JSON.parse(resp.getBody()); + var responses = esJsonArrRequest(url, index + '/_msearch', jsonArr); // Unpack response and organize in array of arrays var retData = []; - for (var i = 0; i < data.responses.length; i++) { + for (var i = 0; i < responses.length; i++) { // For queries with aggregation if ( - typeof data.responses[i].aggregations !== 'undefined' && - Array.isArray(data.responses[i].aggregations.source.buckets) + typeof responses[i].aggregations !== 'undefined' && + Array.isArray(responses[i].aggregations.source.buckets) ) { - if (data.responses[i].aggregations.source.sum_other_doc_count > 0) { + if (responses[i].aggregations.source.sum_other_doc_count > 0) { console.log( 'WARNING! msearch aggregation returned sum_other_doc_count > 0, which means not all terms were returned. This query needs a larger "size"' ); } // Assemble the keys from the bucket for this query (i) var keys = []; - data.responses[i].aggregations.source.buckets.forEach((element) => { + responses[i].aggregations.source.buckets.forEach((element) => { keys.push(element.key); }); retData[i] = keys; // For queries without aggregation } else { - if (data.responses[i].hits == null) { + if (responses[i].hits == null) { console.log('WARNING! msearch returned data.responses[' + i + '].hits is NULL'); - console.log(JSON.stringify(data.responses[i], null, 2)); + console.log(JSON.stringify(responses[i], null, 2)); return; } - if (Array.isArray(data.responses[i].hits.hits) && data.responses[i].hits.hits.length > 0) { + if (Array.isArray(responses[i].hits.hits) && responses[i].hits.hits.length > 0) { if ( - data.responses[i].hits.total.value !== data.responses[i].hits.hits.length && - req.size != data.responses[i].hits.hits.length + responses[i].hits.total.value !== responses[i].hits.hits.length && + req.size != responses[i].hits.hits.length ) { console.log( 'WARNING! msearch(size: ' + size + - ') data.responses[' + + ') responses[' + i + '].hits.total.value (' + - data.responses[i].hits.total.value + - ') and data.responses[' + + responses[i].hits.total.value + + ') and responses[' + i + '].hits.hits.length (' + - data.responses[i].hits.hits.length + + responses[i].hits.hits.length + ') are not equal, which means the retured data is probably incomplete' ); } var ids = []; - data.responses[i].hits.hits.forEach((element) => { + responses[i].hits.hits.forEach((element) => { // A source of "x.y" must be converted to reference the object // For example, a source (string) of "metric_desc.id" needs to reference metric_desc[id] var obj = element._source; @@ -349,9 +395,7 @@ mgetPeriodRange = function (url, periodIds) { idx++; } } - //console.log("Ids:\n" + JSON.stringify(Ids, null, 2)); var data = mSearch(url, 'period', ['period.id'], [Ids], 'period', null, 1); - //console.log("data:\n" + JSON.stringify(data, null, 2)); // mSearch returns a 2D array, in other words, a list of values (inner array) for each query (outer array) // In this case, the queries are 1 per sampleId/periodName (for all iterations ordered), and the list of values // happens to be exactly 1 value, the primaryPeriodId. @@ -1021,8 +1065,8 @@ getIters = function ( const now = Date.now(); var intersectedRunIds = []; - var ndjson = ''; - var ndjson2 = ''; + var jsonArr = []; + var jsonArr2 = ''; var indexjson = ''; var qjson = ''; var newestDay = now - 1000 * 3600 * 24 * filterByAge.split('-')[0]; @@ -1053,20 +1097,19 @@ getIters = function ( if (val != 'tag-not-used') { var tagValTerm = { term: { 'tag.val': val } }; tag_query.query.bool.filter.push(tagValTerm); - ndjson += '{"index": "' + getIndexBaseName() + 'tag' + '" }\n'; - ndjson += JSON.stringify(tag_query) + '\n'; + jsonArr.push('{"index": "' + getIndexBaseName() + 'tag' + '" }'); + jsonArr.push(JSON.stringify(tag_query)); } else { // Find the run IDs which have this tag name present (value does not matter) - ndjson2 += '{"index": "' + getIndexBaseName() + 'tag' + '" }\n'; - ndjson2 += JSON.stringify(tag_query) + '\n'; + jsonArr2 += '{"index": "' + getIndexBaseName() + 'tag' + '" }\n'; + jsonArr2 += JSON.stringify(tag_query) + '\n'; } }); - if (ndjson != '') { - var resp = esRequest(url, 'tag/_msearch', ndjson); - var data = JSON.parse(resp.getBody()); + if (jsonArr.length > 0) { + var responses = esJsonArrRequest(url, 'tag/_msearch', jsonArr); var runIds = []; - data.responses.forEach((response) => { + responses.forEach((response) => { var theseRunIds = []; response.hits.hits.forEach((run) => { theseRunIds.push(run._source.run.id); @@ -1075,10 +1118,9 @@ getIters = function ( }); var intersectedRunIds = intersectAllArrays(runIds); - if (ndjson2 != '') { - var resp2 = esRequest(url, 'tag/_msearch', ndjson2); - var data2 = JSON.parse(resp2.getBody()); - data2.responses.forEach((response) => { + if (jsonArr2.length > 0) { + var responses2 = esJsonArrRequest(url, 'tag/_msearch', jsonArr2); + responses2.forEach((response) => { response.hits.hits.forEach((run) => { if (intersectedRunIds.includes(run._source.run.id)) { var index = intersectedRunIds.indexOf(run._source.run.id); @@ -1106,7 +1148,7 @@ getIters = function ( // The responses (a list of iteration.ids for each query) must be intersected // to have only the iteration.ids that match all param filters. console.log('Get all iterations from ' + filterByParams.length + ' param filters'); - ndjson = ''; + jsonArr = []; filterByParams.forEach((argval) => { var param_query = JSON.parse(base_q_json); var arg = argval.split(':')[0]; @@ -1117,22 +1159,22 @@ getIters = function ( if (val != 'param-not-used') { var paramVal = { term: { 'param.val': val } }; param_query.query.bool.filter.push(paramVal); - ndjson += '{"index": "' + getIndexBaseName() + 'param' + '" }\n'; - ndjson += JSON.stringify(param_query) + '\n'; + jsonArr.push('{"index": "' + getIndexBaseName() + 'param' + '" }'); + jsonArr.push(JSON.stringify(param_query)); } else { // Find the run IDs which have this param name present (value does not matter). // Later, we will subtract these iteration IDs from the ones found with ndjson query. - ndjson2 += '{"index": "' + getIndexBaseName() + 'param' + '" }\n'; - ndjson2 += JSON.stringify(param_query) + '\n'; + jsonArr2 += '{"index": "' + getIndexBaseName() + 'param' + '" }\n'; + jsonArr2 += JSON.stringify(param_query) + '\n'; } }); var iterIdsFromParam = []; - if (ndjson != '') { - var resp = esRequest(url, 'param/_msearch', ndjson); - var data = JSON.parse(resp.getBody()); + if (jsonArr.length > 0) { + var resp = esJsonArrRequest(url, 'param/_msearch', jsonArr); + var responses = JSON.parse(resp.getBody()); var iterationIds = []; - data.responses.forEach((response) => { + responses.forEach((response) => { var theseIterationIds = []; response.hits.hits.forEach((iteration) => { theseIterationIds.push(iteration._source.iteration.id); @@ -1141,10 +1183,10 @@ getIters = function ( }); iterIdsFromParam = intersectAllArrays(iterationIds); - if (ndjson2 != '') { - var resp2 = esRequest(url, 'tag/_msearch', ndjson2); - var data2 = JSON.parse(resp2.getBody()); - data2.responses.forEach((response) => { + if (jsonArr2 != '') { + var resp2 = esJsonArrRequest(url, 'tag/_msearch', jsonArr2); + var responses2 = JSON.parse(resp2.getBody()); + responses2.forEach((response) => { response.hits.hits.forEach((hit) => { if (iterIdsFromParam.includes(hit._source.iteration.id)) { var index = iterIdsFromParam.indexOf(hit._source.iteration.id); @@ -1564,7 +1606,7 @@ mgetMetricIdsFromTerms = function (url, termsSets) { // { 'period': x, 'run': y, 'termsByLabel': {} } // termsByLabel is a dict/hash of: // {