Skip to content

Commit

Permalink
merge main
Browse files Browse the repository at this point in the history
Signed-off-by: Gao Binlong <[email protected]>
  • Loading branch information
gaobinlong committed Sep 19, 2024
2 parents d8a0a84 + ab7816c commit fa76419
Show file tree
Hide file tree
Showing 19 changed files with 458 additions and 224 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Fix case-insensitive query on wildcard field ([#15882](https://github.com/opensearch-project/OpenSearch/pull/15882))
- Add validation for the search backpressure cancellation settings ([#15501](https://github.com/opensearch-project/OpenSearch/pull/15501))
- Fix search_as_you_type not supporting multi-fields ([#15988](https://github.com/opensearch-project/OpenSearch/pull/15988))
- Fix infinite loop in nested agg ([#15931](https://github.com/opensearch-project/OpenSearch/pull/15931))

### Security

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
---
# The test setup includes:
# - Create a nested mapping for the test_nested_agg_index index
# - Index two example documents
# - Run nested aggregations and verify the bucket structure

setup:
  - do:
      indices.create:
        index: test_nested_agg_index
        body:
          mappings:
            properties:
              a:
                type: nested
                properties:
                  b1:
                    type: keyword
                  b2:
                    type: nested
                    properties:
                      c:
                        type: nested
                        properties:
                          d:
                            type: keyword

  - do:
      bulk:
        refresh: true
        body: |
          {"index": {"_index": "test_nested_agg_index", "_id": "0"}}
          {"a": { "b1": "b11", "b2": { "c": { "d": "d1" } }}}
          {"index": {"_index": "test_nested_agg_index", "_id": "1"}}
          {"a": { "b1": "b12", "b2": { "c": { "d": "d2" } }}}
---
# Delete the index during teardown
teardown:
  - do:
      indices.delete:
        index: test_nested_agg_index

---
"Supported queries":
  - skip:
      version: " - 2.17.99"
      reason: "fixed in 2.18.0"

  # Verify document count
  - do:
      search:
        body: {
          query: {
            match_all: { }
          }
        }

  - length: { hits.hits: 2 }

  # Verify nested aggregation: terms on a.b1, with a deeper nested agg on
  # a.b2.c containing a terms agg on a.b2.c.d
  - do:
      search:
        body: {
          aggs: {
            nested_agg: {
              nested: {
                path: "a"
              },
              aggs: {
                a_b1: {
                  terms: {
                    field: "a.b1"
                  },
                  aggs: {
                    c: {
                      nested: {
                        path: "a.b2.c"
                      },
                      aggs: {
                        d: {
                          terms: {
                            field: "a.b2.c.d"
                          }
                        }
                      }
                    }
                  }
                }
              }
            }
          }
        }

  - length: { hits.hits: 2 }
  - match: { aggregations.nested_agg.doc_count: 2 }
  - length: { aggregations.nested_agg.a_b1.buckets: 2 }

  - match: { aggregations.nested_agg.a_b1.buckets.0.key: "b11" }
  - match: { aggregations.nested_agg.a_b1.buckets.0.doc_count: 1 }
  - match: { aggregations.nested_agg.a_b1.buckets.0.c.doc_count: 1 }
  - length: { aggregations.nested_agg.a_b1.buckets.0.c.d.buckets: 1 }
  - match: { aggregations.nested_agg.a_b1.buckets.0.c.d.buckets.0.key: "d1" }
  - match: { aggregations.nested_agg.a_b1.buckets.0.c.d.buckets.0.doc_count: 1 }

  - match: { aggregations.nested_agg.a_b1.buckets.1.key: "b12" }
  - match: { aggregations.nested_agg.a_b1.buckets.1.doc_count: 1 }
  - match: { aggregations.nested_agg.a_b1.buckets.1.c.doc_count: 1 }
  - length: { aggregations.nested_agg.a_b1.buckets.1.c.d.buckets: 1 }
  - match: { aggregations.nested_agg.a_b1.buckets.1.c.d.buckets.0.key: "d2" }
  - match: { aggregations.nested_agg.a_b1.buckets.1.c.d.buckets.0.doc_count: 1 }
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
---
setup:
- skip:
version: " - 2.99.99"
version: " - 2.16.99"
reason: The bitmap filtering feature is available in 2.17 and later.
- do:
indices.create:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ public class NodesReloadSecureSettingsRequest extends BaseNodesRequest<NodesRelo
private SecureString secureSettingsPassword;

public NodesReloadSecureSettingsRequest() {
super((String[]) null);
super(true, (String[]) null);
}

public NodesReloadSecureSettingsRequest(StreamInput in) throws IOException {
Expand All @@ -84,7 +84,7 @@ public NodesReloadSecureSettingsRequest(StreamInput in) throws IOException {
* nodes.
*/
public NodesReloadSecureSettingsRequest(String... nodesIds) {
super(nodesIds);
super(true, nodesIds);
}

@Nullable
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.opensearch.index.compositeindex.datacube.startree.node.InMemoryTreeNode;
import org.opensearch.index.compositeindex.datacube.startree.node.StarTreeNodeType;
import org.opensearch.index.compositeindex.datacube.startree.utils.SequentialDocValuesIterator;
import org.opensearch.index.compositeindex.datacube.startree.utils.iterator.SortedNumericStarTreeValuesIterator;
import org.opensearch.index.mapper.DocCountFieldMapper;
import org.opensearch.index.mapper.FieldMapper;
import org.opensearch.index.mapper.FieldValueConverter;
Expand Down Expand Up @@ -193,7 +194,9 @@ public List<SequentialDocValuesIterator> getMetricReaders(SegmentWriteState stat
metricFieldInfo = getFieldInfo(metric.getField(), DocValuesType.SORTED_NUMERIC);
}
metricReader = new SequentialDocValuesIterator(
fieldProducerMap.get(metricFieldInfo.name).getSortedNumeric(metricFieldInfo)
new SortedNumericStarTreeValuesIterator(
fieldProducerMap.get(metricFieldInfo.name).getSortedNumeric(metricFieldInfo)
)
);
}
metricReaders.add(metricReader);
Expand Down Expand Up @@ -228,7 +231,7 @@ public void build(
dimensionFieldInfo = getFieldInfo(dimension, DocValuesType.SORTED_NUMERIC);
}
dimensionReaders[i] = new SequentialDocValuesIterator(
fieldProducerMap.get(dimensionFieldInfo.name).getSortedNumeric(dimensionFieldInfo)
new SortedNumericStarTreeValuesIterator(fieldProducerMap.get(dimensionFieldInfo.name).getSortedNumeric(dimensionFieldInfo))
);
}
Iterator<StarTreeDocument> starTreeDocumentIterator = sortAndAggregateSegmentDocuments(dimensionReaders, metricReaders);
Expand Down Expand Up @@ -287,7 +290,7 @@ void appendDocumentsToStarTree(Iterator<StarTreeDocument> starTreeDocumentIterat
}
}

private void serializeStarTree(int numSegmentStarTreeDocument, int numStarTreeDocs) throws IOException {
private void serializeStarTree(int numSegmentStarTreeDocuments, int numStarTreeDocs) throws IOException {
// serialize the star tree data
long dataFilePointer = dataOut.getFilePointer();
StarTreeWriter starTreeWriter = new StarTreeWriter();
Expand All @@ -299,7 +302,7 @@ private void serializeStarTree(int numSegmentStarTreeDocument, int numStarTreeDo
starTreeField,
metricAggregatorInfos,
numStarTreeNodes,
numSegmentStarTreeDocument,
numSegmentStarTreeDocuments,
numStarTreeDocs,
dataFilePointer,
totalStarTreeDataLength
Expand Down Expand Up @@ -400,22 +403,20 @@ protected StarTreeDocument getStarTreeDocument(
) throws IOException {
Long[] dims = new Long[numDimensions];
int i = 0;
for (SequentialDocValuesIterator dimensionDocValueIterator : dimensionReaders) {
dimensionDocValueIterator.nextDoc(currentDocId);
Long val = dimensionDocValueIterator.value(currentDocId);
for (SequentialDocValuesIterator dimensionValueIterator : dimensionReaders) {
dimensionValueIterator.nextEntry(currentDocId);
Long val = dimensionValueIterator.value(currentDocId);
dims[i] = val;
i++;
}
i = 0;
Object[] metrics = new Object[metricReaders.size()];
for (SequentialDocValuesIterator metricDocValuesIterator : metricReaders) {
metricDocValuesIterator.nextDoc(currentDocId);
for (SequentialDocValuesIterator metricValuesIterator : metricReaders) {
metricValuesIterator.nextEntry(currentDocId);
// As part of merge, we traverse the star tree doc values
// The type of data stored in metric fields is different from the
// actual indexing field they're based on
metrics[i] = metricAggregatorInfos.get(i)
.getValueAggregators()
.toAggregatedValueType(metricDocValuesIterator.value(currentDocId));
metrics[i] = metricAggregatorInfos.get(i).getValueAggregators().toAggregatedValueType(metricValuesIterator.value(currentDocId));
i++;
}
return new StarTreeDocument(dims, metrics);
Expand Down Expand Up @@ -502,7 +503,7 @@ Long[] getStarTreeDimensionsFromSegment(int currentDocId, SequentialDocValuesIte
for (int i = 0; i < numDimensions; i++) {
if (dimensionReaders[i] != null) {
try {
dimensionReaders[i].nextDoc(currentDocId);
dimensionReaders[i].nextEntry(currentDocId);
} catch (IOException e) {
logger.error("unable to iterate to next doc", e);
throw new RuntimeException("unable to iterate to next doc", e);
Expand Down Expand Up @@ -530,7 +531,7 @@ private Object[] getStarTreeMetricsFromSegment(int currentDocId, List<Sequential
SequentialDocValuesIterator metricStatReader = metricsReaders.get(i);
if (metricStatReader != null) {
try {
metricStatReader.nextDoc(currentDocId);
metricStatReader.nextEntry(currentDocId);
} catch (IOException e) {
logger.error("unable to iterate to next doc", e);
throw new RuntimeException("unable to iterate to next doc", e);
Expand Down Expand Up @@ -672,7 +673,7 @@ private SequentialDocValuesIterator getIteratorForNumericField(
SequentialDocValuesIterator sequentialDocValuesIterator;
assert fieldProducerMap.containsKey(fieldInfo.name);
sequentialDocValuesIterator = new SequentialDocValuesIterator(
DocValues.singleton(fieldProducerMap.get(fieldInfo.name).getNumeric(fieldInfo))
new SortedNumericStarTreeValuesIterator(DocValues.singleton(fieldProducerMap.get(fieldInfo.name).getNumeric(fieldInfo)))
);
return sequentialDocValuesIterator;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ Iterator<StarTreeDocument> mergeStarTrees(List<StarTreeValues> starTreeValuesSub
.size()];
for (int i = 0; i < dimensionsSplitOrder.size(); i++) {
String dimension = dimensionsSplitOrder.get(i).getField();
dimensionReaders[i] = new SequentialDocValuesIterator(starTreeValues.getDimensionDocIdSetIterator(dimension));
dimensionReaders[i] = new SequentialDocValuesIterator(starTreeValues.getDimensionValuesIterator(dimension));
}
List<SequentialDocValuesIterator> metricReaders = new ArrayList<>();
// get doc id set iterators for metrics
Expand All @@ -164,7 +164,7 @@ Iterator<StarTreeDocument> mergeStarTrees(List<StarTreeValues> starTreeValuesSub
metric.getField(),
metricStat.getTypeName()
);
metricReaders.add(new SequentialDocValuesIterator(starTreeValues.getMetricDocIdSetIterator(metricFullName)));
metricReaders.add(new SequentialDocValuesIterator(starTreeValues.getMetricValuesIterator(metricFullName)));
}
}
int currentDocId = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ StarTreeDocument[] getSegmentsStarTreeDocuments(List<StarTreeValues> starTreeVal

for (int i = 0; i < dimensionsSplitOrder.size(); i++) {
String dimension = dimensionsSplitOrder.get(i).getField();
dimensionReaders[i] = new SequentialDocValuesIterator(starTreeValues.getDimensionDocIdSetIterator(dimension));
dimensionReaders[i] = new SequentialDocValuesIterator(starTreeValues.getDimensionValuesIterator(dimension));
}

List<SequentialDocValuesIterator> metricReaders = new ArrayList<>();
Expand All @@ -150,7 +150,7 @@ StarTreeDocument[] getSegmentsStarTreeDocuments(List<StarTreeValues> starTreeVal
metric.getField(),
metricStat.getTypeName()
);
metricReaders.add(new SequentialDocValuesIterator(starTreeValues.getMetricDocIdSetIterator(metricFullName)));
metricReaders.add(new SequentialDocValuesIterator(starTreeValues.getMetricValuesIterator(metricFullName)));

}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@

import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -55,11 +54,9 @@ public class StarTreeDocsFileManager extends AbstractDocumentsFileManager implem
private RandomAccessInput starTreeDocsFileRandomInput;
private IndexOutput starTreeDocsFileOutput;
private final Map<String, Integer> fileToEndDocIdMap;
private final List<Integer> starTreeDocumentOffsets = new ArrayList<>();
private int currentFileStartDocId;
private int numReadableStarTreeDocuments;
private int starTreeFileCount = -1;
private int currBytes = 0;
private final int fileCountMergeThreshold;
private int numStarTreeDocs = 0;

Expand Down Expand Up @@ -98,15 +95,26 @@ IndexOutput createStarTreeDocumentsFileOutput() throws IOException {
public void writeStarTreeDocument(StarTreeDocument starTreeDocument, boolean isAggregatedDoc) throws IOException {
assert isAggregatedDoc == true;
int numBytes = writeStarTreeDocument(starTreeDocument, starTreeDocsFileOutput, true);
addStarTreeDocumentOffset(numBytes);
if (docSizeInBytes == -1) {
docSizeInBytes = numBytes;
} else {
assert docSizeInBytes == numBytes;
}
numStarTreeDocs++;
}

@Override
public StarTreeDocument readStarTreeDocument(int docId, boolean isAggregatedDoc) throws IOException {
assert isAggregatedDoc == true;
ensureDocumentReadable(docId);
return readStarTreeDocument(starTreeDocsFileRandomInput, starTreeDocumentOffsets.get(docId), true);
return readStarTreeDocument(starTreeDocsFileRandomInput, getOffset(docId), true);
}

/**
 * Computes the byte offset of {@code docId} within the currently readable
 * star tree documents file, relative to the first document id stored in
 * that file. Relies on every document occupying exactly
 * {@code docSizeInBytes} bytes (asserted at write time).
 */
private long getOffset(int docId) {
    final long docsFromFileStart = docId - currentFileStartDocId;
    return docsFromFileStart * docSizeInBytes;
}

@Override
Expand All @@ -119,19 +127,10 @@ public Long getDimensionValue(int docId, int dimensionId) throws IOException {
public Long[] readDimensions(int docId) throws IOException {
ensureDocumentReadable(docId);
Long[] dims = new Long[starTreeField.getDimensionsOrder().size()];
readDimensions(dims, starTreeDocsFileRandomInput, starTreeDocumentOffsets.get(docId));
readDimensions(dims, starTreeDocsFileRandomInput, getOffset(docId));
return dims;
}

private void addStarTreeDocumentOffset(int bytes) {
starTreeDocumentOffsets.add(currBytes);
currBytes += bytes;
if (docSizeInBytes == -1) {
docSizeInBytes = bytes;
}
assert docSizeInBytes == bytes;
}

/**
* Load the correct StarTreeDocuments file based on the docId
*/
Expand Down Expand Up @@ -199,7 +198,6 @@ private void loadStarTreeDocumentFile(int docId) throws IOException {
* If the operation is only for reading existing documents, a new file is not created.
*/
private void closeAndMaybeCreateNewFile(boolean shouldCreateFileForAppend, int numStarTreeDocs) throws IOException {
currBytes = 0;
if (starTreeDocsFileOutput != null) {
fileToEndDocIdMap.put(starTreeDocsFileOutput.getName(), numStarTreeDocs);
IOUtils.close(starTreeDocsFileOutput);
Expand Down Expand Up @@ -232,7 +230,6 @@ private void mergeFiles(int numStarTreeDocs) throws IOException {
deleteOldFiles();
fileToEndDocIdMap.clear();
fileToEndDocIdMap.put(mergedOutput.getName(), numStarTreeDocs);
resetStarTreeDocumentOffsets();
}
}

Expand All @@ -259,17 +256,6 @@ private void deleteOldFiles() throws IOException {
}
}

/**
* Reset the star tree document offsets based on the merged file
*/
private void resetStarTreeDocumentOffsets() {
int curr = 0;
for (int i = 0; i < starTreeDocumentOffsets.size(); i++) {
starTreeDocumentOffsets.set(i, curr);
curr += docSizeInBytes;
}
}

@Override
public void close() {
try {
Expand All @@ -288,7 +274,6 @@ public void close() {
tmpDirectory.deleteFile(file);
} catch (IOException ignored) {} // similar to IOUtils.deleteFilesIgnoringExceptions
}
starTreeDocumentOffsets.clear();
fileToEndDocIdMap.clear();
}
}
Loading

0 comments on commit fa76419

Please sign in to comment.