Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Backport 2.x] Integrates KNN plugin with ConcurrentSearchRequestDecider interface (… #2126

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
## [Unreleased 3.0](https://github.com/opensearch-project/k-NN/compare/2.x...HEAD)
### Features
### Enhancements
* Adds concurrent segment search support for mode auto [#2111](https://github.com/opensearch-project/k-NN/pull/2111)
### Bug Fixes
* Add DocValuesProducers for releasing memory when close index [#1946](https://github.com/opensearch-project/k-NN/pull/1946)
### Infrastructure
Expand Down
7 changes: 7 additions & 0 deletions src/main/java/org/opensearch/knn/plugin/KNNPlugin.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import org.opensearch.index.engine.EngineFactory;
import org.opensearch.indices.SystemIndexDescriptor;
import org.opensearch.knn.index.KNNCircuitBreaker;
import org.opensearch.knn.plugin.search.KNNConcurrentSearchRequestDecider;
import org.opensearch.knn.index.util.KNNClusterUtil;
import org.opensearch.knn.index.query.KNNQueryBuilder;
import org.opensearch.knn.index.KNNSettings;
Expand Down Expand Up @@ -96,6 +97,7 @@
import org.opensearch.script.ScriptContext;
import org.opensearch.script.ScriptEngine;
import org.opensearch.script.ScriptService;
import org.opensearch.search.deciders.ConcurrentSearchRequestDecider;
import org.opensearch.threadpool.ExecutorBuilder;
import org.opensearch.threadpool.FixedExecutorBuilder;
import org.opensearch.threadpool.ThreadPool;
Expand Down Expand Up @@ -373,4 +375,9 @@ public Settings additionalSettings() {
).collect(Collectors.toList());
return Settings.builder().putList(IndexModule.INDEX_STORE_HYBRID_MMAP_EXTENSIONS.getKey(), combinedSettings).build();
}

/**
 * Supplies the factory that creates {@link KNNConcurrentSearchRequestDecider} instances.
 *
 * @return a non-empty {@link Optional} holding a new {@link KNNConcurrentSearchRequestDecider.Factory}
 */
@Override
public Optional<ConcurrentSearchRequestDecider.Factory> getConcurrentSearchRequestDeciderFactory() {
    final ConcurrentSearchRequestDecider.Factory knnDeciderFactory = new KNNConcurrentSearchRequestDecider.Factory();
    return Optional.of(knnDeciderFactory);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.knn.plugin.search;

import lombok.EqualsAndHashCode;
import org.opensearch.index.IndexSettings;
import org.opensearch.index.query.QueryBuilder;
import org.opensearch.knn.index.KNNSettings;
import org.opensearch.knn.index.query.KNNQueryBuilder;
import org.opensearch.search.deciders.ConcurrentSearchDecision;
import org.opensearch.search.deciders.ConcurrentSearchRequestDecider;

import java.util.Optional;

/**
 * Decides if the knn query uses concurrent segment search
 * As of 2.17, this is only used when
 *  - "index.search.concurrent_segment_search.mode": "auto" or
 *  - "search.concurrent_segment_search.mode": "auto"
 *
 * Note: the class is not thread-safe and a new instance needs to be created for each request
 */
@EqualsAndHashCode(callSuper = true)
public class KNNConcurrentSearchRequestDecider extends ConcurrentSearchRequestDecider {

    /** Decision reported before any query has been evaluated, and whenever the evaluated query does not qualify. */
    private static final ConcurrentSearchDecision DEFAULT_KNN_DECISION = new ConcurrentSearchDecision(
        ConcurrentSearchDecision.DecisionStatus.NO_OP,
        "Default decision"
    );

    /** Decision reported when a k-NN query is evaluated against an index that has the k-NN index setting enabled. */
    private static final ConcurrentSearchDecision YES = new ConcurrentSearchDecision(
        ConcurrentSearchDecision.DecisionStatus.YES,
        "Enable concurrent search for knn as Query has k-NN query in it and index is k-nn index"
    );

    /** Mutable per-instance state; holds the outcome of the most recent {@link #evaluateForQuery} call. */
    private ConcurrentSearchDecision knnDecision = DEFAULT_KNN_DECISION;

    /**
     * Records a decision for the given query builder.
     * The decision becomes YES only when the builder is a {@link KNNQueryBuilder} and
     * {@link KNNSettings#IS_KNN_INDEX_SETTING} is true for the index; otherwise it is set back to the default NO_OP.
     *
     * @param queryBuilder  query builder to inspect
     * @param indexSettings settings of the index the query runs against
     */
    @Override
    public void evaluateForQuery(final QueryBuilder queryBuilder, final IndexSettings indexSettings) {
        if (queryBuilder instanceof KNNQueryBuilder && indexSettings.getValue(KNNSettings.IS_KNN_INDEX_SETTING)) {
            knnDecision = YES;
        } else {
            // Every call overwrites the previous outcome, so a non-matching builder evaluated after a
            // k-NN builder puts the decision back to NO_OP.
            knnDecision = DEFAULT_KNN_DECISION;
        }
    }

    /**
     * @return the decision recorded by the latest {@link #evaluateForQuery} call, or the default NO_OP decision
     *         when nothing has been evaluated yet
     */
    @Override
    public ConcurrentSearchDecision getConcurrentSearchDecision() {
        return knnDecision;
    }

    /**
     * Returns {@link KNNConcurrentSearchRequestDecider} when index.knn is true
     */
    public static class Factory implements ConcurrentSearchRequestDecider.Factory {

        /**
         * Creates a fresh decider for a k-NN index.
         *
         * @param indexSettings settings of the index being searched
         * @return a new {@link KNNConcurrentSearchRequestDecider} when {@link KNNSettings#IS_KNN_INDEX_SETTING}
         *         is true for the index, otherwise {@link Optional#empty()}
         */
        @Override
        public Optional<ConcurrentSearchRequestDecider> create(final IndexSettings indexSettings) {
            if (indexSettings.getValue(KNNSettings.IS_KNN_INDEX_SETTING)) {
                return Optional.of(new KNNConcurrentSearchRequestDecider());
            }
            return Optional.empty();
        }
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.knn.integ.search;

import com.google.common.primitives.Floats;
import lombok.SneakyThrows;
import org.apache.http.util.EntityUtils;
import org.junit.BeforeClass;
import org.opensearch.client.Response;
import org.opensearch.common.settings.Settings;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.knn.KNNJsonIndexMappingsBuilder;
import org.opensearch.knn.KNNRestTestCase;
import org.opensearch.knn.KNNResult;
import org.opensearch.knn.TestUtils;
import org.opensearch.knn.index.SpaceType;
import org.opensearch.knn.index.engine.KNNEngine;
import org.opensearch.knn.index.query.KNNQueryBuilder;
import org.opensearch.knn.plugin.script.KNNScoringUtil;

import java.io.IOException;
import java.net.URL;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

import static org.opensearch.knn.common.KNNConstants.METHOD_HNSW;

/**
 * Sanity test that the concurrent segment search code path is hit end to end and that scores stay intact.
 * Latency is intentionally not verified here, as it can be better encapsulated in nightly runs.
 */
public class ConcurrentSegmentSearchIT extends KNNRestTestCase {

    static TestUtils.TestData testData;

    /**
     * Loads the index vectors and the query vectors from the test resources into {@link #testData}.
     *
     * @throws IOException declared for the test data loading
     * @throws IllegalStateException when this class has no class loader
     */
    @BeforeClass
    public static void setUpClass() throws IOException {
        final ClassLoader resourceLoader = ConcurrentSegmentSearchIT.class.getClassLoader();
        if (resourceLoader == null) {
            throw new IllegalStateException("ClassLoader of ConcurrentSegmentSearchIT Class is null");
        }
        final URL indexVectorsUrl = resourceLoader.getResource("data/test_vectors_1000x128.json");
        final URL queriesUrl = resourceLoader.getResource("data/test_queries_100x128.csv");
        assert indexVectorsUrl != null;
        assert queriesUrl != null;
        testData = new TestUtils.TestData(indexVectorsUrl.getPath(), queriesUrl.getPath());
    }

    /**
     * Creates a faiss HNSW index, ingests the test vectors, then runs the same query verification with
     * "index.search.concurrent_segment_search.mode" set to "auto" and afterwards to "all".
     */
    @SneakyThrows
    public void testConcurrentSegmentSearch_thenSucceed() {
        final String indexName = "test-concurrent-segment";
        final String fieldName = "test-field-1";
        final int k = 10;
        final int dimension = testData.indexData.vectors[0].length;

        // Create the index and confirm the stored mapping matches what was sent
        final XContentBuilder mappingBuilder = createFaissHnswIndexMapping(fieldName, dimension);
        final Map<String, Object> expectedMapping = xContentBuilderToMap(mappingBuilder);
        createKnnIndex(indexName, mappingBuilder.toString());
        assertEquals(new TreeMap<>(expectedMapping), new TreeMap<>(getIndexMappingAsMap(indexName)));

        // Index the test data
        final int docCount = testData.indexData.docs.length;
        for (int docIdx = 0; docIdx < docCount; docIdx++) {
            final String docId = Integer.toString(testData.indexData.docs[docIdx]);
            final Object[] vector = Floats.asList(testData.indexData.vectors[docIdx]).toArray();
            addKnnDoc(indexName, docId, fieldName, vector);
        }
        refreshAllNonSystemIndices();

        // Test search queries under each concurrent segment search mode, in this order
        final String[] modes = { "auto", "all" };
        for (final String mode : modes) {
            updateIndexSettings(indexName, Settings.builder().put("index.search.concurrent_segment_search.mode", mode));
            verifySearch(indexName, fieldName, k);
        }

        deleteKNNIndex(indexName);
    }

    /*
     Builds the following mapping:
        {
            "properties": {
                "<fieldName>": {
                    "type": "knn_vector",
                    "dimension": <dimension>,
                    "method": {
                        "name": "hnsw",
                        "space_type": "l2",
                        "engine": "faiss",
                        "parameters": {
                            "m": 16,
                            "ef_construction": 128,
                            "ef_search": 128
                        }
                    }
                }
            }
        }
     */
    @SneakyThrows
    private XContentBuilder createFaissHnswIndexMapping(String fieldName, int dimension) {
        final KNNJsonIndexMappingsBuilder.Method.Parameters hnswParameters = KNNJsonIndexMappingsBuilder.Method.Parameters.builder()
            .efConstruction(128)
            .efSearch(128)
            .m(16)
            .build();
        final KNNJsonIndexMappingsBuilder.Method faissHnswMethod = KNNJsonIndexMappingsBuilder.Method.builder()
            .engine(KNNEngine.FAISS.getName())
            .methodName(METHOD_HNSW)
            .spaceType(SpaceType.L2.getValue())
            .parameters(hnswParameters)
            .build();
        final KNNJsonIndexMappingsBuilder mappings = KNNJsonIndexMappingsBuilder.builder()
            .fieldName(fieldName)
            .dimension(dimension)
            .method(faissHnswMethod)
            .build();
        return mappings.getIndexMappingBuilder();
    }

    /**
     * Runs every query vector from {@link #testData} against the index and checks that exactly {@code k}
     * results come back and that each returned score equals the faiss L2 score computed from the squared
     * L2 distance between the query and the returned vector (tolerance 0.0001).
     */
    @SneakyThrows
    private void verifySearch(String indexName, String fieldName, int k) {
        for (final float[] queryVector : testData.queries) {
            final KNNQueryBuilder knnQuery = KNNQueryBuilder.builder().fieldName(fieldName).vector(queryVector).k(k).build();
            final Response searchResponse = searchKNNIndex(indexName, knnQuery, k);
            final String body = EntityUtils.toString(searchResponse.getEntity());

            final List<KNNResult> hits = parseSearchResponse(body, fieldName);
            assertEquals(k, hits.size());

            final List<Float> scores = parseSearchResponseScore(body, fieldName);
            for (int rank = 0; rank < k; rank++) {
                assertEquals(
                    KNNEngine.FAISS.score(KNNScoringUtil.l2Squared(queryVector, hits.get(rank).getVector()), SpaceType.L2),
                    scores.get(rank),
                    0.0001
                );
            }
        }
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.knn.plugin.search;

import org.opensearch.index.IndexSettings;
import org.opensearch.index.query.MatchAllQueryBuilder;
import org.opensearch.knn.KNNTestCase;
import org.opensearch.knn.index.KNNSettings;
import org.opensearch.knn.index.query.KNNQueryBuilder;
import org.opensearch.search.deciders.ConcurrentSearchDecision;

import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

/**
 * Unit tests for {@link KNNConcurrentSearchRequestDecider} and its factory.
 */
public class KNNConcurrentSearchRequestDeciderTests extends KNNTestCase {

    private static final String NO_OP_REASON = "Default decision";
    private static final String YES_REASON = "Enable concurrent search for knn as Query has k-NN query in it and index is k-nn index";

    /**
     * Walks the decider through: initial state, non-k-NN query, k-NN query on a non-k-NN index,
     * k-NN query on a k-NN index, and finally a non-k-NN query again.
     */
    public void testDecider_thenSucceed() {
        final ConcurrentSearchDecision expectedNoOp = new ConcurrentSearchDecision(
            ConcurrentSearchDecision.DecisionStatus.NO_OP,
            NO_OP_REASON
        );
        final ConcurrentSearchDecision expectedYes = new ConcurrentSearchDecision(
            ConcurrentSearchDecision.DecisionStatus.YES,
            YES_REASON
        );

        // A fresh decider starts with the default decision
        final KNNConcurrentSearchRequestDecider decider = new KNNConcurrentSearchRequestDecider();
        assertDecision(expectedNoOp, decider.getConcurrentSearchDecision());

        final IndexSettings settingsMock = mock(IndexSettings.class);
        when(settingsMock.getValue(KNNSettings.IS_KNN_INDEX_SETTING)).thenReturn(Boolean.FALSE);

        // Non KNNQueryBuilder
        decider.evaluateForQuery(new MatchAllQueryBuilder(), settingsMock);
        assertDecision(expectedNoOp, decider.getConcurrentSearchDecision());

        // KNNQueryBuilder, but the index is not a k-NN index
        decider.evaluateForQuery(sampleKnnQuery(), settingsMock);
        assertDecision(expectedNoOp, decider.getConcurrentSearchDecision());

        // KNNQueryBuilder on a k-NN index
        when(settingsMock.getValue(KNNSettings.IS_KNN_INDEX_SETTING)).thenReturn(Boolean.TRUE);
        decider.evaluateForQuery(sampleKnnQuery(), settingsMock);
        assertDecision(expectedYes, decider.getConcurrentSearchDecision());

        // A following non k-NN query puts the decision back to the default
        decider.evaluateForQuery(new MatchAllQueryBuilder(), settingsMock);
        assertDecision(expectedNoOp, decider.getConcurrentSearchDecision());
    }

    /**
     * The factory hands out a distinct decider per call for a k-NN index and nothing otherwise.
     */
    public void testDeciderFactory_thenSucceed() {
        final KNNConcurrentSearchRequestDecider.Factory deciderFactory = new KNNConcurrentSearchRequestDecider.Factory();
        final IndexSettings settingsMock = mock(IndexSettings.class);

        when(settingsMock.getValue(KNNSettings.IS_KNN_INDEX_SETTING)).thenReturn(Boolean.TRUE);
        final Object firstDecider = deciderFactory.create(settingsMock).get();
        final Object secondDecider = deciderFactory.create(settingsMock).get();
        assertNotSame(firstDecider, secondDecider);

        when(settingsMock.getValue(KNNSettings.IS_KNN_INDEX_SETTING)).thenReturn(Boolean.FALSE);
        assertTrue(deciderFactory.create(settingsMock).isEmpty());
    }

    /** Builds the k-NN query used by the decider test. */
    private static KNNQueryBuilder sampleKnnQuery() {
        return KNNQueryBuilder.builder().vector(new float[] { 1f, 2f, 3f, 4f, 5f, 6f }).fieldName("decider").k(10).build();
    }

    /** Compares two decisions by reason and then by status. */
    private void assertDecision(ConcurrentSearchDecision expected, ConcurrentSearchDecision actual) {
        final String expectedReason = expected.getDecisionReason();
        assertEquals(expectedReason, actual.getDecisionReason());
        assertEquals(expected.getDecisionStatus(), actual.getDecisionStatus());
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import lombok.NonNull;
import org.opensearch.common.xcontent.XContentFactory;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.knn.common.KNNConstants;

import java.io.IOException;

Expand All @@ -26,7 +27,7 @@ public class KNNJsonIndexMappingsBuilder {
private String vectorDataType;
private Method method;

public String getIndexMapping() throws IOException {
public XContentBuilder getIndexMappingBuilder() throws IOException {
if (nestedFieldName != null) {
XContentBuilder xContentBuilder = XContentFactory.jsonBuilder()
.startObject()
Expand All @@ -40,7 +41,7 @@ public String getIndexMapping() throws IOException {
addVectorDataType(xContentBuilder);
addMethod(xContentBuilder);
xContentBuilder.endObject().endObject().endObject().endObject().endObject();
return xContentBuilder.toString();
return xContentBuilder;
} else {
XContentBuilder xContentBuilder = XContentFactory.jsonBuilder()
.startObject()
Expand All @@ -51,10 +52,14 @@ public String getIndexMapping() throws IOException {
addVectorDataType(xContentBuilder);
addMethod(xContentBuilder);
xContentBuilder.endObject().endObject().endObject();
return xContentBuilder.toString();
return xContentBuilder;
}
}

/**
 * Renders the index mapping as a string.
 *
 * @return the string form of the builder returned by {@link #getIndexMappingBuilder()}
 * @throws IOException if building the mapping fails
 */
public String getIndexMapping() throws IOException {
    final XContentBuilder mappingBuilder = getIndexMappingBuilder();
    return mappingBuilder.toString();
}

private void addVectorDataType(final XContentBuilder xContentBuilder) throws IOException {
if (vectorDataType == null) {
return;
Expand Down Expand Up @@ -104,6 +109,7 @@ public static class Parameters {
private Encoder encoder;
private Integer efConstruction;
private Integer efSearch;
private Integer m;

private void addTo(final XContentBuilder xContentBuilder) throws IOException {
xContentBuilder.startObject("parameters");
Expand All @@ -113,6 +119,9 @@ private void addTo(final XContentBuilder xContentBuilder) throws IOException {
if (efSearch != null) {
xContentBuilder.field("ef_search", efSearch);
}
if (m != null) {
xContentBuilder.field(KNNConstants.METHOD_PARAMETER_M, m);
}
addEncoder(xContentBuilder);
xContentBuilder.endObject();
}
Expand Down
Loading