From a0c82c608837d1c8a68a751e8641525eb26cc86c Mon Sep 17 00:00:00 2001 From: Varun Jain Date: Tue, 9 Jul 2024 12:05:34 -0700 Subject: [PATCH] Rebasing with main (#826) * Adds method_parameters in neural search query to support ef_search (#787) (#814) Signed-off-by: Tejas Shah * Add BWC for batch ingestion (#769) * Add BWC for batch ingestion Signed-off-by: Liyun Xiu * Update Changelog Signed-off-by: Liyun Xiu * Fix spotlessLicenseCheck Signed-off-by: Liyun Xiu * Fix comments Signed-off-by: Liyun Xiu * Reuse the same code Signed-off-by: Liyun Xiu * Rename some functions Signed-off-by: Liyun Xiu * Rename a function Signed-off-by: Liyun Xiu * Minor change to trigger rebuild Signed-off-by: Liyun Xiu --------- Signed-off-by: Liyun Xiu * Neural sparse query two-phase search processor's bwc test (#777) * Poc of pipeline Signed-off-by: conggguan * Complete some settings for two phase pipeline. Signed-off-by: conggguan * Change the implement of two-phase from QueryBuilderVistor to custom process funciton. Signed-off-by: conggguan * Add It and fix some bug on the state of multy same neuralsparsequerybuilder. Signed-off-by: conggguan * Simplify some logic, and correct some format. Signed-off-by: conggguan * Optimize some format. Signed-off-by: conggguan * Add some test case. Signed-off-by: conggguan * Optimize some logic for zhichao-aws's comments. Signed-off-by: conggguan * Optimize a line without application. Signed-off-by: conggguan * Add some comments, remove some redundant lines, fix some format. Signed-off-by: conggguan * Remove a redundant null check, fix a if format. Signed-off-by: conggguan * Fix a typo for a comment, camelcase format for some variable. Signed-off-by: conggguan * Add some comments to illustrate the influence of the modify on 2-phase search pipeline to neural sparse query builder. Signed-off-by: conggguan * Add restart and rolling upgrade bwc test for neural sparse two phase processor. Signed-off-by: conggguan * Spotless on qa. Signed-off-by: conggguan * Update change log for two-phase BWC test. Signed-off-by: conggguan * Remove redundant lines of two-phase BWC test. Signed-off-by: conggguan * Add changelog. Signed-off-by: conggguan * Add the PR link and number for the CHANGELOG.md. Signed-off-by: conggguan * [Fix] NeuralSparseTwoPhaseProcessorIT created wrong ingest pipeline, fix it to correct API. Signed-off-by: conggguan --------- Signed-off-by: conggguan Signed-off-by: conggguan <157357330+conggguan@users.noreply.github.com> * Enable '.' for nested field in text embedding processor (#811) * Added nested structure for text embed processor mapping Signed-off-by: Martin Gaievski * Fix linux build CI error due to action runner env upgrade node 20 (#821) * Fix linux build CI error due to action runner env upgrade node 20 Signed-off-by: Varun Jain * Fix linux build on additional integ tests Signed-off-by: Varun Jain --------- Signed-off-by: Varun Jain --------- Signed-off-by: Tejas Shah Signed-off-by: Liyun Xiu Signed-off-by: conggguan Signed-off-by: conggguan <157357330+conggguan@users.noreply.github.com> Signed-off-by: Martin Gaievski Signed-off-by: Varun Jain Co-authored-by: Tejas Shah Co-authored-by: Liyun Xiu Co-authored-by: conggguan <157357330+conggguan@users.noreply.github.com> Co-authored-by: Martin Gaievski --- .github/workflows/CI.yml | 2 + .github/workflows/test_aggregations.yml | 2 + .github/workflows/test_security.yml | 2 + CHANGELOG.md | 4 + qa/restart-upgrade/build.gradle | 20 +- .../neuralsearch/bwc/BatchIngestionIT.java | 53 +++ .../neuralsearch/bwc/HybridSearchIT.java | 15 +- .../neuralsearch/bwc/KnnRadialSearchIT.java | 2 + .../neuralsearch/bwc/MultiModalSearchIT.java | 1 + .../bwc/NeuralSparseTwoPhaseProcessorIT.java | 65 ++++ .../bwc/TextChunkingProcessorIT.java | 27 +- ...lSparseTwoPhaseProcessorConfiguration.json | 16 + qa/rolling-upgrade/build.gradle | 51 ++- .../neuralsearch/bwc/BatchIngestionIT.java | 63 ++++ .../neuralsearch/bwc/HybridSearchIT.java | 13 +- .../neuralsearch/bwc/KnnRadialSearchIT.java | 2 + .../neuralsearch/bwc/MultiModalSearchIT.java | 1 + .../bwc/NeuralSparseTwoPhaseProcessorIT.java | 78 +++++ ...lSparseTwoPhaseProcessorConfiguration.json | 16 + .../common/MinClusterVersionUtil.java | 50 +++ .../processor/InferenceProcessor.java | 75 +++-- .../query/NeuralQueryBuilder.java | 44 +-- .../processor/InferenceProcessorTestCase.java | 14 + .../processor/NormalizationProcessorIT.java | 3 + .../processor/ScoreCombinationIT.java | 8 +- .../processor/ScoreNormalizationIT.java | 12 +- .../processor/TextEmbeddingProcessorIT.java | 70 ++++ .../TextEmbeddingProcessorTests.java | 301 +++++++++++++++--- .../query/NeuralQueryBuilderTests.java | 39 ++- .../neuralsearch/query/NeuralQueryIT.java | 14 +- .../resources/processor/IndexMappings.json | 21 ++ ...eConfigurationWithNestedFieldsMapping.json | 19 ++ src/test/resources/processor/ingest_doc3.json | 20 ++ .../neuralsearch/BaseNeuralSearchIT.java | 57 +++- .../util/BatchIngestionUtils.java | 40 +++ 35 files changed, 1101 insertions(+), 119 deletions(-) create mode 100644 qa/restart-upgrade/src/test/java/org/opensearch/neuralsearch/bwc/BatchIngestionIT.java create mode 100644 qa/restart-upgrade/src/test/java/org/opensearch/neuralsearch/bwc/NeuralSparseTwoPhaseProcessorIT.java create mode 100644 qa/restart-upgrade/src/test/resources/processor/NeuralSparseTwoPhaseProcessorConfiguration.json create mode 100644 qa/rolling-upgrade/src/test/java/org/opensearch/neuralsearch/bwc/BatchIngestionIT.java create mode 100644 qa/rolling-upgrade/src/test/java/org/opensearch/neuralsearch/bwc/NeuralSparseTwoPhaseProcessorIT.java create mode 100644 qa/rolling-upgrade/src/test/resources/processor/NeuralSparseTwoPhaseProcessorConfiguration.json create mode 100644 src/main/java/org/opensearch/neuralsearch/common/MinClusterVersionUtil.java create mode 100644 src/test/resources/processor/PipelineConfigurationWithNestedFieldsMapping.json create mode 100644 src/test/resources/processor/ingest_doc3.json create mode 100644 src/testFixtures/java/org/opensearch/neuralsearch/util/BatchIngestionUtils.java diff --git a/.github/workflows/CI.yml b/.github/workflows/CI.yml index df31b0353..a3cb16721 100644 --- a/.github/workflows/CI.yml +++ b/.github/workflows/CI.yml @@ -10,6 +10,8 @@ on: branches: - "*" - "feature/**" +env: + ACTIONS_ALLOW_USE_UNSECURE_NODE_VERSION: true jobs: Get-CI-Image-Tag: diff --git a/.github/workflows/test_aggregations.yml b/.github/workflows/test_aggregations.yml index 3bd5a66c0..e49c47eb5 100644 --- a/.github/workflows/test_aggregations.yml +++ b/.github/workflows/test_aggregations.yml @@ -10,6 +10,8 @@ on: branches: - "*" - "feature/**" +env: + ACTIONS_ALLOW_USE_UNSECURE_NODE_VERSION: true jobs: Get-CI-Image-Tag: diff --git a/.github/workflows/test_security.yml b/.github/workflows/test_security.yml index 77d3b24e0..fbbf159ae 100644 --- a/.github/workflows/test_security.yml +++ b/.github/workflows/test_security.yml @@ -10,6 +10,8 @@ on: branches: - "*" - "feature/**" +env: + ACTIONS_ALLOW_USE_UNSECURE_NODE_VERSION: true jobs: Get-CI-Image-Tag: diff --git a/CHANGELOG.md b/CHANGELOG.md index 1a72fcdaa..7f432816a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -15,8 +15,12 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), ## [Unreleased 2.x](https://github.com/opensearch-project/neural-search/compare/2.15...2.x) ### Features ### Enhancements +- Adds dynamic knn query parameters efsearch and nprobes [#814](https://github.com/opensearch-project/neural-search/pull/814/) +- Enable '.' for nested field in text embedding processor ([#811](https://github.com/opensearch-project/neural-search/pull/811)) ### Bug Fixes ### Infrastructure +- Add BWC for batch ingestion ([#769](https://github.com/opensearch-project/neural-search/pull/769)) +- Add backward test cases for neural sparse two phase processor ([#777](https://github.com/opensearch-project/neural-search/pull/777)) ### Documentation ### Maintenance ### Refactoring diff --git a/qa/restart-upgrade/build.gradle b/qa/restart-upgrade/build.gradle index ce29c77ca..c5badd248 100644 --- a/qa/restart-upgrade/build.gradle +++ b/qa/restart-upgrade/build.gradle @@ -90,10 +90,18 @@ task testAgainstOldCluster(type: StandaloneRestIntegTestTask) { } } - // Excluding the k-NN radial search tests because we introduce this feature in 2.14 + // Excluding the k-NN radial search tests and batch ingestion tests because we introduce these features in 2.14 if (ext.neural_search_bwc_version.startsWith("2.9") || ext.neural_search_bwc_version.startsWith("2.10") || ext.neural_search_bwc_version.startsWith("2.11") || ext.neural_search_bwc_version.startsWith("2.12") || ext.neural_search_bwc_version.startsWith("2.13")){ filter { excludeTestsMatching "org.opensearch.neuralsearch.bwc.KnnRadialSearchIT.*" + excludeTestsMatching "org.opensearch.neuralsearch.bwc.BatchIngestionIT.*" + } + } + + // Excluding the NeuralSparseQuery two-phase search pipeline tests because we introduce this feature in 2.15 + if (ext.neural_search_bwc_version.startsWith("2.9") || ext.neural_search_bwc_version.startsWith("2.10") || ext.neural_search_bwc_version.startsWith("2.11") || ext.neural_search_bwc_version.startsWith("2.12") || ext.neural_search_bwc_version.startsWith("2.13") || ext.neural_search_bwc_version.startsWith("2.14")){ + filter { + excludeTestsMatching "org.opensearch.neuralsearch.bwc.NeuralSparseTwoPhaseProcessorIT.*" } } @@ -146,10 +154,18 @@ task testAgainstNewCluster(type: StandaloneRestIntegTestTask) { } } - // Excluding the k-NN radial search tests because we introduce this feature in 2.14 + // Excluding the k-NN radial search tests and batch ingestion tests because we introduce these features in 2.14 if (ext.neural_search_bwc_version.startsWith("2.9") || ext.neural_search_bwc_version.startsWith("2.10") || ext.neural_search_bwc_version.startsWith("2.11") || ext.neural_search_bwc_version.startsWith("2.12") || ext.neural_search_bwc_version.startsWith("2.13")){ filter { excludeTestsMatching "org.opensearch.neuralsearch.bwc.KnnRadialSearchIT.*" + excludeTestsMatching "org.opensearch.neuralsearch.bwc.BatchIngestionIT.*" + } + } + + // Excluding the NeuralSparseQuery two-phase search pipeline tests because we introduce this feature in 2.15 + if (ext.neural_search_bwc_version.startsWith("2.9") || ext.neural_search_bwc_version.startsWith("2.10") || ext.neural_search_bwc_version.startsWith("2.11") || ext.neural_search_bwc_version.startsWith("2.12") || ext.neural_search_bwc_version.startsWith("2.13") || ext.neural_search_bwc_version.startsWith("2.14")){ + filter { + excludeTestsMatching "org.opensearch.neuralsearch.bwc.NeuralSparseTwoPhaseProcessorIT.*" } } diff --git a/qa/restart-upgrade/src/test/java/org/opensearch/neuralsearch/bwc/BatchIngestionIT.java b/qa/restart-upgrade/src/test/java/org/opensearch/neuralsearch/bwc/BatchIngestionIT.java new file mode 100644 index 000000000..0e490e2e4 --- /dev/null +++ b/qa/restart-upgrade/src/test/java/org/opensearch/neuralsearch/bwc/BatchIngestionIT.java @@ -0,0 +1,53 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ +package org.opensearch.neuralsearch.bwc; + +import org.opensearch.neuralsearch.util.TestUtils; + +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.List; +import java.util.Map; + +import static org.opensearch.neuralsearch.util.BatchIngestionUtils.prepareDataForBulkIngestion; +import static org.opensearch.neuralsearch.util.TestUtils.NODES_BWC_CLUSTER; +import static org.opensearch.neuralsearch.util.TestUtils.SPARSE_ENCODING_PROCESSOR; + +public class BatchIngestionIT extends AbstractRestartUpgradeRestTestCase { + private static final String PIPELINE_NAME = "pipeline-BatchIngestionIT"; + private static final String TEXT_FIELD_NAME = "passage_text"; + private static final String EMBEDDING_FIELD_NAME = "passage_embedding"; + private static final int batchSize = 3; + + public void testBatchIngestionWithNeuralSparseProcessor_E2EFlow() throws Exception { + waitForClusterHealthGreen(NODES_BWC_CLUSTER); + String indexName = getIndexNameForTest(); + if (isRunningAgainstOldCluster()) { + String modelId = uploadSparseEncodingModel(); + loadModel(modelId); + createPipelineForSparseEncodingProcessor(modelId, PIPELINE_NAME); + createIndexWithConfiguration( + indexName, + Files.readString(Path.of(classLoader.getResource("processor/SparseIndexMappings.json").toURI())), + PIPELINE_NAME + ); + List> docs = prepareDataForBulkIngestion(0, 5); + bulkAddDocuments(indexName, TEXT_FIELD_NAME, PIPELINE_NAME, docs, batchSize); + validateDocCountAndInfo(indexName, 5, () -> getDocById(indexName, "4"), EMBEDDING_FIELD_NAME, Map.class); + } else { + String modelId = null; + modelId = TestUtils.getModelId(getIngestionPipeline(PIPELINE_NAME), SPARSE_ENCODING_PROCESSOR); + loadModel(modelId); + try { + List> docs = prepareDataForBulkIngestion(5, 5); + bulkAddDocuments(indexName, TEXT_FIELD_NAME, PIPELINE_NAME, docs, batchSize); + validateDocCountAndInfo(indexName, 10, () -> getDocById(indexName, "9"), EMBEDDING_FIELD_NAME, Map.class); + } finally { + wipeOfTestResources(indexName, PIPELINE_NAME, modelId, null); + } + } + } + +} diff --git a/qa/restart-upgrade/src/test/java/org/opensearch/neuralsearch/bwc/HybridSearchIT.java b/qa/restart-upgrade/src/test/java/org/opensearch/neuralsearch/bwc/HybridSearchIT.java index f5289fe79..845396dd0 100644 --- a/qa/restart-upgrade/src/test/java/org/opensearch/neuralsearch/bwc/HybridSearchIT.java +++ b/qa/restart-upgrade/src/test/java/org/opensearch/neuralsearch/bwc/HybridSearchIT.java @@ -10,6 +10,7 @@ import java.util.Arrays; import java.util.List; import java.util.Map; + import org.opensearch.index.query.MatchQueryBuilder; import static org.opensearch.neuralsearch.util.TestUtils.getModelId; import static org.opensearch.neuralsearch.util.TestUtils.NODES_BWC_CLUSTER; @@ -69,6 +70,7 @@ private void validateNormalizationProcessor(final String fileName, final String loadModel(modelId); addDocuments(getIndexNameForTest(), false); validateTestIndex(modelId, getIndexNameForTest(), searchPipelineName); + validateTestIndex(modelId, getIndexNameForTest(), searchPipelineName, Map.of("ef_search", 100)); } finally { wipeOfTestResources(getIndexNameForTest(), pipelineName, modelId, searchPipelineName); } @@ -96,10 +98,14 @@ private void createSearchPipeline(final String pipelineName) { ); } - private void validateTestIndex(final String modelId, final String index, final String searchPipeline) throws Exception { + private void validateTestIndex(final String modelId, final String index, final String searchPipeline) { + validateTestIndex(modelId, index, searchPipeline, null); + } + + private void validateTestIndex(final String modelId, final String index, final String searchPipeline, Map methodParameters) { int docCount = getDocCount(index); assertEquals(6, docCount); - HybridQueryBuilder hybridQueryBuilder = getQueryBuilder(modelId); + HybridQueryBuilder hybridQueryBuilder = getQueryBuilder(modelId, methodParameters); Map searchResponseAsMap = search(index, hybridQueryBuilder, null, 1, Map.of("search_pipeline", searchPipeline)); assertNotNull(searchResponseAsMap); int hits = getHitCount(searchResponseAsMap); @@ -110,12 +116,15 @@ private void validateTestIndex(final String modelId, final String index, final S } } - private HybridQueryBuilder getQueryBuilder(final String modelId) { + private HybridQueryBuilder getQueryBuilder(final String modelId, Map methodParameters) { NeuralQueryBuilder neuralQueryBuilder = new NeuralQueryBuilder(); neuralQueryBuilder.fieldName("passage_embedding"); neuralQueryBuilder.modelId(modelId); neuralQueryBuilder.queryText(QUERY); neuralQueryBuilder.k(5); + if (methodParameters != null) { + neuralQueryBuilder.methodParameters(methodParameters); + } MatchQueryBuilder matchQueryBuilder = new MatchQueryBuilder("text", QUERY); diff --git a/qa/restart-upgrade/src/test/java/org/opensearch/neuralsearch/bwc/KnnRadialSearchIT.java b/qa/restart-upgrade/src/test/java/org/opensearch/neuralsearch/bwc/KnnRadialSearchIT.java index 8a6dfcde3..ece2bbb9e 100644 --- a/qa/restart-upgrade/src/test/java/org/opensearch/neuralsearch/bwc/KnnRadialSearchIT.java +++ b/qa/restart-upgrade/src/test/java/org/opensearch/neuralsearch/bwc/KnnRadialSearchIT.java @@ -60,6 +60,7 @@ private void validateIndexQuery(final String modelId) { null, 0.01f, null, + null, null ); Map responseWithMinScoreQuery = search(getIndexNameForTest(), neuralQueryBuilderWithMinScoreQuery, 1); @@ -74,6 +75,7 @@ private void validateIndexQuery(final String modelId) { 100000f, null, null, + null, null ); Map responseWithMaxDistanceQuery = search(getIndexNameForTest(), neuralQueryBuilderWithMaxDistanceQuery, 1); diff --git a/qa/restart-upgrade/src/test/java/org/opensearch/neuralsearch/bwc/MultiModalSearchIT.java b/qa/restart-upgrade/src/test/java/org/opensearch/neuralsearch/bwc/MultiModalSearchIT.java index afa29bab5..54d993b35 100644 --- a/qa/restart-upgrade/src/test/java/org/opensearch/neuralsearch/bwc/MultiModalSearchIT.java +++ b/qa/restart-upgrade/src/test/java/org/opensearch/neuralsearch/bwc/MultiModalSearchIT.java @@ -62,6 +62,7 @@ private void validateTestIndex(final String modelId) throws Exception { null, null, null, + null, null ); Map response = search(getIndexNameForTest(), neuralQueryBuilder, 1); diff --git a/qa/restart-upgrade/src/test/java/org/opensearch/neuralsearch/bwc/NeuralSparseTwoPhaseProcessorIT.java b/qa/restart-upgrade/src/test/java/org/opensearch/neuralsearch/bwc/NeuralSparseTwoPhaseProcessorIT.java new file mode 100644 index 000000000..4b00a7916 --- /dev/null +++ b/qa/restart-upgrade/src/test/java/org/opensearch/neuralsearch/bwc/NeuralSparseTwoPhaseProcessorIT.java @@ -0,0 +1,65 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ +package org.opensearch.neuralsearch.bwc; + +import org.opensearch.common.settings.Settings; +import org.opensearch.neuralsearch.query.NeuralSparseQueryBuilder; +import org.opensearch.neuralsearch.util.TestUtils; + +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.List; + +import static org.opensearch.neuralsearch.util.TestUtils.NODES_BWC_CLUSTER; +import static org.opensearch.neuralsearch.util.TestUtils.SPARSE_ENCODING_PROCESSOR; + +public class NeuralSparseTwoPhaseProcessorIT extends AbstractRestartUpgradeRestTestCase { + + private static final String NEURAL_SPARSE_INGEST_PIPELINE_NAME = "nstp-nlp-ingest-pipeline-dense"; + private static final String NEURAL_SPARSE_TWO_PHASE_SEARCH_PIPELINE_NAME = "nstp-nlp-two-phase-search-pipeline-sparse"; + private static final String TEST_ENCODING_FIELD = "passage_embedding"; + private static final String TEST_TEXT_FIELD = "passage_text"; + private static final String TEXT_1 = "Hello world a b"; + + public void testNeuralSparseQueryTwoPhaseProcessor_NeuralSearch_E2EFlow() throws Exception { + waitForClusterHealthGreen(NODES_BWC_CLUSTER); + NeuralSparseQueryBuilder neuralSparseQueryBuilder = new NeuralSparseQueryBuilder().fieldName(TEST_ENCODING_FIELD).queryText(TEXT_1); + if (isRunningAgainstOldCluster()) { + String modelId = uploadSparseEncodingModel(); + loadModel(modelId); + neuralSparseQueryBuilder.modelId(modelId); + createPipelineForSparseEncodingProcessor(modelId, NEURAL_SPARSE_INGEST_PIPELINE_NAME); + createIndexWithConfiguration( + getIndexNameForTest(), + Files.readString(Path.of(classLoader.getResource("processor/SparseIndexMappings.json").toURI())), + NEURAL_SPARSE_INGEST_PIPELINE_NAME + ); + addSparseEncodingDoc(getIndexNameForTest(), "0", List.of(), List.of(), List.of(TEST_TEXT_FIELD), List.of(TEXT_1)); + createNeuralSparseTwoPhaseSearchProcessor(NEURAL_SPARSE_TWO_PHASE_SEARCH_PIPELINE_NAME); + updateIndexSettings( + getIndexNameForTest(), + Settings.builder().put("index.search.default_pipeline", NEURAL_SPARSE_TWO_PHASE_SEARCH_PIPELINE_NAME) + ); + Object resultWith2PhasePipeline = search(getIndexNameForTest(), neuralSparseQueryBuilder, 1).get("hits"); + assertNotNull(resultWith2PhasePipeline); + } else { + String modelId = null; + try { + modelId = TestUtils.getModelId(getIngestionPipeline(NEURAL_SPARSE_INGEST_PIPELINE_NAME), SPARSE_ENCODING_PROCESSOR); + loadModel(modelId); + neuralSparseQueryBuilder.modelId(modelId); + Object resultWith2PhasePipeline = search(getIndexNameForTest(), neuralSparseQueryBuilder, 1).get("hits"); + assertNotNull(resultWith2PhasePipeline); + } finally { + wipeOfTestResources( + getIndexNameForTest(), + NEURAL_SPARSE_INGEST_PIPELINE_NAME, + modelId, + NEURAL_SPARSE_TWO_PHASE_SEARCH_PIPELINE_NAME + ); + } + } + } +} diff --git a/qa/restart-upgrade/src/test/java/org/opensearch/neuralsearch/bwc/TextChunkingProcessorIT.java b/qa/restart-upgrade/src/test/java/org/opensearch/neuralsearch/bwc/TextChunkingProcessorIT.java index ca314300c..ba44eba9a 100644 --- a/qa/restart-upgrade/src/test/java/org/opensearch/neuralsearch/bwc/TextChunkingProcessorIT.java +++ b/qa/restart-upgrade/src/test/java/org/opensearch/neuralsearch/bwc/TextChunkingProcessorIT.java @@ -56,20 +56,21 @@ private void createChunkingIndex(String indexName) throws Exception { createIndexWithConfiguration(indexName, indexSetting, PIPELINE_NAME); } - private void validateTestIndex(String indexName, String fieldName, int documentCount, Object expected) { - int docCount = getDocCount(indexName); - assertEquals(documentCount, docCount); + private Map getFirstDocumentInQuery(String indexName, int resultSize) { MatchAllQueryBuilder query = new MatchAllQueryBuilder(); - Map searchResults = search(indexName, query, 10); + Map searchResults = search(indexName, query, resultSize); assertNotNull(searchResults); - Map document = getFirstInnerHit(searchResults); - assertNotNull(document); - Object documentSource = document.get("_source"); - assert (documentSource instanceof Map); - @SuppressWarnings("unchecked") - Map documentSourceMap = (Map) documentSource; - assert (documentSourceMap).containsKey(fieldName); - Object ingestOutputs = documentSourceMap.get(fieldName); - assertEquals(expected, ingestOutputs); + return getFirstInnerHit(searchResults); + } + + private void validateTestIndex(String indexName, String fieldName, int documentCount, Object expected) { + Object outputs = validateDocCountAndInfo( + indexName, + documentCount, + () -> getFirstDocumentInQuery(indexName, 10), + fieldName, + List.class + ); + assertEquals(expected, outputs); } } diff --git a/qa/restart-upgrade/src/test/resources/processor/NeuralSparseTwoPhaseProcessorConfiguration.json b/qa/restart-upgrade/src/test/resources/processor/NeuralSparseTwoPhaseProcessorConfiguration.json new file mode 100644 index 000000000..45e435268 --- /dev/null +++ b/qa/restart-upgrade/src/test/resources/processor/NeuralSparseTwoPhaseProcessorConfiguration.json @@ -0,0 +1,16 @@ +{ + "request_processors": [ + { + "neural_sparse_two_phase_processor": { + "tag": "neural-sparse", + "description": "This processor is making two-phase rescorer.", + "enabled": true, + "two_phase_parameter": { + "prune_ratio": %f, + "expansion_rate": %f, + "max_window_size": %d + } + } + } + ] +} diff --git a/qa/rolling-upgrade/build.gradle b/qa/rolling-upgrade/build.gradle index c2c03b824..285e65093 100644 --- a/qa/rolling-upgrade/build.gradle +++ b/qa/rolling-upgrade/build.gradle @@ -90,13 +90,24 @@ task testAgainstOldCluster(type: StandaloneRestIntegTestTask) { } } - // Excluding the k-NN radial search tests because we introduce this feature in 2.14 + // Excluding the k-NN radial search and batch ingestion tests because we introduce these features in 2.14 if (ext.neural_search_bwc_version.startsWith("2.9") || ext.neural_search_bwc_version.startsWith("2.10") || ext.neural_search_bwc_version.startsWith("2.11") || ext.neural_search_bwc_version.startsWith("2.12") || ext.neural_search_bwc_version.startsWith("2.13")){ filter { excludeTestsMatching "org.opensearch.neuralsearch.bwc.KnnRadialSearchIT.*" + excludeTestsMatching "org.opensearch.neuralsearch.bwc.BatchIngestionIT.*" } } + // Excluding the neural sparse two phase processor test because we introduce this feature in 2.15 + if (ext.neural_search_bwc_version.startsWith("2.9") || ext.neural_search_bwc_version.startsWith("2.10") + || ext.neural_search_bwc_version.startsWith("2.11") || ext.neural_search_bwc_version.startsWith("2.12") + || ext.neural_search_bwc_version.startsWith("2.13") || ext.neural_search_bwc_version.startsWith("2.14")){ + filter { + excludeTestsMatching "org.opensearch.neuralsearch.bwc.NeuralSparseTwoPhaseProcessorIT.*" + } + } + + nonInputProperties.systemProperty('tests.rest.cluster', "${-> testClusters."${baseName}".allHttpSocketURI.join(",")}") nonInputProperties.systemProperty('tests.clustername', "${-> testClusters."${baseName}".getName()}") systemProperty 'tests.security.manager', 'false' @@ -147,13 +158,24 @@ task testAgainstOneThirdUpgradedCluster(type: StandaloneRestIntegTestTask) { } } - // Excluding the k-NN radial search tests because we introduce this feature in 2.14 + // Excluding the k-NN radial search and batch ingestion tests because we introduce these features in 2.14 if (ext.neural_search_bwc_version.startsWith("2.9") || ext.neural_search_bwc_version.startsWith("2.10") || ext.neural_search_bwc_version.startsWith("2.11") || ext.neural_search_bwc_version.startsWith("2.12") || ext.neural_search_bwc_version.startsWith("2.13")){ filter { excludeTestsMatching "org.opensearch.neuralsearch.bwc.KnnRadialSearchIT.*" + excludeTestsMatching "org.opensearch.neuralsearch.bwc.BatchIngestionIT.*" } } + // Excluding the neural sparse two phase processor test because we introduce this feature in 2.15 + if (ext.neural_search_bwc_version.startsWith("2.9") || ext.neural_search_bwc_version.startsWith("2.10") + || ext.neural_search_bwc_version.startsWith("2.11") || ext.neural_search_bwc_version.startsWith("2.12") + || ext.neural_search_bwc_version.startsWith("2.13") || ext.neural_search_bwc_version.startsWith("2.14")){ + filter { + excludeTestsMatching "org.opensearch.neuralsearch.bwc.NeuralSparseTwoPhaseProcessorIT.*" + } + } + + nonInputProperties.systemProperty('tests.rest.cluster', "${-> testClusters."${baseName}".allHttpSocketURI.join(",")}") nonInputProperties.systemProperty('tests.clustername', "${-> testClusters."${baseName}".getName()}") systemProperty 'tests.security.manager', 'false' @@ -203,13 +225,24 @@ task testAgainstTwoThirdsUpgradedCluster(type: StandaloneRestIntegTestTask) { } } - // Excluding the k-NN radial search tests because we introduce this feature in 2.14 + // Excluding the k-NN radial search and batch ingestion tests because we introduce these features in 2.14 if (ext.neural_search_bwc_version.startsWith("2.9") || ext.neural_search_bwc_version.startsWith("2.10") || ext.neural_search_bwc_version.startsWith("2.11") || ext.neural_search_bwc_version.startsWith("2.12") || ext.neural_search_bwc_version.startsWith("2.13")){ filter { excludeTestsMatching "org.opensearch.neuralsearch.bwc.KnnRadialSearchIT.*" + excludeTestsMatching "org.opensearch.neuralsearch.bwc.BatchIngestionIT.*" } } + // Excluding the neural sparse two phase processor test because we introduce this feature in 2.15 + if (ext.neural_search_bwc_version.startsWith("2.9") || ext.neural_search_bwc_version.startsWith("2.10") + || ext.neural_search_bwc_version.startsWith("2.11") || ext.neural_search_bwc_version.startsWith("2.12") + || ext.neural_search_bwc_version.startsWith("2.13") || ext.neural_search_bwc_version.startsWith("2.14")){ + filter { + excludeTestsMatching "org.opensearch.neuralsearch.bwc.NeuralSparseTwoPhaseProcessorIT.*" + } + } + + nonInputProperties.systemProperty('tests.rest.cluster', "${-> testClusters."${baseName}".allHttpSocketURI.join(",")}") nonInputProperties.systemProperty('tests.clustername', "${-> testClusters."${baseName}".getName()}") systemProperty 'tests.security.manager', 'false' @@ -259,10 +292,20 @@ task testRollingUpgrade(type: StandaloneRestIntegTestTask) { } } - // Excluding the k-NN radial search tests because we introduce this feature in 2.14 + // Excluding the k-NN radial search and batch ingestion tests because we introduce these features in 2.14 if (ext.neural_search_bwc_version.startsWith("2.9") || ext.neural_search_bwc_version.startsWith("2.10") || ext.neural_search_bwc_version.startsWith("2.11") || ext.neural_search_bwc_version.startsWith("2.12") || ext.neural_search_bwc_version.startsWith("2.13")){ filter { excludeTestsMatching "org.opensearch.neuralsearch.bwc.KnnRadialSearchIT.*" + excludeTestsMatching "org.opensearch.neuralsearch.bwc.BatchIngestionIT.*" + } + } + + // Excluding the neural sparse two phase processor test because we introduce this feature in 2.15 + if (ext.neural_search_bwc_version.startsWith("2.9") || ext.neural_search_bwc_version.startsWith("2.10") + || ext.neural_search_bwc_version.startsWith("2.11") || ext.neural_search_bwc_version.startsWith("2.12") + || ext.neural_search_bwc_version.startsWith("2.13") || ext.neural_search_bwc_version.startsWith("2.14")){ + filter { + excludeTestsMatching "org.opensearch.neuralsearch.bwc.NeuralSparseTwoPhaseProcessorIT.*" } } diff --git a/qa/rolling-upgrade/src/test/java/org/opensearch/neuralsearch/bwc/BatchIngestionIT.java b/qa/rolling-upgrade/src/test/java/org/opensearch/neuralsearch/bwc/BatchIngestionIT.java new file mode 100644 index 000000000..3052b48cd --- /dev/null +++ b/qa/rolling-upgrade/src/test/java/org/opensearch/neuralsearch/bwc/BatchIngestionIT.java @@ -0,0 +1,63 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ +package org.opensearch.neuralsearch.bwc; + +import org.opensearch.neuralsearch.util.TestUtils; + +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.List; +import java.util.Map; + +import static org.opensearch.neuralsearch.util.BatchIngestionUtils.prepareDataForBulkIngestion; +import static org.opensearch.neuralsearch.util.TestUtils.NODES_BWC_CLUSTER; +import static org.opensearch.neuralsearch.util.TestUtils.SPARSE_ENCODING_PROCESSOR; + +public class BatchIngestionIT extends AbstractRollingUpgradeTestCase { + private static final String SPARSE_PIPELINE = "BatchIngestionIT_sparse_pipeline_rolling"; + private static final String TEXT_FIELD_NAME = "passage_text"; + private static final String EMBEDDING_FIELD_NAME = "passage_embedding"; + + public void testBatchIngestion_SparseEncodingProcessor_E2EFlow() throws Exception { + waitForClusterHealthGreen(NODES_BWC_CLUSTER); + String indexName = getIndexNameForTest(); + String sparseModelId = null; + switch (getClusterType()) { + case OLD: + sparseModelId = uploadSparseEncodingModel(); + loadModel(sparseModelId); + createPipelineForSparseEncodingProcessor(sparseModelId, SPARSE_PIPELINE); + createIndexWithConfiguration( + indexName, + Files.readString(Path.of(classLoader.getResource("processor/SparseIndexMappings.json").toURI())), + SPARSE_PIPELINE + ); + List> docs = prepareDataForBulkIngestion(0, 5); + bulkAddDocuments(indexName, TEXT_FIELD_NAME, SPARSE_PIPELINE, docs, 2); + validateDocCountAndInfo(indexName, 5, () -> getDocById(indexName, "4"), EMBEDDING_FIELD_NAME, Map.class); + break; + case MIXED: + sparseModelId = TestUtils.getModelId(getIngestionPipeline(SPARSE_PIPELINE), SPARSE_ENCODING_PROCESSOR); + loadModel(sparseModelId); + List> docsForMixed = prepareDataForBulkIngestion(5, 5); + bulkAddDocuments(indexName, TEXT_FIELD_NAME, SPARSE_PIPELINE, docsForMixed, 3); + validateDocCountAndInfo(indexName, 10, () -> getDocById(indexName, "9"), EMBEDDING_FIELD_NAME, Map.class); + break; + case UPGRADED: + try { + sparseModelId = TestUtils.getModelId(getIngestionPipeline(SPARSE_PIPELINE), SPARSE_ENCODING_PROCESSOR); + loadModel(sparseModelId); + List> docsForUpgraded = prepareDataForBulkIngestion(10, 5); + bulkAddDocuments(indexName, TEXT_FIELD_NAME, SPARSE_PIPELINE, docsForUpgraded, 2); + validateDocCountAndInfo(indexName, 15, () -> getDocById(indexName, "14"), EMBEDDING_FIELD_NAME, Map.class); + } finally { + wipeOfTestResources(indexName, SPARSE_PIPELINE, sparseModelId, null); + } + break; + default: + throw new IllegalStateException("Unexpected value: " + getClusterType()); + } + } +} diff --git a/qa/rolling-upgrade/src/test/java/org/opensearch/neuralsearch/bwc/HybridSearchIT.java b/qa/rolling-upgrade/src/test/java/org/opensearch/neuralsearch/bwc/HybridSearchIT.java index 903ffc9be..ba2ff7979 100644 --- a/qa/rolling-upgrade/src/test/java/org/opensearch/neuralsearch/bwc/HybridSearchIT.java +++ b/qa/rolling-upgrade/src/test/java/org/opensearch/neuralsearch/bwc/HybridSearchIT.java @@ -73,6 +73,7 @@ public void testNormalizationProcessor_whenIndexWithMultipleShards_E2EFlow() thr loadModel(modelId); addDocument(getIndexNameForTest(), "2", TEST_FIELD, TEXT_UPGRADED, null, null); validateTestIndexOnUpgrade(totalDocsCountUpgraded, modelId); + validateTestIndexOnUpgrade(totalDocsCountUpgraded, modelId, Map.of("ef_search", 100)); } finally { wipeOfTestResources(getIndexNameForTest(), PIPELINE_NAME, modelId, SEARCH_PIPELINE_NAME); } @@ -83,10 +84,15 @@ public void testNormalizationProcessor_whenIndexWithMultipleShards_E2EFlow() thr } private void validateTestIndexOnUpgrade(final int numberOfDocs, final String modelId) throws Exception { + validateTestIndexOnUpgrade(numberOfDocs, modelId, null); + } + + private void validateTestIndexOnUpgrade(final int numberOfDocs, final String modelId, Map methodParameters) + throws Exception { int docCount = getDocCount(getIndexNameForTest()); assertEquals(numberOfDocs, docCount); loadModel(modelId); - HybridQueryBuilder hybridQueryBuilder = getQueryBuilder(modelId); + HybridQueryBuilder hybridQueryBuilder = getQueryBuilder(modelId, methodParameters); Map searchResponseAsMap = search( getIndexNameForTest(), hybridQueryBuilder, @@ -103,12 +109,15 @@ private void validateTestIndexOnUpgrade(final int numberOfDocs, final String mod } } - private HybridQueryBuilder getQueryBuilder(final String modelId) { + private HybridQueryBuilder getQueryBuilder(final String modelId, final Map methodParameters) { NeuralQueryBuilder neuralQueryBuilder = new NeuralQueryBuilder(); neuralQueryBuilder.fieldName("passage_embedding"); neuralQueryBuilder.modelId(modelId); neuralQueryBuilder.queryText(QUERY); neuralQueryBuilder.k(5); + if (methodParameters != null) { + neuralQueryBuilder.methodParameters(methodParameters); + } MatchQueryBuilder matchQueryBuilder = new MatchQueryBuilder("text", QUERY); diff --git a/qa/rolling-upgrade/src/test/java/org/opensearch/neuralsearch/bwc/KnnRadialSearchIT.java b/qa/rolling-upgrade/src/test/java/org/opensearch/neuralsearch/bwc/KnnRadialSearchIT.java index 15be7a15b..17d15898b 100644 --- a/qa/rolling-upgrade/src/test/java/org/opensearch/neuralsearch/bwc/KnnRadialSearchIT.java +++ b/qa/rolling-upgrade/src/test/java/org/opensearch/neuralsearch/bwc/KnnRadialSearchIT.java @@ -86,6 +86,7 @@ private void validateIndexQueryOnUpgrade(final int numberOfDocs, final String mo null, 0.01f, null, + null, null ); Map responseWithMinScore = search(getIndexNameForTest(), neuralQueryBuilderWithMinScoreQuery, 1); @@ -100,6 +101,7 @@ private void validateIndexQueryOnUpgrade(final int numberOfDocs, final String mo 100000f, null, null, + null, null ); Map responseWithMaxScore = search(getIndexNameForTest(), neuralQueryBuilderWithMaxDistanceQuery, 1); diff --git a/qa/rolling-upgrade/src/test/java/org/opensearch/neuralsearch/bwc/MultiModalSearchIT.java b/qa/rolling-upgrade/src/test/java/org/opensearch/neuralsearch/bwc/MultiModalSearchIT.java index 1154f1e51..8e0ff7568 100644 --- a/qa/rolling-upgrade/src/test/java/org/opensearch/neuralsearch/bwc/MultiModalSearchIT.java +++ b/qa/rolling-upgrade/src/test/java/org/opensearch/neuralsearch/bwc/MultiModalSearchIT.java @@ -85,6 +85,7 @@ private void validateTestIndexOnUpgrade(final int numberOfDocs, final String mod null, null, null, + null, null ); Map responseWithKQuery = search(getIndexNameForTest(), neuralQueryBuilderWithKQuery, 1); diff --git a/qa/rolling-upgrade/src/test/java/org/opensearch/neuralsearch/bwc/NeuralSparseTwoPhaseProcessorIT.java b/qa/rolling-upgrade/src/test/java/org/opensearch/neuralsearch/bwc/NeuralSparseTwoPhaseProcessorIT.java new file mode 100644 index 000000000..c95ee93e0 --- /dev/null +++ b/qa/rolling-upgrade/src/test/java/org/opensearch/neuralsearch/bwc/NeuralSparseTwoPhaseProcessorIT.java @@ -0,0 +1,78 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ +package org.opensearch.neuralsearch.bwc; + +import org.opensearch.common.settings.Settings; +import org.opensearch.neuralsearch.query.NeuralSparseQueryBuilder; +import org.opensearch.neuralsearch.util.TestUtils; + +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.List; + +import static org.opensearch.neuralsearch.util.TestUtils.NODES_BWC_CLUSTER; +import static org.opensearch.neuralsearch.util.TestUtils.SPARSE_ENCODING_PROCESSOR; + +public class NeuralSparseTwoPhaseProcessorIT extends AbstractRollingUpgradeTestCase { + // add prefix to avoid conflicts with other IT class, since don't wipe resources after first round + private static final String SPARSE_INGEST_PIPELINE_NAME = "nstp-nlp-ingest-pipeline-sparse"; + private static final String SPARSE_SEARCH_TWO_PHASE_PIPELINE_NAME = "nstp-nlp-two-phase-search-pipeline-sparse"; + private static final String TEST_ENCODING_FIELD = "passage_embedding"; + private static final String TEST_TEXT_FIELD = "passage_text"; + private static final String TEXT_1 = "Hello world a b"; + private String sparseModelId = ""; + + // test of NeuralSparseTwoPhaseProcessor supports neural_sparse query's two phase speed up + // the feature is introduced from 2.15 + public void testNeuralSparseTwoPhaseProcessorIT_NeuralSparseSearch_E2EFlow() throws Exception { + waitForClusterHealthGreen(NODES_BWC_CLUSTER); + // will set the model_id after we obtain the id + NeuralSparseQueryBuilder neuralSparseQueryBuilder = new NeuralSparseQueryBuilder().fieldName(TEST_ENCODING_FIELD).queryText(TEXT_1); + + switch (getClusterType()) { + case OLD: + sparseModelId = uploadSparseEncodingModel(); + loadModel(sparseModelId); + neuralSparseQueryBuilder.modelId(sparseModelId); + createPipelineForSparseEncodingProcessor(sparseModelId, SPARSE_INGEST_PIPELINE_NAME); + createIndexWithConfiguration( + getIndexNameForTest(), + Files.readString(Path.of(classLoader.getResource("processor/SparseIndexMappings.json").toURI())), + SPARSE_INGEST_PIPELINE_NAME + ); + addSparseEncodingDoc(getIndexNameForTest(), "0", List.of(), List.of(), List.of(TEST_TEXT_FIELD), List.of(TEXT_1)); + createNeuralSparseTwoPhaseSearchProcessor(SPARSE_SEARCH_TWO_PHASE_PIPELINE_NAME); + updateIndexSettings( + getIndexNameForTest(), + Settings.builder().put("index.search.default_pipeline", SPARSE_SEARCH_TWO_PHASE_PIPELINE_NAME) + ); + assertNotNull(search(getIndexNameForTest(), neuralSparseQueryBuilder, 1).get("hits")); + break; + case MIXED: + sparseModelId = TestUtils.getModelId(getIngestionPipeline(SPARSE_INGEST_PIPELINE_NAME), SPARSE_ENCODING_PROCESSOR); + loadModel(sparseModelId); + neuralSparseQueryBuilder.modelId(sparseModelId); + assertNotNull(search(getIndexNameForTest(), neuralSparseQueryBuilder, 1).get("hits")); + break; + case UPGRADED: + try { + sparseModelId = TestUtils.getModelId(getIngestionPipeline(SPARSE_INGEST_PIPELINE_NAME), SPARSE_ENCODING_PROCESSOR); + loadModel(sparseModelId); + neuralSparseQueryBuilder.modelId(sparseModelId); + assertNotNull(search(getIndexNameForTest(), neuralSparseQueryBuilder, 1).get("hits")); + } finally { + wipeOfTestResources( + getIndexNameForTest(), + SPARSE_INGEST_PIPELINE_NAME, + sparseModelId, + SPARSE_SEARCH_TWO_PHASE_PIPELINE_NAME + ); + } + break; + default: + throw new IllegalStateException("Unexpected value: " + getClusterType()); + } + } +} diff --git a/qa/rolling-upgrade/src/test/resources/processor/NeuralSparseTwoPhaseProcessorConfiguration.json b/qa/rolling-upgrade/src/test/resources/processor/NeuralSparseTwoPhaseProcessorConfiguration.json new file mode 100644 index 000000000..45e435268 --- /dev/null +++ b/qa/rolling-upgrade/src/test/resources/processor/NeuralSparseTwoPhaseProcessorConfiguration.json @@ -0,0 +1,16 @@ +{ + "request_processors": [ + { + "neural_sparse_two_phase_processor": { + "tag": "neural-sparse", + "description": "This processor is making two-phase rescorer.", + "enabled": true, + "two_phase_parameter": { + "prune_ratio": %f, + "expansion_rate": %f, + "max_window_size": %d + } + } + } + ] +} diff --git a/src/main/java/org/opensearch/neuralsearch/common/MinClusterVersionUtil.java b/src/main/java/org/opensearch/neuralsearch/common/MinClusterVersionUtil.java new file mode 100644 index 000000000..160b2fa4d --- /dev/null +++ b/src/main/java/org/opensearch/neuralsearch/common/MinClusterVersionUtil.java @@ -0,0 +1,50 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ +package org.opensearch.neuralsearch.common; + +import com.google.common.collect.ImmutableMap; +import org.opensearch.Version; +import org.opensearch.knn.index.IndexUtil; +import org.opensearch.neuralsearch.util.NeuralSearchClusterUtil; + +import java.util.Map; + +import static org.opensearch.knn.index.query.KNNQueryBuilder.MAX_DISTANCE_FIELD; +import static org.opensearch.knn.index.query.KNNQueryBuilder.MIN_SCORE_FIELD; +import static org.opensearch.neuralsearch.query.NeuralQueryBuilder.MODEL_ID_FIELD; + +/** + * A util class which holds the logic to determine the min version supported by the request parameters + */ +public final class MinClusterVersionUtil { + + private static final Version MINIMAL_SUPPORTED_VERSION_DEFAULT_MODEL_ID = Version.V_2_11_0; + private static final Version MINIMAL_SUPPORTED_VERSION_RADIAL_SEARCH = Version.V_2_14_0; + + // Note this minimal version will act as a override + private static final Map MINIMAL_VERSION_NEURAL = ImmutableMap.builder() + .put(MODEL_ID_FIELD.getPreferredName(), MINIMAL_SUPPORTED_VERSION_DEFAULT_MODEL_ID) + .put(MAX_DISTANCE_FIELD.getPreferredName(), MINIMAL_SUPPORTED_VERSION_RADIAL_SEARCH) + .put(MIN_SCORE_FIELD.getPreferredName(), MINIMAL_SUPPORTED_VERSION_RADIAL_SEARCH) + .build(); + + public static boolean isClusterOnOrAfterMinReqVersionForDefaultModelIdSupport() { + return NeuralSearchClusterUtil.instance().getClusterMinVersion().onOrAfter(MINIMAL_SUPPORTED_VERSION_DEFAULT_MODEL_ID); + } + + public static boolean isClusterOnOrAfterMinReqVersionForRadialSearch() { + return NeuralSearchClusterUtil.instance().getClusterMinVersion().onOrAfter(MINIMAL_SUPPORTED_VERSION_RADIAL_SEARCH); + } + + public static boolean isClusterOnOrAfterMinReqVersion(String key) { + Version version; + if (MINIMAL_VERSION_NEURAL.containsKey(key)) { + version = MINIMAL_VERSION_NEURAL.get(key); + } else { + version = IndexUtil.minimalRequiredVersionMap.get(key); + } + return NeuralSearchClusterUtil.instance().getClusterMinVersion().onOrAfter(version); + } +} diff --git a/src/main/java/org/opensearch/neuralsearch/processor/InferenceProcessor.java b/src/main/java/org/opensearch/neuralsearch/processor/InferenceProcessor.java index 9465b250f..d9f9c7048 100644 --- a/src/main/java/org/opensearch/neuralsearch/processor/InferenceProcessor.java +++ b/src/main/java/org/opensearch/neuralsearch/processor/InferenceProcessor.java @@ -6,6 +6,7 @@ import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; import java.util.Collections; import java.util.Comparator; import java.util.HashMap; @@ -21,6 +22,8 @@ import lombok.AllArgsConstructor; import lombok.Getter; import org.apache.commons.lang3.StringUtils; +import org.apache.commons.lang3.tuple.ImmutablePair; +import org.apache.commons.lang3.tuple.Pair; import org.opensearch.common.collect.Tuple; import org.opensearch.core.common.util.CollectionUtils; import org.opensearch.cluster.service.ClusterService; @@ -120,7 +123,7 @@ public IngestDocument execute(IngestDocument ingestDocument) throws Exception { public void execute(IngestDocument ingestDocument, BiConsumer handler) { try { validateEmbeddingFieldsValue(ingestDocument); - Map processMap = buildMapWithTargetKeyAndOriginalValue(ingestDocument); + Map processMap = buildMapWithTargetKeys(ingestDocument); List inferenceList = createInferenceList(processMap); if (inferenceList.size() == 0) { handler.accept(ingestDocument, null); @@ -228,7 +231,7 @@ private List getDataForInference(List i List inferenceList = null; try { validateEmbeddingFieldsValue(ingestDocumentWrapper.getIngestDocument()); - processMap = buildMapWithTargetKeyAndOriginalValue(ingestDocumentWrapper.getIngestDocument()); + processMap = buildMapWithTargetKeys(ingestDocumentWrapper.getIngestDocument()); inferenceList = createInferenceList(processMap); } catch (Exception e) { ingestDocumentWrapper.update(ingestDocumentWrapper.getIngestDocument(), e); @@ -276,15 +279,17 @@ private void createInferenceListForMapTypeInput(Object sourceValue, List } @VisibleForTesting - Map buildMapWithTargetKeyAndOriginalValue(IngestDocument ingestDocument) { + Map buildMapWithTargetKeys(IngestDocument ingestDocument) { Map sourceAndMetadataMap = ingestDocument.getSourceAndMetadata(); Map mapWithProcessorKeys = new LinkedHashMap<>(); for (Map.Entry fieldMapEntry : fieldMap.entrySet()) { - String originalKey = fieldMapEntry.getKey(); - Object targetKey = fieldMapEntry.getValue(); + Pair processedNestedKey = processNestedKey(fieldMapEntry); + String originalKey = processedNestedKey.getKey(); + Object targetKey = processedNestedKey.getValue(); + if (targetKey instanceof Map) { Map treeRes = new LinkedHashMap<>(); - buildMapWithProcessorKeyAndOriginalValueForMapType(originalKey, targetKey, sourceAndMetadataMap, treeRes); + buildNestedMap(originalKey, targetKey, sourceAndMetadataMap, treeRes); mapWithProcessorKeys.put(originalKey, treeRes.get(originalKey)); } else { mapWithProcessorKeys.put(String.valueOf(targetKey), sourceAndMetadataMap.get(originalKey)); @@ -293,20 +298,19 @@ Map buildMapWithTargetKeyAndOriginalValue(IngestDocument ingestD return mapWithProcessorKeys; } - private void buildMapWithProcessorKeyAndOriginalValueForMapType( - String parentKey, - Object processorKey, - Map sourceAndMetadataMap, - Map treeRes - ) { - if (processorKey == null || sourceAndMetadataMap == null) return; + @VisibleForTesting + void buildNestedMap(String parentKey, Object processorKey, Map sourceAndMetadataMap, Map treeRes) { + if (Objects.isNull(processorKey) || Objects.isNull(sourceAndMetadataMap)) { + return; + } if (processorKey instanceof Map) { Map next = new LinkedHashMap<>(); if (sourceAndMetadataMap.get(parentKey) instanceof Map) { for (Map.Entry nestedFieldMapEntry : ((Map) processorKey).entrySet()) { - buildMapWithProcessorKeyAndOriginalValueForMapType( - nestedFieldMapEntry.getKey(), - nestedFieldMapEntry.getValue(), + Pair processedNestedKey = processNestedKey(nestedFieldMapEntry); + buildNestedMap( + processedNestedKey.getKey(), + processedNestedKey.getValue(), (Map) sourceAndMetadataMap.get(parentKey), next ); @@ -317,21 +321,46 @@ private void buildMapWithProcessorKeyAndOriginalValueForMapType( List listOfStrings = list.stream().map(x -> x.get(nestedFieldMapEntry.getKey())).collect(Collectors.toList()); Map map = new LinkedHashMap<>(); map.put(nestedFieldMapEntry.getKey(), listOfStrings); - buildMapWithProcessorKeyAndOriginalValueForMapType( - nestedFieldMapEntry.getKey(), - nestedFieldMapEntry.getValue(), - map, - next - ); + buildNestedMap(nestedFieldMapEntry.getKey(), nestedFieldMapEntry.getValue(), map, next); } } - treeRes.put(parentKey, next); + treeRes.merge(parentKey, next, (v1, v2) -> { + if (v1 instanceof Collection && v2 instanceof Collection) { + ((Collection) v1).addAll((Collection) v2); + return v1; + } else if (v1 instanceof Map && v2 instanceof Map) { + ((Map) v1).putAll((Map) v2); + return v1; + } else { + return v2; + } + }); } else { String key = String.valueOf(processorKey); treeRes.put(key, sourceAndMetadataMap.get(parentKey)); } } + /** + * Process the nested key, such as "a.b.c" to "a", "b.c" + * @param nestedFieldMapEntry + * @return A pair of the original key and the target key + */ + @VisibleForTesting + protected Pair processNestedKey(final Map.Entry nestedFieldMapEntry) { + String originalKey = nestedFieldMapEntry.getKey(); + Object targetKey = nestedFieldMapEntry.getValue(); + int nestedDotIndex = originalKey.indexOf('.'); + if (nestedDotIndex != -1) { + Map newTargetKey = new LinkedHashMap<>(); + newTargetKey.put(originalKey.substring(nestedDotIndex + 1), targetKey); + targetKey = newTargetKey; + + originalKey = originalKey.substring(0, nestedDotIndex); + } + return new ImmutablePair<>(originalKey, targetKey); + } + private void validateEmbeddingFieldsValue(IngestDocument ingestDocument) { Map sourceAndMetadataMap = ingestDocument.getSourceAndMetadata(); String indexName = sourceAndMetadataMap.get(IndexFieldMapper.NAME).toString(); diff --git a/src/main/java/org/opensearch/neuralsearch/query/NeuralQueryBuilder.java b/src/main/java/org/opensearch/neuralsearch/query/NeuralQueryBuilder.java index e7e081f2b..8e1b6b36b 100644 --- a/src/main/java/org/opensearch/neuralsearch/query/NeuralQueryBuilder.java +++ b/src/main/java/org/opensearch/neuralsearch/query/NeuralQueryBuilder.java @@ -5,6 +5,12 @@ package org.opensearch.neuralsearch.query; import static org.opensearch.knn.index.query.KNNQueryBuilder.FILTER_FIELD; +import static org.opensearch.knn.index.query.KNNQueryBuilder.MAX_DISTANCE_FIELD; +import static org.opensearch.knn.index.query.KNNQueryBuilder.METHOD_PARAMS_FIELD; +import static org.opensearch.knn.index.query.KNNQueryBuilder.MIN_SCORE_FIELD; +import static org.opensearch.neuralsearch.common.MinClusterVersionUtil.isClusterOnOrAfterMinReqVersion; +import static org.opensearch.neuralsearch.common.MinClusterVersionUtil.isClusterOnOrAfterMinReqVersionForDefaultModelIdSupport; +import static org.opensearch.neuralsearch.common.MinClusterVersionUtil.isClusterOnOrAfterMinReqVersionForRadialSearch; import static org.opensearch.neuralsearch.common.VectorUtil.vectorAsListToArray; import static org.opensearch.neuralsearch.processor.TextImageEmbeddingProcessor.INPUT_IMAGE; import static org.opensearch.neuralsearch.processor.TextImageEmbeddingProcessor.INPUT_TEXT; @@ -19,7 +25,6 @@ import org.apache.commons.lang.builder.EqualsBuilder; import org.apache.commons.lang.builder.HashCodeBuilder; import org.apache.lucene.search.Query; -import org.opensearch.Version; import org.opensearch.common.SetOnce; import org.opensearch.core.ParseField; import org.opensearch.core.action.ActionListener; @@ -34,8 +39,9 @@ import org.opensearch.index.query.QueryRewriteContext; import org.opensearch.index.query.QueryShardContext; import org.opensearch.knn.index.query.KNNQueryBuilder; +import org.opensearch.knn.index.query.parser.MethodParametersParser; +import org.opensearch.neuralsearch.common.MinClusterVersionUtil; import org.opensearch.neuralsearch.ml.MLCommonsClientAccessor; -import org.opensearch.neuralsearch.util.NeuralSearchClusterUtil; import com.google.common.annotations.VisibleForTesting; @@ -69,18 +75,11 @@ public class NeuralQueryBuilder extends AbstractQueryBuilder @VisibleForTesting static final ParseField QUERY_IMAGE_FIELD = new ParseField("query_image"); - @VisibleForTesting - static final ParseField MODEL_ID_FIELD = new ParseField("model_id"); + public static final ParseField MODEL_ID_FIELD = new ParseField("model_id"); @VisibleForTesting static final ParseField K_FIELD = new ParseField("k"); - @VisibleForTesting - static final ParseField MAX_DISTANCE_FIELD = new ParseField("max_distance"); - - @VisibleForTesting - static final ParseField MIN_SCORE_FIELD = new ParseField("min_score"); - private static final int DEFAULT_K = 10; private static MLCommonsClientAccessor ML_CLIENT; @@ -101,8 +100,7 @@ public static void initialize(MLCommonsClientAccessor mlClient) { @Setter(AccessLevel.PACKAGE) private Supplier vectorSupplier; private QueryBuilder filter; - private static final Version MINIMAL_SUPPORTED_VERSION_DEFAULT_MODEL_ID = Version.V_2_11_0; - private static final Version MINIMAL_SUPPORTED_VERSION_RADIAL_SEARCH = Version.V_2_14_0; + private Map methodParameters; /** * Constructor from stream input @@ -130,6 +128,9 @@ public NeuralQueryBuilder(StreamInput in) throws IOException { this.maxDistance = in.readOptionalFloat(); this.minScore = in.readOptionalFloat(); } + if (isClusterOnOrAfterMinReqVersion(METHOD_PARAMS_FIELD.getPreferredName())) { + this.methodParameters = MethodParametersParser.streamInput(in, MinClusterVersionUtil::isClusterOnOrAfterMinReqVersion); + } } @Override @@ -152,6 +153,9 @@ protected void doWriteTo(StreamOutput out) throws IOException { out.writeOptionalFloat(this.maxDistance); out.writeOptionalFloat(this.minScore); } + if (isClusterOnOrAfterMinReqVersion(METHOD_PARAMS_FIELD.getPreferredName())) { + MethodParametersParser.streamOutput(out, methodParameters, MinClusterVersionUtil::isClusterOnOrAfterMinReqVersion); + } } @Override @@ -174,6 +178,9 @@ protected void doXContent(XContentBuilder xContentBuilder, Params params) throws if (Objects.nonNull(minScore)) { xContentBuilder.field(MIN_SCORE_FIELD.getPreferredName(), minScore); } + if (Objects.nonNull(methodParameters)) { + MethodParametersParser.doXContent(xContentBuilder, methodParameters); + } printBoostAndQueryName(xContentBuilder); xContentBuilder.endObject(); xContentBuilder.endObject(); @@ -267,6 +274,8 @@ private static void parseQueryParams(XContentParser parser, NeuralQueryBuilder n } else if (token == XContentParser.Token.START_OBJECT) { if (FILTER_FIELD.match(currentFieldName, parser.getDeprecationHandler())) { neuralQueryBuilder.filter(parseInnerQueryBuilder(parser)); + } else if (METHOD_PARAMS_FIELD.match(currentFieldName, parser.getDeprecationHandler())) { + neuralQueryBuilder.methodParameters(MethodParametersParser.fromXContent(parser)); } } else { throw new ParsingException( @@ -325,7 +334,8 @@ protected QueryBuilder doRewrite(QueryRewriteContext queryRewriteContext) { maxDistance(), minScore(), vectorSetOnce::get, - filter() + filter(), + methodParameters() ); } @@ -358,14 +368,6 @@ public String getWriteableName() { return NAME; } - private static boolean isClusterOnOrAfterMinReqVersionForDefaultModelIdSupport() { - return NeuralSearchClusterUtil.instance().getClusterMinVersion().onOrAfter(MINIMAL_SUPPORTED_VERSION_DEFAULT_MODEL_ID); - } - - private static boolean isClusterOnOrAfterMinReqVersionForRadialSearch() { - return NeuralSearchClusterUtil.instance().getClusterMinVersion().onOrAfter(MINIMAL_SUPPORTED_VERSION_RADIAL_SEARCH); - } - private static boolean validateKNNQueryType(NeuralQueryBuilder neuralQueryBuilder) { int queryCount = 0; if (neuralQueryBuilder.k() != null) { diff --git a/src/test/java/org/opensearch/neuralsearch/processor/InferenceProcessorTestCase.java b/src/test/java/org/opensearch/neuralsearch/processor/InferenceProcessorTestCase.java index 866a2ab29..caac962e7 100644 --- a/src/test/java/org/opensearch/neuralsearch/processor/InferenceProcessorTestCase.java +++ b/src/test/java/org/opensearch/neuralsearch/processor/InferenceProcessorTestCase.java @@ -5,6 +5,7 @@ package org.opensearch.neuralsearch.processor; import com.google.common.collect.ImmutableList; +import org.apache.commons.lang.math.RandomUtils; import org.opensearch.index.mapper.IndexFieldMapper; import org.opensearch.ingest.IngestDocument; import org.opensearch.ingest.IngestDocumentWrapper; @@ -58,4 +59,17 @@ protected List> createMockVectorResult() { modelTensorList.add(number7); return modelTensorList; } + + protected List> createRandomOneDimensionalMockVector(int numOfVectors, int vectorDimension, float min, float max) { + List> result = new ArrayList<>(); + for (int i = 0; i < numOfVectors; i++) { + List numbers = new ArrayList<>(); + for (int j = 0; j < vectorDimension; j++) { + Float nextFloat = RandomUtils.nextFloat() * (max - min) + min; + numbers.add(nextFloat); + } + result.add(numbers); + } + return result; + } } diff --git a/src/test/java/org/opensearch/neuralsearch/processor/NormalizationProcessorIT.java b/src/test/java/org/opensearch/neuralsearch/processor/NormalizationProcessorIT.java index a34863ee3..7477fe63b 100644 --- a/src/test/java/org/opensearch/neuralsearch/processor/NormalizationProcessorIT.java +++ b/src/test/java/org/opensearch/neuralsearch/processor/NormalizationProcessorIT.java @@ -96,6 +96,7 @@ public void testResultProcessor_whenOneShardAndQueryMatches_thenSuccessful() { null, null, null, + null, null ); TermQueryBuilder termQueryBuilder = QueryBuilders.termQuery(TEST_TEXT_FIELD_NAME_1, TEST_QUERY_TEXT3); @@ -146,6 +147,7 @@ public void testResultProcessor_whenDefaultProcessorConfigAndQueryMatches_thenSu null, null, null, + null, null ); TermQueryBuilder termQueryBuilder = QueryBuilders.termQuery(TEST_TEXT_FIELD_NAME_1, TEST_QUERY_TEXT3); @@ -185,6 +187,7 @@ public void testQueryMatches_whenMultipleShards_thenSuccessful() { null, null, null, + null, null ); TermQueryBuilder termQueryBuilder = QueryBuilders.termQuery(TEST_TEXT_FIELD_NAME_1, TEST_QUERY_TEXT3); diff --git a/src/test/java/org/opensearch/neuralsearch/processor/ScoreCombinationIT.java b/src/test/java/org/opensearch/neuralsearch/processor/ScoreCombinationIT.java index 800dc6129..ad2460103 100644 --- a/src/test/java/org/opensearch/neuralsearch/processor/ScoreCombinationIT.java +++ b/src/test/java/org/opensearch/neuralsearch/processor/ScoreCombinationIT.java @@ -224,7 +224,7 @@ public void testHarmonicMeanCombination_whenOneShardAndQueryMatches_thenSuccessf HybridQueryBuilder hybridQueryBuilderDefaultNorm = new HybridQueryBuilder(); hybridQueryBuilderDefaultNorm.add( - new NeuralQueryBuilder(TEST_KNN_VECTOR_FIELD_NAME_1, TEST_DOC_TEXT1, "", modelId, 5, null, null, null, null) + new NeuralQueryBuilder(TEST_KNN_VECTOR_FIELD_NAME_1, TEST_DOC_TEXT1, "", modelId, 5, null, null, null, null, null) ); hybridQueryBuilderDefaultNorm.add(QueryBuilders.termQuery(TEST_TEXT_FIELD_NAME_1, TEST_QUERY_TEXT3)); @@ -249,7 +249,7 @@ public void testHarmonicMeanCombination_whenOneShardAndQueryMatches_thenSuccessf HybridQueryBuilder hybridQueryBuilderL2Norm = new HybridQueryBuilder(); hybridQueryBuilderL2Norm.add( - new NeuralQueryBuilder(TEST_KNN_VECTOR_FIELD_NAME_1, TEST_DOC_TEXT1, "", modelId, 5, null, null, null, null) + new NeuralQueryBuilder(TEST_KNN_VECTOR_FIELD_NAME_1, TEST_DOC_TEXT1, "", modelId, 5, null, null, null, null, null) ); hybridQueryBuilderL2Norm.add(QueryBuilders.termQuery(TEST_TEXT_FIELD_NAME_1, TEST_QUERY_TEXT3)); @@ -299,7 +299,7 @@ public void testGeometricMeanCombination_whenOneShardAndQueryMatches_thenSuccess HybridQueryBuilder hybridQueryBuilderDefaultNorm = new HybridQueryBuilder(); hybridQueryBuilderDefaultNorm.add( - new NeuralQueryBuilder(TEST_KNN_VECTOR_FIELD_NAME_1, TEST_DOC_TEXT1, "", modelId, 5, null, null, null, null) + new NeuralQueryBuilder(TEST_KNN_VECTOR_FIELD_NAME_1, TEST_DOC_TEXT1, "", modelId, 5, null, null, null, null, null) ); hybridQueryBuilderDefaultNorm.add(QueryBuilders.termQuery(TEST_TEXT_FIELD_NAME_1, TEST_QUERY_TEXT3)); @@ -324,7 +324,7 @@ public void testGeometricMeanCombination_whenOneShardAndQueryMatches_thenSuccess HybridQueryBuilder hybridQueryBuilderL2Norm = new HybridQueryBuilder(); hybridQueryBuilderL2Norm.add( - new NeuralQueryBuilder(TEST_KNN_VECTOR_FIELD_NAME_1, TEST_DOC_TEXT1, "", modelId, 5, null, null, null, null) + new NeuralQueryBuilder(TEST_KNN_VECTOR_FIELD_NAME_1, TEST_DOC_TEXT1, "", modelId, 5, null, null, null, null, null) ); hybridQueryBuilderL2Norm.add(QueryBuilders.termQuery(TEST_TEXT_FIELD_NAME_1, TEST_QUERY_TEXT3)); diff --git a/src/test/java/org/opensearch/neuralsearch/processor/ScoreNormalizationIT.java b/src/test/java/org/opensearch/neuralsearch/processor/ScoreNormalizationIT.java index 9f201b4bd..7700c9f6a 100644 --- a/src/test/java/org/opensearch/neuralsearch/processor/ScoreNormalizationIT.java +++ b/src/test/java/org/opensearch/neuralsearch/processor/ScoreNormalizationIT.java @@ -85,7 +85,7 @@ public void testL2Norm_whenOneShardAndQueryMatches_thenSuccessful() { HybridQueryBuilder hybridQueryBuilderArithmeticMean = new HybridQueryBuilder(); hybridQueryBuilderArithmeticMean.add( - new NeuralQueryBuilder(TEST_KNN_VECTOR_FIELD_NAME_1, TEST_DOC_TEXT1, "", modelId, 5, null, null, null, null) + new NeuralQueryBuilder(TEST_KNN_VECTOR_FIELD_NAME_1, TEST_DOC_TEXT1, "", modelId, 5, null, null, null, null, null) ); hybridQueryBuilderArithmeticMean.add(QueryBuilders.termQuery(TEST_TEXT_FIELD_NAME_1, TEST_QUERY_TEXT3)); @@ -110,7 +110,7 @@ public void testL2Norm_whenOneShardAndQueryMatches_thenSuccessful() { HybridQueryBuilder hybridQueryBuilderHarmonicMean = new HybridQueryBuilder(); hybridQueryBuilderHarmonicMean.add( - new NeuralQueryBuilder(TEST_KNN_VECTOR_FIELD_NAME_1, TEST_DOC_TEXT1, "", modelId, 5, null, null, null, null) + new NeuralQueryBuilder(TEST_KNN_VECTOR_FIELD_NAME_1, TEST_DOC_TEXT1, "", modelId, 5, null, null, null, null, null) ); hybridQueryBuilderHarmonicMean.add(QueryBuilders.termQuery(TEST_TEXT_FIELD_NAME_1, TEST_QUERY_TEXT3)); @@ -135,7 +135,7 @@ public void testL2Norm_whenOneShardAndQueryMatches_thenSuccessful() { HybridQueryBuilder hybridQueryBuilderGeometricMean = new HybridQueryBuilder(); hybridQueryBuilderGeometricMean.add( - new NeuralQueryBuilder(TEST_KNN_VECTOR_FIELD_NAME_1, TEST_DOC_TEXT1, "", modelId, 5, null, null, null, null) + new NeuralQueryBuilder(TEST_KNN_VECTOR_FIELD_NAME_1, TEST_DOC_TEXT1, "", modelId, 5, null, null, null, null, null) ); hybridQueryBuilderGeometricMean.add(QueryBuilders.termQuery(TEST_TEXT_FIELD_NAME_1, TEST_QUERY_TEXT3)); @@ -185,7 +185,7 @@ public void testMinMaxNorm_whenOneShardAndQueryMatches_thenSuccessful() { HybridQueryBuilder hybridQueryBuilderArithmeticMean = new HybridQueryBuilder(); hybridQueryBuilderArithmeticMean.add( - new NeuralQueryBuilder(TEST_KNN_VECTOR_FIELD_NAME_1, TEST_DOC_TEXT1, "", modelId, 5, null, null, null, null) + new NeuralQueryBuilder(TEST_KNN_VECTOR_FIELD_NAME_1, TEST_DOC_TEXT1, "", modelId, 5, null, null, null, null, null) ); hybridQueryBuilderArithmeticMean.add(QueryBuilders.termQuery(TEST_TEXT_FIELD_NAME_1, TEST_QUERY_TEXT3)); @@ -210,7 +210,7 @@ public void testMinMaxNorm_whenOneShardAndQueryMatches_thenSuccessful() { HybridQueryBuilder hybridQueryBuilderHarmonicMean = new HybridQueryBuilder(); hybridQueryBuilderHarmonicMean.add( - new NeuralQueryBuilder(TEST_KNN_VECTOR_FIELD_NAME_1, TEST_DOC_TEXT1, "", modelId, 5, null, null, null, null) + new NeuralQueryBuilder(TEST_KNN_VECTOR_FIELD_NAME_1, TEST_DOC_TEXT1, "", modelId, 5, null, null, null, null, null) ); hybridQueryBuilderHarmonicMean.add(QueryBuilders.termQuery(TEST_TEXT_FIELD_NAME_1, TEST_QUERY_TEXT3)); @@ -235,7 +235,7 @@ public void testMinMaxNorm_whenOneShardAndQueryMatches_thenSuccessful() { HybridQueryBuilder hybridQueryBuilderGeometricMean = new HybridQueryBuilder(); hybridQueryBuilderGeometricMean.add( - new NeuralQueryBuilder(TEST_KNN_VECTOR_FIELD_NAME_1, TEST_DOC_TEXT1, "", modelId, 5, null, null, null, null) + new NeuralQueryBuilder(TEST_KNN_VECTOR_FIELD_NAME_1, TEST_DOC_TEXT1, "", modelId, 5, null, null, null, null, null) ); hybridQueryBuilderGeometricMean.add(QueryBuilders.termQuery(TEST_TEXT_FIELD_NAME_1, TEST_QUERY_TEXT3)); diff --git a/src/test/java/org/opensearch/neuralsearch/processor/TextEmbeddingProcessorIT.java b/src/test/java/org/opensearch/neuralsearch/processor/TextEmbeddingProcessorIT.java index f963c48fc..98b5a25b6 100644 --- a/src/test/java/org/opensearch/neuralsearch/processor/TextEmbeddingProcessorIT.java +++ b/src/test/java/org/opensearch/neuralsearch/processor/TextEmbeddingProcessorIT.java @@ -16,21 +16,31 @@ import org.apache.hc.core5.http.HttpHeaders; import org.apache.hc.core5.http.io.entity.EntityUtils; import org.apache.hc.core5.http.message.BasicHeader; +import org.apache.lucene.search.join.ScoreMode; import org.junit.Before; import org.opensearch.client.Response; import org.opensearch.common.xcontent.XContentHelper; import org.opensearch.common.xcontent.XContentType; +import org.opensearch.index.query.QueryBuilder; +import org.opensearch.index.query.QueryBuilders; import org.opensearch.neuralsearch.BaseNeuralSearchIT; import com.google.common.collect.ImmutableList; +import org.opensearch.neuralsearch.query.NeuralQueryBuilder; public class TextEmbeddingProcessorIT extends BaseNeuralSearchIT { private static final String INDEX_NAME = "text_embedding_index"; private static final String PIPELINE_NAME = "pipeline-hybrid"; + protected static final String QUERY_TEXT = "hello"; + protected static final String LEVEL_1_FIELD = "nested_passages"; + protected static final String LEVEL_2_FIELD = "level_2"; + protected static final String LEVEL_3_FIELD_TEXT = "level_3_text"; + protected static final String LEVEL_3_FIELD_EMBEDDING = "level_3_embedding"; private final String INGEST_DOC1 = Files.readString(Path.of(classLoader.getResource("processor/ingest_doc1.json").toURI())); private final String INGEST_DOC2 = Files.readString(Path.of(classLoader.getResource("processor/ingest_doc2.json").toURI())); + private final String INGEST_DOC3 = Files.readString(Path.of(classLoader.getResource("processor/ingest_doc3.json").toURI())); private final String BULK_ITEM_TEMPLATE = Files.readString( Path.of(classLoader.getResource("processor/bulk_item_template.json").toURI()) ); @@ -77,6 +87,66 @@ public void testTextEmbeddingProcessor_batch() throws Exception { } } + public void testNestedFieldMapping_whenDocumentsIngested_thenSuccessful() throws Exception { + String modelId = null; + try { + modelId = uploadTextEmbeddingModel(); + loadModel(modelId); + createPipelineProcessor(modelId, PIPELINE_NAME, ProcessorType.TEXT_EMBEDDING_WITH_NESTED_FIELDS_MAPPING); + createTextEmbeddingIndex(); + ingestDocument(INGEST_DOC3, "3"); + + Map sourceMap = (Map) getDocById(INDEX_NAME, "3").get("_source"); + assertNotNull(sourceMap); + assertTrue(sourceMap.containsKey(LEVEL_1_FIELD)); + Map nestedPassages = (Map) sourceMap.get(LEVEL_1_FIELD); + assertTrue(nestedPassages.containsKey(LEVEL_2_FIELD)); + Map level2 = (Map) nestedPassages.get(LEVEL_2_FIELD); + assertEquals(QUERY_TEXT, level2.get(LEVEL_3_FIELD_TEXT)); + assertTrue(level2.containsKey(LEVEL_3_FIELD_EMBEDDING)); + List embeddings = (List) level2.get(LEVEL_3_FIELD_EMBEDDING); + assertEquals(768, embeddings.size()); + for (Double embedding : embeddings) { + assertTrue(embedding >= 0.0 && embedding <= 1.0); + } + + NeuralQueryBuilder neuralQueryBuilderQuery = new NeuralQueryBuilder( + LEVEL_1_FIELD + "." + LEVEL_2_FIELD + "." + LEVEL_3_FIELD_EMBEDDING, + QUERY_TEXT, + "", + modelId, + 10, + null, + null, + null, + null, + null + ); + QueryBuilder queryNestedLowerLevel = QueryBuilders.nestedQuery( + LEVEL_1_FIELD + "." + LEVEL_2_FIELD, + neuralQueryBuilderQuery, + ScoreMode.Total + ); + QueryBuilder queryNestedHighLevel = QueryBuilders.nestedQuery(LEVEL_1_FIELD, queryNestedLowerLevel, ScoreMode.Total); + + Map searchResponseAsMap = search(INDEX_NAME, queryNestedHighLevel, 1); + assertNotNull(searchResponseAsMap); + + Map hits = (Map) searchResponseAsMap.get("hits"); + assertNotNull(hits); + + assertEquals(1.0, hits.get("max_score")); + List> listOfHits = (List>) hits.get("hits"); + assertNotNull(listOfHits); + assertEquals(1, listOfHits.size()); + Map hitsInner = listOfHits.get(0); + assertEquals("3", hitsInner.get("_id")); + assertEquals(1.0, hitsInner.get("_score")); + } finally { + wipeOfTestResources(INDEX_NAME, PIPELINE_NAME, modelId, null); + } + } + private String uploadTextEmbeddingModel() throws Exception { String requestBody = Files.readString(Path.of(classLoader.getResource("processor/UploadModelRequestBody.json").toURI())); return registerModelGroupAndUploadModel(requestBody); diff --git a/src/test/java/org/opensearch/neuralsearch/processor/TextEmbeddingProcessorTests.java b/src/test/java/org/opensearch/neuralsearch/processor/TextEmbeddingProcessorTests.java index bff578ad7..9a5e8aa76 100644 --- a/src/test/java/org/opensearch/neuralsearch/processor/TextEmbeddingProcessorTests.java +++ b/src/test/java/org/opensearch/neuralsearch/processor/TextEmbeddingProcessorTests.java @@ -18,14 +18,17 @@ import java.util.ArrayList; import java.util.HashMap; +import java.util.LinkedHashMap; import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Arrays; +import java.util.Optional; import java.util.function.BiConsumer; import java.util.function.Consumer; import java.util.function.Supplier; +import org.apache.commons.lang3.tuple.Pair; import org.junit.Before; import org.mockito.ArgumentCaptor; import org.mockito.InjectMocks; @@ -50,6 +53,11 @@ public class TextEmbeddingProcessorTests extends InferenceProcessorTestCase { + protected static final String PARENT_FIELD = "parent"; + protected static final String CHILD_FIELD_LEVEL_1 = "child_level1"; + protected static final String CHILD_FIELD_LEVEL_2 = "child_level2"; + protected static final String CHILD_LEVEL_2_TEXT_FIELD_VALUE = "text_field_value"; + protected static final String CHILD_LEVEL_2_KNN_FIELD = "test3_knn"; @Mock private MLCommonsClientAccessor mlCommonsClientAccessor; @@ -77,7 +85,7 @@ private TextEmbeddingProcessor createInstanceWithLevel2MapConfig() { config.put(TextEmbeddingProcessor.MODEL_ID_FIELD, "mockModelId"); config.put( TextEmbeddingProcessor.FIELD_MAP_FIELD, - ImmutableMap.of("key1", ImmutableMap.of("test1", "test1_knn"), "key2", ImmutableMap.of("test3", "test3_knn")) + ImmutableMap.of("key1", ImmutableMap.of("test1", "test1_knn"), "key2", ImmutableMap.of("test3", CHILD_LEVEL_2_KNN_FIELD)) ); return textEmbeddingProcessorFactory.create(registry, PROCESSOR_TAG, DESCRIPTION, config); } @@ -285,6 +293,94 @@ public void testExecute_withMapTypeInput_successful() { } + @SneakyThrows + public void testNestedFieldInMapping_withMapTypeInput_successful() { + Map sourceAndMetadata = new HashMap<>(); + sourceAndMetadata.put(IndexFieldMapper.NAME, "my_index"); + Map childLevel2 = new HashMap<>(); + childLevel2.put(CHILD_FIELD_LEVEL_2, CHILD_LEVEL_2_TEXT_FIELD_VALUE); + Map childLevel1 = new HashMap<>(); + childLevel1.put(CHILD_FIELD_LEVEL_1, childLevel2); + sourceAndMetadata.put(PARENT_FIELD, childLevel1); + IngestDocument ingestDocument = new IngestDocument(sourceAndMetadata, new HashMap<>()); + + Map registry = new HashMap<>(); + Map config = new HashMap<>(); + config.put(TextEmbeddingProcessor.MODEL_ID_FIELD, "mockModelId"); + config.put( + TextEmbeddingProcessor.FIELD_MAP_FIELD, + ImmutableMap.of( + String.join(".", Arrays.asList(PARENT_FIELD, CHILD_FIELD_LEVEL_1, CHILD_FIELD_LEVEL_2)), + CHILD_LEVEL_2_KNN_FIELD + ) + ); + TextEmbeddingProcessor processor = textEmbeddingProcessorFactory.create(registry, PROCESSOR_TAG, DESCRIPTION, config); + + List> modelTensorList = createRandomOneDimensionalMockVector(1, 100, 0.0f, 1.0f); + doAnswer(invocation -> { + ActionListener>> listener = invocation.getArgument(2); + listener.onResponse(modelTensorList); + return null; + }).when(mlCommonsClientAccessor).inferenceSentences(anyString(), anyList(), isA(ActionListener.class)); + + processor.execute(ingestDocument, (BiConsumer) (doc, ex) -> {}); + assertNotNull(ingestDocument); + assertNotNull(ingestDocument.getSourceAndMetadata().get(PARENT_FIELD)); + Map childLevel1AfterProcessor = (Map) ingestDocument.getSourceAndMetadata().get(PARENT_FIELD); + Map childLevel2AfterProcessor = (Map) childLevel1AfterProcessor.get(CHILD_FIELD_LEVEL_1); + assertEquals(CHILD_LEVEL_2_TEXT_FIELD_VALUE, childLevel2AfterProcessor.get(CHILD_FIELD_LEVEL_2)); + assertNotNull(childLevel2AfterProcessor.get(CHILD_LEVEL_2_KNN_FIELD)); + List vectors = (List) childLevel2AfterProcessor.get(CHILD_LEVEL_2_KNN_FIELD); + assertEquals(100, vectors.size()); + for (Float vector : vectors) { + assertTrue(vector >= 0.0f && vector <= 1.0f); + } + } + + @SneakyThrows + public void testNestedFieldInMappingMixedSyntax_withMapTypeInput_successful() { + Map sourceAndMetadata = new HashMap<>(); + sourceAndMetadata.put(IndexFieldMapper.NAME, "my_index"); + Map childLevel2 = new HashMap<>(); + childLevel2.put(CHILD_FIELD_LEVEL_2, CHILD_LEVEL_2_TEXT_FIELD_VALUE); + Map childLevel1 = new HashMap<>(); + childLevel1.put(CHILD_FIELD_LEVEL_1, childLevel2); + sourceAndMetadata.put(PARENT_FIELD, childLevel1); + IngestDocument ingestDocument = new IngestDocument(sourceAndMetadata, new HashMap<>()); + + Map registry = new HashMap<>(); + Map config = new HashMap<>(); + config.put(TextEmbeddingProcessor.MODEL_ID_FIELD, "mockModelId"); + config.put( + TextEmbeddingProcessor.FIELD_MAP_FIELD, + ImmutableMap.of( + String.join(".", Arrays.asList(PARENT_FIELD, CHILD_FIELD_LEVEL_1)), + Map.of(CHILD_FIELD_LEVEL_2, CHILD_LEVEL_2_KNN_FIELD) + ) + ); + TextEmbeddingProcessor processor = textEmbeddingProcessorFactory.create(registry, PROCESSOR_TAG, DESCRIPTION, config); + + List> modelTensorList = createRandomOneDimensionalMockVector(1, 100, 0.0f, 1.0f); + doAnswer(invocation -> { + ActionListener>> listener = invocation.getArgument(2); + listener.onResponse(modelTensorList); + return null; + }).when(mlCommonsClientAccessor).inferenceSentences(anyString(), anyList(), isA(ActionListener.class)); + + processor.execute(ingestDocument, (BiConsumer) (doc, ex) -> {}); + assertNotNull(ingestDocument); + assertNotNull(ingestDocument.getSourceAndMetadata().get(PARENT_FIELD)); + Map childLevel1AfterProcessor = (Map) ingestDocument.getSourceAndMetadata().get(PARENT_FIELD); + Map childLevel2AfterProcessor = (Map) childLevel1AfterProcessor.get(CHILD_FIELD_LEVEL_1); + assertEquals(CHILD_LEVEL_2_TEXT_FIELD_VALUE, childLevel2AfterProcessor.get(CHILD_FIELD_LEVEL_2)); + assertNotNull(childLevel2AfterProcessor.get(CHILD_LEVEL_2_KNN_FIELD)); + List vectors = (List) childLevel2AfterProcessor.get(CHILD_LEVEL_2_KNN_FIELD); + assertEquals(100, vectors.size()); + for (Float vector : vectors) { + assertTrue(vector >= 0.0f && vector <= 1.0f); + } + } + public void testExecute_mapHasNonStringValue_throwIllegalArgumentException() { Map map1 = ImmutableMap.of("test1", "test2"); Map map2 = ImmutableMap.of("test3", 209.3D); @@ -396,7 +492,7 @@ public void testProcessResponse_successful() throws Exception { IngestDocument ingestDocument = createPlainIngestDocument(); TextEmbeddingProcessor processor = createInstanceWithNestedMapConfiguration(config); - Map knnMap = processor.buildMapWithTargetKeyAndOriginalValue(ingestDocument); + Map knnMap = processor.buildMapWithTargetKeys(ingestDocument); List> modelTensorList = createMockVectorResult(); processor.setVectorFieldsToDocument(ingestDocument, knnMap, modelTensorList); @@ -409,7 +505,7 @@ public void testBuildVectorOutput_withPlainStringValue_successful() { IngestDocument ingestDocument = createPlainIngestDocument(); TextEmbeddingProcessor processor = createInstanceWithNestedMapConfiguration(config); - Map knnMap = processor.buildMapWithTargetKeyAndOriginalValue(ingestDocument); + Map knnMap = processor.buildMapWithTargetKeys(ingestDocument); // To assert the order is not changed between config map and generated map. List configValueList = new LinkedList<>(config.values()); @@ -435,23 +531,51 @@ public void testBuildVectorOutput_withNestedMap_successful() { Map config = createNestedMapConfiguration(); IngestDocument ingestDocument = createNestedMapIngestDocument(); TextEmbeddingProcessor processor = createInstanceWithNestedMapConfiguration(config); - Map knnMap = processor.buildMapWithTargetKeyAndOriginalValue(ingestDocument); - List> modelTensorList = createMockVectorResult(); + Map knnMap = processor.buildMapWithTargetKeys(ingestDocument); + List> modelTensorList = createRandomOneDimensionalMockVector(2, 100, 0.0f, 1.0f); processor.buildNLPResult(knnMap, modelTensorList, ingestDocument.getSourceAndMetadata()); + /** + * "favorites": { + * "favorite": { + * "movie": "matrix", + * "actor": "Charlie Chaplin", + * "games" : { + * "adventure": { + * "action": "overwatch", + * "rpg": "elden ring" + * } + * } + * } + * } + */ Map favoritesMap = (Map) ingestDocument.getSourceAndMetadata().get("favorites"); assertNotNull(favoritesMap); - Map favoriteGames = (Map) favoritesMap.get("favorite.games"); + Map favorites = (Map) favoritesMap.get("favorite"); + assertNotNull(favorites); + + Map favoriteGames = (Map) favorites.get("games"); assertNotNull(favoriteGames); Map adventure = (Map) favoriteGames.get("adventure"); - Object actionGamesKnn = adventure.get("with.action.knn"); - assertNotNull(actionGamesKnn); + List adventureKnnVector = (List) adventure.get("with_action_knn"); + assertNotNull(adventureKnnVector); + assertEquals(100, adventureKnnVector.size()); + for (float vector : adventureKnnVector) { + assertTrue(vector >= 0.0f && vector <= 1.0f); + } + + List favoriteKnnVector = (List) favorites.get("favorite_movie_knn"); + assertNotNull(favoriteKnnVector); + assertEquals(100, favoriteKnnVector.size()); + for (float vector : favoriteKnnVector) { + assertTrue(vector >= 0.0f && vector <= 1.0f); + } } public void testBuildVectorOutput_withNestedList_successful() { Map config = createNestedListConfiguration(); IngestDocument ingestDocument = createNestedListIngestDocument(); TextEmbeddingProcessor textEmbeddingProcessor = createInstanceWithNestedMapConfiguration(config); - Map knnMap = textEmbeddingProcessor.buildMapWithTargetKeyAndOriginalValue(ingestDocument); + Map knnMap = textEmbeddingProcessor.buildMapWithTargetKeys(ingestDocument); List> modelTensorList = createMockVectorResult(); textEmbeddingProcessor.buildNLPResult(knnMap, modelTensorList, ingestDocument.getSourceAndMetadata()); List> nestedObj = (List>) ingestDocument.getSourceAndMetadata().get("nestedField"); @@ -465,7 +589,7 @@ public void testBuildVectorOutput_withNestedList_Level2_successful() { Map config = createNestedList2LevelConfiguration(); IngestDocument ingestDocument = create2LevelNestedListIngestDocument(); TextEmbeddingProcessor textEmbeddingProcessor = createInstanceWithNestedMapConfiguration(config); - Map knnMap = textEmbeddingProcessor.buildMapWithTargetKeyAndOriginalValue(ingestDocument); + Map knnMap = textEmbeddingProcessor.buildMapWithTargetKeys(ingestDocument); List> modelTensorList = createMockVectorResult(); textEmbeddingProcessor.buildNLPResult(knnMap, modelTensorList, ingestDocument.getSourceAndMetadata()); Map nestedLevel1 = (Map) ingestDocument.getSourceAndMetadata().get("nestedField"); @@ -480,7 +604,7 @@ public void test_updateDocument_appendVectorFieldsToDocument_successful() { Map config = createPlainStringConfiguration(); IngestDocument ingestDocument = createPlainIngestDocument(); TextEmbeddingProcessor processor = createInstanceWithNestedMapConfiguration(config); - Map knnMap = processor.buildMapWithTargetKeyAndOriginalValue(ingestDocument); + Map knnMap = processor.buildMapWithTargetKeys(ingestDocument); List> modelTensorList = createMockVectorResult(); processor.setVectorFieldsToDocument(ingestDocument, knnMap, modelTensorList); @@ -556,6 +680,100 @@ public void test_batchExecute_exception() { } } + public void testParsingNestedField_whenNestedFieldsConfigured_thenSuccessful() { + Map config = createNestedMapConfiguration(); + TextEmbeddingProcessor processor = createInstanceWithNestedMapConfiguration(config); + /** + * Assert that mapping + * "favorites": { + * "favorite.movie": "favorite_movie_knn", + * "favorite.games": { + * "adventure.action": "with_action_knn" + * } + * } + * has been transformed to structure: + * "favorites": { + * "favorite": { + * "movie": "favorite_movie_knn", + * "games": { + * "adventure": { + * "action": "with_action_knn" + * } + * } + * } + * } + */ + assertMapWithNestedFields( + processor.processNestedKey( + config.entrySet().stream().filter(entry -> entry.getKey().equals("favorites")).findAny().orElseThrow() + ), + List.of("favorites"), + Optional.empty() + ); + + Map favorites = (Map) config.get("favorites"); + + assertMapWithNestedFields( + processor.processNestedKey( + favorites.entrySet().stream().filter(entry -> entry.getKey().equals("favorite.games")).findAny().orElseThrow() + ), + List.of("favorite", "games"), + Optional.of("favorite_movie_knn") + ); + + assertMapWithNestedFields( + processor.processNestedKey( + favorites.entrySet().stream().filter(entry -> entry.getKey().equals("favorite.movie")).findAny().orElseThrow() + ), + List.of("favorite", "movie"), + Optional.empty() + ); + + Map adventureActionMap = (Map) favorites.get("favorite.games"); + assertMapWithNestedFields( + processor.processNestedKey( + adventureActionMap.entrySet().stream().filter(entry -> entry.getKey().equals("adventure.action")).findAny().orElseThrow() + ), + List.of("adventure", "action"), + Optional.of("with_action_knn") + ); + } + + public void testBuildingOfNestedMap_whenHasNestedMapping_thenSuccessful() { + /** + * assert based on following structure: + * "nestedField": { + * "nestedField": { + * "textField": "vectorField" + * } + * } + */ + Map config = createNestedList2LevelConfiguration(); + TextEmbeddingProcessor processor = createInstanceWithNestedMapConfiguration(config); + Map resultAsTree = new LinkedHashMap<>(); + processor.buildNestedMap("nestedField", config.get("nestedField"), config, resultAsTree); + assertNotNull(resultAsTree); + Map actualMapLevel1 = (Map) resultAsTree.get("nestedField"); + assertEquals(1, actualMapLevel1.size()); + assertEquals(Map.of("vectorField", "vectorField"), actualMapLevel1.get("nestedField")); + } + + private void assertMapWithNestedFields(Pair actual, List expectedKeys, Optional expectedFinalValue) { + assertNotNull(actual); + assertEquals(expectedKeys.get(0), actual.getKey()); + Map actualValue = (Map) actual.getValue(); + for (int i = 1; i < expectedKeys.size(); i++) { + assertTrue(actualValue.containsKey(expectedKeys.get(i))); + if (actualValue.get(expectedKeys.get(i)) instanceof Map) { + actualValue = (Map) actualValue.get(expectedKeys.get(i)); + } else if (expectedFinalValue.isPresent()) { + assertEquals(expectedFinalValue.get(), actualValue.get(expectedKeys.get(i))); + } else { + break; + } + } + } + @SneakyThrows private TextEmbeddingProcessor createInstanceWithNestedMapConfiguration(Map fieldMap) { Map registry = new HashMap<>(); @@ -576,20 +794,21 @@ private Map createPlainStringConfiguration() { return config; } + /** + * Create following mapping + * "favorites": { + * "favorite.movie": "favorite_movie_knn", + * "favorite.games": { + * "adventure.action": "with_action_knn" + * } + * } + */ private Map createNestedMapConfiguration() { Map adventureGames = new HashMap<>(); - adventureGames.put("with.action", "with.action.knn"); - adventureGames.put("with.reaction", "with.reaction.knn"); - Map puzzleGames = new HashMap<>(); - puzzleGames.put("maze", "maze.knn"); - puzzleGames.put("card", "card.knn"); - Map favoriteGames = new HashMap<>(); - favoriteGames.put("adventure", adventureGames); - favoriteGames.put("puzzle", puzzleGames); + adventureGames.put("adventure.action", "with_action_knn"); Map favorite = new HashMap<>(); - favorite.put("favorite.movie", "favorite.movie.knn"); - favorite.put("favorite.games", favoriteGames); - favorite.put("favorite.songs", "favorite.songs.knn"); + favorite.put("favorite.movie", "favorite_movie_knn"); + favorite.put("favorite.games", adventureGames); Map result = new HashMap<>(); result.put("favorites", favorite); return result; @@ -606,23 +825,33 @@ private IngestDocument createPlainIngestDocument() { return new IngestDocument(result, new HashMap<>()); } + /** + * Create following document + * "favorites": { + * "favorite": { + * "movie": "matrix", + * "actor": "Charlie Chaplin", + * "games" : { + * "adventure": { + * "action": "overwatch", + * "rpg": "elden ring" + * } + * } + * } + * } + */ private IngestDocument createNestedMapIngestDocument() { Map adventureGames = new HashMap<>(); - List actionGames = new ArrayList<>(); - actionGames.add("jojo world"); - actionGames.add(null); - adventureGames.put("with.action", actionGames); - adventureGames.put("with.reaction", "overwatch"); - Map puzzleGames = new HashMap<>(); - puzzleGames.put("maze", "zelda"); - puzzleGames.put("card", "hearthstone"); - Map favoriteGames = new HashMap<>(); - favoriteGames.put("adventure", adventureGames); - favoriteGames.put("puzzle", puzzleGames); + adventureGames.put("action", "overwatch"); + adventureGames.put("rpg", "elden ring"); + Map favGames = new HashMap<>(); + favGames.put("adventure", adventureGames); + Map favorites = new HashMap<>(); + favorites.put("movie", "matrix"); + favorites.put("games", favGames); + favorites.put("actor", "Charlie Chaplin"); Map favorite = new HashMap<>(); - favorite.put("favorite.movie", "favorite.movie.knn"); - favorite.put("favorite.games", favoriteGames); - favorite.put("favorite.songs", "In The Name Of Father"); + favorite.put("favorite", favorites); Map result = new HashMap<>(); result.put("favorites", favorite); return new IngestDocument(result, new HashMap<>()); diff --git a/src/test/java/org/opensearch/neuralsearch/query/NeuralQueryBuilderTests.java b/src/test/java/org/opensearch/neuralsearch/query/NeuralQueryBuilderTests.java index f3c763764..9ecb93b81 100644 --- a/src/test/java/org/opensearch/neuralsearch/query/NeuralQueryBuilderTests.java +++ b/src/test/java/org/opensearch/neuralsearch/query/NeuralQueryBuilderTests.java @@ -12,10 +12,10 @@ import static org.opensearch.index.query.AbstractQueryBuilder.BOOST_FIELD; import static org.opensearch.index.query.AbstractQueryBuilder.NAME_FIELD; import static org.opensearch.knn.index.query.KNNQueryBuilder.FILTER_FIELD; +import static org.opensearch.knn.index.query.KNNQueryBuilder.MAX_DISTANCE_FIELD; +import static org.opensearch.knn.index.query.KNNQueryBuilder.MIN_SCORE_FIELD; import static org.opensearch.neuralsearch.util.TestUtils.xContentBuilderToMap; import static org.opensearch.neuralsearch.query.NeuralQueryBuilder.K_FIELD; -import static org.opensearch.neuralsearch.query.NeuralQueryBuilder.MAX_DISTANCE_FIELD; -import static org.opensearch.neuralsearch.query.NeuralQueryBuilder.MIN_SCORE_FIELD; import static org.opensearch.neuralsearch.query.NeuralQueryBuilder.MODEL_ID_FIELD; import static org.opensearch.neuralsearch.query.NeuralQueryBuilder.NAME; import static org.opensearch.neuralsearch.query.NeuralQueryBuilder.QUERY_IMAGE_FIELD; @@ -107,6 +107,41 @@ public void testFromXContent_whenBuiltWithDefaults_thenBuildSuccessfully() { assertEquals(K, neuralQueryBuilder.k()); } + @SneakyThrows + public void testFromXContent_withMethodParameters_thenBuildSuccessfully() { + /* + { + "VECTOR_FIELD": { + "query_text": "string", + "query_image": "string", + "model_id": "string", + "k": int + } + } + */ + setUpClusterService(Version.V_2_10_0); + XContentBuilder xContentBuilder = XContentFactory.jsonBuilder() + .startObject() + .startObject(FIELD_NAME) + .field(QUERY_TEXT_FIELD.getPreferredName(), QUERY_TEXT) + .field(MODEL_ID_FIELD.getPreferredName(), MODEL_ID) + .field(K_FIELD.getPreferredName(), K) + .startObject("method_parameters") + .field("ef_search", 1000) + .endObject() + .endObject() + .endObject(); + + XContentParser contentParser = createParser(xContentBuilder); + contentParser.nextToken(); + NeuralQueryBuilder neuralQueryBuilder = NeuralQueryBuilder.fromXContent(contentParser); + + assertEquals(FIELD_NAME, neuralQueryBuilder.fieldName()); + assertEquals(QUERY_TEXT, neuralQueryBuilder.queryText()); + assertEquals(MODEL_ID, neuralQueryBuilder.modelId()); + assertEquals(K, neuralQueryBuilder.k()); + } + @SneakyThrows public void testFromXContent_whenBuiltWithOptionals_thenBuildSuccessfully() { /* diff --git a/src/test/java/org/opensearch/neuralsearch/query/NeuralQueryIT.java b/src/test/java/org/opensearch/neuralsearch/query/NeuralQueryIT.java index b17f7f151..0e5d86e72 100644 --- a/src/test/java/org/opensearch/neuralsearch/query/NeuralQueryIT.java +++ b/src/test/java/org/opensearch/neuralsearch/query/NeuralQueryIT.java @@ -110,6 +110,7 @@ public void testQueryWithBoostAndImageQueryAndRadialQuery() { null, null, null, + null, null ); @@ -131,7 +132,8 @@ public void testQueryWithBoostAndImageQueryAndRadialQuery() { null, null, null, - null + null, + Map.of("ef_search", 10) ); Map searchResponseAsMapMultimodalQuery = search(TEST_BASIC_INDEX_NAME, neuralQueryBuilderMultimodalQuery, 1); Map firstInnerHitMultimodalQuery = getFirstInnerHit(searchResponseAsMapMultimodalQuery); @@ -157,6 +159,7 @@ public void testQueryWithBoostAndImageQueryAndRadialQuery() { 100.0f, null, null, + null, null ); @@ -185,6 +188,7 @@ public void testQueryWithBoostAndImageQueryAndRadialQuery() { null, 0.01f, null, + null, null ); @@ -239,6 +243,7 @@ public void testRescoreQuery() { null, null, null, + null, null ); @@ -316,6 +321,7 @@ public void testBooleanQuery_withMultipleNeuralQueries() { null, null, null, + null, null ); NeuralQueryBuilder neuralQueryBuilder2 = new NeuralQueryBuilder( @@ -327,6 +333,7 @@ public void testBooleanQuery_withMultipleNeuralQueries() { null, null, null, + null, null ); @@ -354,6 +361,7 @@ public void testBooleanQuery_withMultipleNeuralQueries() { null, null, null, + null, null ); @@ -409,6 +417,7 @@ public void testNestedQuery() { null, null, null, + null, null ); @@ -459,7 +468,8 @@ public void testFilterQuery() { null, null, null, - new MatchQueryBuilder("_id", "3") + new MatchQueryBuilder("_id", "3"), + null ); Map searchResponseAsMap = search(TEST_MULTI_DOC_INDEX_NAME, neuralQueryBuilder, 3); assertEquals(1, getHitCount(searchResponseAsMap)); diff --git a/src/test/resources/processor/IndexMappings.json b/src/test/resources/processor/IndexMappings.json index ffa5cea64..79eb34ce4 100644 --- a/src/test/resources/processor/IndexMappings.json +++ b/src/test/resources/processor/IndexMappings.json @@ -102,6 +102,27 @@ "m": 24 } } + }, + "level_2": { + "type": "nested", + "properties": { + "level_3_text": { + "type": "text" + }, + "level_3_embedding": { + "type": "knn_vector", + "dimension": 768, + "method": { + "name": "hnsw", + "space_type": "l2", + "engine": "lucene", + "parameters": { + "ef_construction": 128, + "m": 24 + } + } + } + } } } } diff --git a/src/test/resources/processor/PipelineConfigurationWithNestedFieldsMapping.json b/src/test/resources/processor/PipelineConfigurationWithNestedFieldsMapping.json new file mode 100644 index 000000000..13bae8776 --- /dev/null +++ b/src/test/resources/processor/PipelineConfigurationWithNestedFieldsMapping.json @@ -0,0 +1,19 @@ +{ + "description": "text embedding pipeline for hybrid", + "processors": [ + { + "text_embedding": { + "model_id": "%s", + "field_map": { + "title": "title_knn", + "favor_list": "favor_list_knn", + "favorites": { + "game": "game_knn", + "movie": "movie_knn" + }, + "nested_passages.level_2.level_3_text": "level_3_embedding" + } + } + } + ] +} diff --git a/src/test/resources/processor/ingest_doc3.json b/src/test/resources/processor/ingest_doc3.json new file mode 100644 index 000000000..8eae12fe2 --- /dev/null +++ b/src/test/resources/processor/ingest_doc3.json @@ -0,0 +1,20 @@ +{ + "title": "This is a good day", + "description": "daily logging", + "favor_list": [ + "test", + "hello", + "mock" + ], + "favorites": { + "game": "overwatch", + "movie": null + }, + "nested_passages": + { + "level_2": + { + "level_3_text": "hello" + } + } +} diff --git a/src/testFixtures/java/org/opensearch/neuralsearch/BaseNeuralSearchIT.java b/src/testFixtures/java/org/opensearch/neuralsearch/BaseNeuralSearchIT.java index 5836db9ff..3841e6dd0 100644 --- a/src/testFixtures/java/org/opensearch/neuralsearch/BaseNeuralSearchIT.java +++ b/src/testFixtures/java/org/opensearch/neuralsearch/BaseNeuralSearchIT.java @@ -19,6 +19,7 @@ import java.util.Set; import java.util.ArrayList; import java.util.function.Predicate; +import java.util.function.Supplier; import java.util.stream.Collectors; import org.apache.commons.lang3.StringUtils; @@ -81,7 +82,9 @@ public abstract class BaseNeuralSearchIT extends OpenSearchSecureRestTestCase { ProcessorType.SPARSE_ENCODING, "processor/SparseEncodingPipelineConfiguration.json", ProcessorType.TEXT_IMAGE_EMBEDDING, - "processor/PipelineForTextImageEmbeddingProcessorConfiguration.json" + "processor/PipelineForTextImageEmbeddingProcessorConfiguration.json", + ProcessorType.TEXT_EMBEDDING_WITH_NESTED_FIELDS_MAPPING, + "processor/PipelineConfigurationWithNestedFieldsMapping.json" ); private static final Set SUCCESS_STATUSES = Set.of(RestStatus.CREATED, RestStatus.OK); protected static final String CONCURRENT_SEGMENT_SEARCH_ENABLED = "search.concurrent_segment_search.enabled"; @@ -744,6 +747,36 @@ protected void addSparseEncodingDoc( assertEquals(request.getEndpoint() + ": failed", RestStatus.CREATED, RestStatus.fromCode(response.getStatusLine().getStatusCode())); } + protected void bulkAddDocuments( + final String index, + final String textField, + final String pipeline, + final List> docs, + final int batchSize + ) throws IOException, ParseException { + StringBuilder builder = new StringBuilder(); + for (int i = 0; i < docs.size(); ++i) { + String doc = String.format( + Locale.ROOT, + "{ \"index\": { \"_index\": \"%s\", \"_id\": \"%s\" } },\n" + "{ \"%s\": \"%s\"}", + index, + docs.get(i).get("id"), + textField, + docs.get(i).get("text") + ); + builder.append(doc); + builder.append("\n"); + } + Request request = new Request( + "POST", + String.format(Locale.ROOT, "/_bulk?refresh=true&pipeline=%s&batch_size=%d", pipeline, batchSize) + ); + request.setJsonEntity(builder.toString()); + + Response response = client().performRequest(request); + assertEquals(request.getEndpoint() + ": failed", RestStatus.OK, RestStatus.fromCode(response.getStatusLine().getStatusCode())); + } + /** * Parse the first returned hit from a search response as a map * @@ -1308,12 +1341,34 @@ protected void wipeOfTestResources( } } + protected Object validateDocCountAndInfo( + String indexName, + int expectedDocCount, + Supplier> documentSupplier, + final String field, + final Class valueType + ) { + int count = getDocCount(indexName); + assertEquals(expectedDocCount, count); + Map document = documentSupplier.get(); + assertNotNull(document); + Object documentSource = document.get("_source"); + assertTrue(documentSource instanceof Map); + @SuppressWarnings("unchecked") + Map documentSourceMap = (Map) documentSource; + assertTrue(documentSourceMap.containsKey(field)); + Object outputs = documentSourceMap.get(field); + assertTrue(valueType.isAssignableFrom(outputs.getClass())); + return outputs; + } + /** * Enumeration for types of pipeline processors, used to lookup resources like create * processor request as those are type specific */ protected enum ProcessorType { TEXT_EMBEDDING, + TEXT_EMBEDDING_WITH_NESTED_FIELDS_MAPPING, TEXT_IMAGE_EMBEDDING, SPARSE_ENCODING } diff --git a/src/testFixtures/java/org/opensearch/neuralsearch/util/BatchIngestionUtils.java b/src/testFixtures/java/org/opensearch/neuralsearch/util/BatchIngestionUtils.java new file mode 100644 index 000000000..ed12d864b --- /dev/null +++ b/src/testFixtures/java/org/opensearch/neuralsearch/util/BatchIngestionUtils.java @@ -0,0 +1,40 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ +package org.opensearch.neuralsearch.util; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * A helper class to build docs for bulk request which is used by batch ingestion tests. + */ +public class BatchIngestionUtils { + private static final List TEXTS = Arrays.asList( + "hello", + "world", + "an apple", + "find me", + "birdy", + "flying piggy", + "newspaper", + "dynamic programming", + "random text", + "finally" + ); + + public static List> prepareDataForBulkIngestion(int startId, int count) { + List> docs = new ArrayList<>(); + for (int i = startId; i < startId + count; ++i) { + Map params = new HashMap<>(); + params.put("id", Integer.toString(i)); + params.put("text", TEXTS.get(i % TEXTS.size())); + docs.add(params); + } + return docs; + } +}