From 2b21110be2d343e83f539e75104a8928522bf720 Mon Sep 17 00:00:00 2001 From: conggguan <157357330+conggguan@users.noreply.github.com> Date: Tue, 4 Jun 2024 08:58:03 +0800 Subject: [PATCH] [Feature] Neural Sparse Query Two Phase Search pipeline (#747) * Poc of pipeline Signed-off-by: conggguan * Complete some settings for two phase pipeline. Signed-off-by: conggguan * Change the implementation of two-phase from QueryBuilderVisitor to custom process function. Signed-off-by: conggguan * Add IT and fix some bugs on the state of multiple identical NeuralSparseQueryBuilders. Signed-off-by: conggguan * Simplify some logic, and correct some format. Signed-off-by: conggguan * Optimize some format. Signed-off-by: conggguan * Add some test case. Signed-off-by: conggguan * Optimize some logic for zhichao-aws's comments. Signed-off-by: conggguan * Optimize a line without application. Signed-off-by: conggguan * Add some comments, remove some redundant lines, fix some format. Signed-off-by: conggguan * Remove a redundant null check, fix an if format. Signed-off-by: conggguan * Fix a typo for a comment, camelcase format for some variable. Signed-off-by: conggguan * Add some comments to illustrate the influence of the modification on 2-phase search pipeline to neural sparse query builder. 
Signed-off-by: conggguan --------- Signed-off-by: conggguan Signed-off-by: conggguan <157357330+conggguan@users.noreply.github.com> --- CHANGELOG.md | 1 + .../neuralsearch/plugin/NeuralSearch.java | 8 +- .../NeuralSparseTwoPhaseProcessor.java | 264 ++++++++ .../query/NeuralSparseQueryBuilder.java | 148 +++-- .../plugin/NeuralSearchTests.java | 2 + .../NeuralSparseTwoPhaseProcessorIT.java | 588 ++++++++++++++++++ .../NeuralSparseTwoPhaseProcessorTests.java | 202 ++++++ .../query/NeuralSparseQueryBuilderTests.java | 16 + ...lSparseTwoPhaseProcessorConfiguration.json | 16 + .../neuralsearch/BaseNeuralSearchIT.java | 30 + 10 files changed, 1234 insertions(+), 41 deletions(-) create mode 100644 src/main/java/org/opensearch/neuralsearch/processor/NeuralSparseTwoPhaseProcessor.java create mode 100644 src/test/java/org/opensearch/neuralsearch/processor/NeuralSparseTwoPhaseProcessorIT.java create mode 100644 src/test/java/org/opensearch/neuralsearch/processor/NeuralSparseTwoPhaseProcessorTests.java create mode 100644 src/test/resources/processor/NeuralSparseTwoPhaseProcessorConfiguration.json diff --git a/CHANGELOG.md b/CHANGELOG.md index 452f3664c..3d7b9c65f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,6 +14,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), ## [Unreleased 2.x](https://github.com/opensearch-project/neural-search/compare/2.14...2.x) ### Features +- Speed up NeuralSparseQuery by two-phase using a custom search pipeline.([#646](https://github.com/opensearch-project/neural-search/issues/646)) - Support batchExecute in TextEmbeddingProcessor and SparseEncodingProcessor ([#743](https://github.com/opensearch-project/neural-search/issues/743)) ### Enhancements - Pass empty doc collector instead of top docs collector to improve hybrid query latencies by 20% ([#731](https://github.com/opensearch-project/neural-search/pull/731)) diff --git a/src/main/java/org/opensearch/neuralsearch/plugin/NeuralSearch.java 
b/src/main/java/org/opensearch/neuralsearch/plugin/NeuralSearch.java index 5e2b1fd61..a8ce31e0d 100644 --- a/src/main/java/org/opensearch/neuralsearch/plugin/NeuralSearch.java +++ b/src/main/java/org/opensearch/neuralsearch/plugin/NeuralSearch.java @@ -27,6 +27,7 @@ import org.opensearch.ml.client.MachineLearningNodeClient; import org.opensearch.neuralsearch.ml.MLCommonsClientAccessor; import org.opensearch.neuralsearch.processor.NeuralQueryEnricherProcessor; +import org.opensearch.neuralsearch.processor.NeuralSparseTwoPhaseProcessor; import org.opensearch.neuralsearch.processor.NormalizationProcessor; import org.opensearch.neuralsearch.processor.NormalizationProcessorWorkflow; import org.opensearch.neuralsearch.processor.SparseEncodingProcessor; @@ -157,7 +158,12 @@ public List> getSettings() { public Map> getRequestProcessors( Parameters parameters ) { - return Map.of(NeuralQueryEnricherProcessor.TYPE, new NeuralQueryEnricherProcessor.Factory()); + return Map.of( + NeuralQueryEnricherProcessor.TYPE, + new NeuralQueryEnricherProcessor.Factory(), + NeuralSparseTwoPhaseProcessor.TYPE, + new NeuralSparseTwoPhaseProcessor.Factory() + ); } @Override diff --git a/src/main/java/org/opensearch/neuralsearch/processor/NeuralSparseTwoPhaseProcessor.java b/src/main/java/org/opensearch/neuralsearch/processor/NeuralSparseTwoPhaseProcessor.java new file mode 100644 index 000000000..8d386e615 --- /dev/null +++ b/src/main/java/org/opensearch/neuralsearch/processor/NeuralSparseTwoPhaseProcessor.java @@ -0,0 +1,264 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ +package org.opensearch.neuralsearch.processor; + +import com.google.common.collect.ArrayListMultimap; +import com.google.common.collect.Multimap; +import lombok.Getter; +import lombok.Setter; +import org.opensearch.action.search.SearchRequest; +import org.opensearch.common.collect.Tuple; +import org.opensearch.index.query.BoolQueryBuilder; +import 
org.opensearch.index.query.QueryBuilder; +import org.opensearch.ingest.ConfigurationUtils; +import org.opensearch.neuralsearch.query.NeuralSparseQueryBuilder; +import org.opensearch.search.builder.SearchSourceBuilder; +import org.opensearch.search.pipeline.AbstractProcessor; +import org.opensearch.search.pipeline.Processor; +import org.opensearch.search.pipeline.SearchRequestProcessor; +import org.opensearch.search.rescore.QueryRescorerBuilder; +import org.opensearch.search.rescore.RescorerBuilder; + +import java.util.Collections; +import java.util.Locale; +import java.util.Map; +import java.util.Objects; +import java.util.stream.Collectors; + +/** + * A SearchRequestProcessor to generate two-phase NeuralSparseQueryBuilder, + * and add it to the Rescore of a searchRequest. + */ +@Setter +@Getter +public class NeuralSparseTwoPhaseProcessor extends AbstractProcessor implements SearchRequestProcessor { + + public static final String TYPE = "neural_sparse_two_phase_processor"; + private boolean enabled; + private float ratio; + private float windowExpansion; + private int maxWindowSize; + private static final String PARAMETER_KEY = "two_phase_parameter"; + private static final String RATIO_KEY = "prune_ratio"; + private static final String ENABLE_KEY = "enabled"; + private static final String EXPANSION_KEY = "expansion_rate"; + private static final String MAX_WINDOW_SIZE_KEY = "max_window_size"; + private static final boolean DEFAULT_ENABLED = true; + private static final float DEFAULT_RATIO = 0.4f; + private static final float DEFAULT_WINDOW_EXPANSION = 5.0f; + private static final int DEFAULT_MAX_WINDOW_SIZE = 10000; + private static final int DEFAULT_BASE_QUERY_SIZE = 10; + private static final int MAX_WINDOWS_SIZE_LOWER_BOUND = 50; + private static final float WINDOW_EXPANSION_LOWER_BOUND = 1.0f; + private static final float RATIO_LOWER_BOUND = 0f; + private static final float RATIO_UPPER_BOUND = 1f; + + protected NeuralSparseTwoPhaseProcessor( + String tag, + 
String description, + boolean ignoreFailure, + boolean enabled, + float ratio, + float windowExpansion, + int maxWindowSize + ) { + super(tag, description, ignoreFailure); + this.enabled = enabled; + if (ratio < RATIO_LOWER_BOUND || ratio > RATIO_UPPER_BOUND) { + throw new IllegalArgumentException( + String.format(Locale.ROOT, "The two_phase_parameter.prune_ratio must be within [0, 1]. Received: %f", ratio) + ); + } + this.ratio = ratio; + if (windowExpansion < WINDOW_EXPANSION_LOWER_BOUND) { + throw new IllegalArgumentException( + String.format(Locale.ROOT, "The two_phase_parameter.expansion_rate must >= 1.0. Received: %f", windowExpansion) + ); + } + this.windowExpansion = windowExpansion; + if (maxWindowSize < MAX_WINDOWS_SIZE_LOWER_BOUND) { + throw new IllegalArgumentException( + String.format(Locale.ROOT, "The two_phase_parameter.max_window_size must >= 50. Received: %n" + maxWindowSize) + ); + } + this.maxWindowSize = maxWindowSize; + } + + /** + * Process the search request of neural_sparse_two_phase_processor + * @param request the search request (which may have been modified by an earlier processor) + * @return request the search request that add the two-phase rescore query of neural sparse query. + */ + @Override + public SearchRequest processRequest(final SearchRequest request) { + if (!enabled || ratio == 0f) { + return request; + } + QueryBuilder queryBuilder = request.source().query(); + // Collect the nested NeuralSparseQueryBuilder in the whole query. + Multimap queryBuilderMap; + queryBuilderMap = collectNeuralSparseQueryBuilder(queryBuilder, 1.0f); + if (queryBuilderMap.isEmpty()) { + return request; + } + // Make a nestedQueryBuilder which includes all the two-phase QueryBuilder. + QueryBuilder nestedTwoPhaseQueryBuilder = getNestedQueryBuilderFromNeuralSparseQueryBuilderMap(queryBuilderMap); + nestedTwoPhaseQueryBuilder.boost(getOriginQueryWeightAfterRescore(request.source())); + // Add it to the rescorer. 
+ RescorerBuilder twoPhaseRescorer = buildRescoreQueryBuilderForTwoPhase(nestedTwoPhaseQueryBuilder, request); + request.source().addRescorer(twoPhaseRescorer); + return request; + } + + @Override + public String getType() { + return TYPE; + } + + /** + * Based on ratio, split a Map into two map by the value. + * + * @param queryTokens the queryTokens map, key is the token String, value is the score. + * @param thresholdRatio The ratio that control how tokens map be split. + * @return A tuple has two element, { token map whose value above threshold, token map whose value below threshold } + */ + public static Tuple, Map> splitQueryTokensByRatioedMaxScoreAsThreshold( + final Map queryTokens, + final float thresholdRatio + ) { + if (Objects.isNull(queryTokens)) { + throw new IllegalArgumentException("Query tokens cannot be null or empty."); + } + float max = 0f; + for (Float value : queryTokens.values()) { + max = Math.max(value, max); + } + float threshold = max * thresholdRatio; + + Map> queryTokensByScore = queryTokens.entrySet() + .stream() + .collect( + Collectors.partitioningBy(entry -> entry.getValue() >= threshold, Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)) + ); + + Map highScoreTokens = queryTokensByScore.get(Boolean.TRUE); + Map lowScoreTokens = queryTokensByScore.get(Boolean.FALSE); + if (Objects.isNull(highScoreTokens)) { + highScoreTokens = Collections.emptyMap(); + } + if (Objects.isNull(lowScoreTokens)) { + lowScoreTokens = Collections.emptyMap(); + } + return Tuple.tuple(highScoreTokens, lowScoreTokens); + } + + private QueryBuilder getNestedQueryBuilderFromNeuralSparseQueryBuilderMap( + final Multimap queryBuilderFloatMap + ) { + BoolQueryBuilder boolQueryBuilder = new BoolQueryBuilder(); + queryBuilderFloatMap.asMap().forEach((neuralSparseQueryBuilder, boosts) -> { + float reduceBoost = boosts.stream().reduce(0.0f, Float::sum); + boolQueryBuilder.should(neuralSparseQueryBuilder.boost(reduceBoost)); + }); + return boolQueryBuilder; + } 
+ + private float getOriginQueryWeightAfterRescore(final SearchSourceBuilder searchSourceBuilder) { + if (Objects.isNull(searchSourceBuilder.rescores())) { + return 1.0f; + } + return searchSourceBuilder.rescores() + .stream() + .map(rescorerBuilder -> ((QueryRescorerBuilder) rescorerBuilder).getQueryWeight()) + .reduce(1.0f, (a, b) -> a * b); + } + + private Multimap collectNeuralSparseQueryBuilder(final QueryBuilder queryBuilder, float baseBoost) { + Multimap result = ArrayListMultimap.create(); + + if (queryBuilder instanceof BoolQueryBuilder) { + BoolQueryBuilder boolQueryBuilder = (BoolQueryBuilder) queryBuilder; + float updatedBoost = baseBoost * boolQueryBuilder.boost(); + for (QueryBuilder subQuery : boolQueryBuilder.should()) { + Multimap subResult = collectNeuralSparseQueryBuilder(subQuery, updatedBoost); + result.putAll(subResult); + } + } else if (queryBuilder instanceof NeuralSparseQueryBuilder) { + NeuralSparseQueryBuilder neuralSparseQueryBuilder = (NeuralSparseQueryBuilder) queryBuilder; + float updatedBoost = baseBoost * neuralSparseQueryBuilder.boost(); + /* + * We obtain a copied modifiedQueryBuilder from the valid origin NeuralSparseQueryBuilder. After this, + * when the original NeuralSparseQueryBuilder starts to rewrite, it will only retain the tokens that + * have higher scores (controlled by the maxScore * ratio). The tokens with lower scores will be + * passed to the modifiedQueryBuilder's queryTokenSupplier. + * + * By doing this, we reduce the score computation time for the original NeuralSparseQueryBuilder, + * and use the modifiedQueryBuilder to make a score increment on TopDocs. 
+ * + * When 2-phase is enabled: + * - Docs besides TopDocs: Score = HighScoreToken's score + * - Final TopDocs: Score = HighScoreToken's score + LowScoreToken's score + */ + NeuralSparseQueryBuilder modifiedQueryBuilder = neuralSparseQueryBuilder.getCopyNeuralSparseQueryBuilderForTwoPhase(ratio); + result.put(modifiedQueryBuilder, updatedBoost); + } + // We only support BoostQuery, BooleanQuery and NeuralSparseQuery now. For other compound query type which are not support now, will + // do nothing and just quit. + return result; + } + + private RescorerBuilder buildRescoreQueryBuilderForTwoPhase( + final QueryBuilder nestedTwoPhaseQueryBuilder, + final SearchRequest searchRequest + ) { + RescorerBuilder twoPhaseRescorer = new QueryRescorerBuilder(nestedTwoPhaseQueryBuilder); + int requestSize = searchRequest.source().size(); + int windowSize = (int) ((requestSize == -1 ? DEFAULT_BASE_QUERY_SIZE : requestSize) * windowExpansion); + if (windowSize > maxWindowSize || windowSize < 0) { + throw new IllegalArgumentException( + String.format( + Locale.ROOT, + "The two-phase window size of neural_sparse_two_phase_processor should be [0,%d], but get the value of %d", + maxWindowSize, + windowSize + ) + ); + } + twoPhaseRescorer.windowSize(windowSize); + return twoPhaseRescorer; + } + + /** + * Factory to create NeuralSparseTwoPhaseProcessor, provide default parameter, + * + */ + public static class Factory implements Processor.Factory { + @Override + public NeuralSparseTwoPhaseProcessor create( + Map> processorFactories, + String tag, + String description, + boolean ignoreFailure, + Map config, + PipelineContext pipelineContext + ) throws IllegalArgumentException { + + boolean enabled = ConfigurationUtils.readBooleanProperty(TYPE, tag, config, ENABLE_KEY, DEFAULT_ENABLED); + Map twoPhaseConfigMap = ConfigurationUtils.readOptionalMap(TYPE, tag, config, PARAMETER_KEY); + + float ratio = DEFAULT_RATIO; + float windowExpansion = DEFAULT_WINDOW_EXPANSION; + int maxWindowSize = 
DEFAULT_MAX_WINDOW_SIZE; + if (Objects.nonNull(twoPhaseConfigMap)) { + ratio = ((Number) twoPhaseConfigMap.getOrDefault(RATIO_KEY, ratio)).floatValue(); + windowExpansion = ((Number) twoPhaseConfigMap.getOrDefault(EXPANSION_KEY, windowExpansion)).floatValue(); + maxWindowSize = ((Number) twoPhaseConfigMap.getOrDefault(MAX_WINDOW_SIZE_KEY, maxWindowSize)).intValue(); + } + + return new NeuralSparseTwoPhaseProcessor(tag, description, ignoreFailure, enabled, ratio, windowExpansion, maxWindowSize); + } + } + +} diff --git a/src/main/java/org/opensearch/neuralsearch/query/NeuralSparseQueryBuilder.java b/src/main/java/org/opensearch/neuralsearch/query/NeuralSparseQueryBuilder.java index 6c3b06967..f46997d5e 100644 --- a/src/main/java/org/opensearch/neuralsearch/query/NeuralSparseQueryBuilder.java +++ b/src/main/java/org/opensearch/neuralsearch/query/NeuralSparseQueryBuilder.java @@ -10,6 +10,7 @@ import java.util.Locale; import java.util.Map; import java.util.Objects; +import java.util.function.BiConsumer; import java.util.function.Supplier; import org.apache.commons.lang.StringUtils; @@ -20,7 +21,9 @@ import org.apache.lucene.search.BooleanQuery; import org.apache.lucene.search.Query; import org.opensearch.Version; +import org.opensearch.client.Client; import org.opensearch.common.SetOnce; +import org.opensearch.common.collect.Tuple; import org.opensearch.core.ParseField; import org.opensearch.core.action.ActionListener; import org.opensearch.core.common.ParsingException; @@ -44,7 +47,8 @@ import lombok.NoArgsConstructor; import lombok.Setter; import lombok.experimental.Accessors; -import lombok.extern.log4j.Log4j2; + +import static org.opensearch.neuralsearch.processor.NeuralSparseTwoPhaseProcessor.splitQueryTokensByRatioedMaxScoreAsThreshold; /** * SparseEncodingQueryBuilder is responsible for handling "neural_sparse" query types. It uses an ML NEURAL_SPARSE model @@ -52,7 +56,6 @@ * to Lucene FeatureQuery wrapped by Lucene BooleanQuery. 
*/ -@Log4j2 @Getter @Setter @Accessors(chain = true, fluent = true) @@ -77,6 +80,17 @@ public class NeuralSparseQueryBuilder extends AbstractQueryBuilder> queryTokensSupplier; + // A field that for neural_sparse_two_phase_processor, if twoPhaseSharedQueryToken is not null, + // it means it's origin NeuralSparseQueryBuilder and should split the low score tokens form itself then put it into + // twoPhaseSharedQueryToken. + private Map twoPhaseSharedQueryToken; + // A parameter with a default value 0F, + // 1. If the query request are using neural_sparse_two_phase_processor and be collected, + // It's value will be the ratio of processor. + // 2. If it's the sub query only build for two-phase, the value will be set to -1 * ratio of processor. + // Then in the DoToQuery, we can use this to determine which type are this queryBuilder. + private float twoPhasePruneRatio = 0F; + private static final Version MINIMAL_SUPPORTED_VERSION_DEFAULT_MODEL_ID = Version.V_2_13_0; public static void initialize(MLCommonsClientAccessor mlClient) { @@ -113,6 +127,33 @@ public NeuralSparseQueryBuilder(StreamInput in) throws IOException { } } + /** + * Copy this QueryBuilder for two phase rescorer, set the copy one's twoPhasePruneRatio to -1. + * @param ratio the parameter of the NeuralSparseTwoPhaseProcessor, control how to split the queryTokens to two phase. + * @return A copy NeuralSparseQueryBuilder for twoPhase, it will be added to the rescorer. 
+ */ + public NeuralSparseQueryBuilder getCopyNeuralSparseQueryBuilderForTwoPhase(float ratio) { + this.twoPhasePruneRatio(ratio); + NeuralSparseQueryBuilder copy = new NeuralSparseQueryBuilder().fieldName(this.fieldName) + .queryName(this.queryName) + .queryText(this.queryText) + .modelId(this.modelId) + .maxTokenScore(this.maxTokenScore) + .twoPhasePruneRatio(-1f * ratio); + if (Objects.nonNull(this.queryTokensSupplier)) { + Map tokens = queryTokensSupplier.get(); + // Splitting tokens based on a threshold value: tokens greater than the threshold are stored in v1, + // while those less than or equal to the threshold are stored in v2. + Tuple, Map> splitTokens = splitQueryTokensByRatioedMaxScoreAsThreshold(tokens, ratio); + this.queryTokensSupplier(() -> splitTokens.v1()); + copy.queryTokensSupplier(() -> splitTokens.v2()); + } else { + this.twoPhaseSharedQueryToken = new HashMap<>(); + copy.queryTokensSupplier(() -> this.twoPhaseSharedQueryToken); + } + return copy; + } + @Override protected void doWriteTo(StreamOutput out) throws IOException { out.writeString(this.fieldName); @@ -143,7 +184,9 @@ protected void doXContent(XContentBuilder xContentBuilder, Params params) throws if (Objects.nonNull(modelId)) { xContentBuilder.field(MODEL_ID_FIELD.getPreferredName(), modelId); } - if (Objects.nonNull(maxTokenScore)) xContentBuilder.field(MAX_TOKEN_SCORE_FIELD.getPreferredName(), maxTokenScore); + if (Objects.nonNull(maxTokenScore)) { + xContentBuilder.field(MAX_TOKEN_SCORE_FIELD.getPreferredName(), maxTokenScore); + } if (Objects.nonNull(queryTokensSupplier) && Objects.nonNull(queryTokensSupplier.get())) { xContentBuilder.field(QUERY_TOKENS_FIELD.getPreferredName(), queryTokensSupplier.get()); } @@ -270,40 +313,61 @@ private static void parseQueryParams(XContentParser parser, NeuralSparseQueryBui } @Override - protected QueryBuilder doRewrite(QueryRewriteContext queryRewriteContext) throws IOException { + protected QueryBuilder doRewrite(QueryRewriteContext 
queryRewriteContext) { // We need to inference the sentence to get the queryTokens. The logic is similar to NeuralQueryBuilder // If the inference is finished, then rewrite to self and call doToQuery, otherwise, continue doRewrite - if (null != queryTokensSupplier) { + // QueryTokensSupplier means 2 case now, + // 1. It's the queryBuilder built for two-phase, doesn't need any rewrite. + // 2. It's registerAsyncAction has been registered successful. + if (Objects.nonNull(queryTokensSupplier)) { return this; } - validateForRewrite(queryText, modelId); SetOnce> queryTokensSetOnce = new SetOnce<>(); - queryRewriteContext.registerAsyncAction( - ((client, actionListener) -> ML_CLIENT.inferenceSentencesWithMapResult( - modelId(), - List.of(queryText), - ActionListener.wrap(mapResultList -> { - queryTokensSetOnce.set(TokenWeightUtil.fetchListOfTokenWeightMap(mapResultList).get(0)); - actionListener.onResponse(null); - }, actionListener::onFailure) - )) - ); + queryRewriteContext.registerAsyncAction(getModelInferenceAsync(queryTokensSetOnce)); return new NeuralSparseQueryBuilder().fieldName(fieldName) .queryText(queryText) .modelId(modelId) .maxTokenScore(maxTokenScore) - .queryTokensSupplier(queryTokensSetOnce::get); + .queryTokensSupplier(queryTokensSetOnce::get) + .twoPhaseSharedQueryToken(twoPhaseSharedQueryToken) + .twoPhasePruneRatio(twoPhasePruneRatio); + } + + private BiConsumer> getModelInferenceAsync(SetOnce> setOnce) { + // When Two-phase shared query tokens is null, + // it set queryTokensSupplier to the inference result which has all query tokens with score. + // When Two-phase shared query tokens exist, + // it splits the tokens using a threshold defined by a ratio of the maximum score of tokens, updating the token set + // accordingly. 
+ return ((client, actionListener) -> ML_CLIENT.inferenceSentencesWithMapResult( + modelId(), + List.of(queryText), + ActionListener.wrap(mapResultList -> { + Map queryTokens = TokenWeightUtil.fetchListOfTokenWeightMap(mapResultList).get(0); + if (Objects.nonNull(twoPhaseSharedQueryToken)) { + Tuple, Map> splitQueryTokens = splitQueryTokensByRatioedMaxScoreAsThreshold( + queryTokens, + twoPhasePruneRatio + ); + setOnce.set(splitQueryTokens.v1()); + twoPhaseSharedQueryToken = splitQueryTokens.v2(); + } else { + setOnce.set(queryTokens); + } + actionListener.onResponse(null); + }, actionListener::onFailure) + )); } @Override protected Query doToQuery(QueryShardContext context) throws IOException { final MappedFieldType ft = context.fieldMapper(fieldName); validateFieldType(ft); - Map queryTokens = queryTokensSupplier.get(); - validateQueryTokens(queryTokens); - + if (Objects.isNull(queryTokens)) { + throw new IllegalArgumentException("Query tokens cannot be null."); + } BooleanQuery.Builder builder = new BooleanQuery.Builder(); for (Map.Entry entry : queryTokens.entrySet()) { builder.add(FeatureField.newLinearQuery(fieldName, entry.getKey(), entry.getValue()), BooleanClause.Occur.SHOULD); @@ -325,34 +389,32 @@ private static void validateForRewrite(String queryText, String modelId) { } private static void validateFieldType(MappedFieldType fieldType) { - if (null == fieldType || !fieldType.typeName().equals("rank_features")) { + if (Objects.isNull(fieldType) || !fieldType.typeName().equals("rank_features")) { throw new IllegalArgumentException("[" + NAME + "] query only works on [rank_features] fields"); } } - private static void validateQueryTokens(Map queryTokens) { - if (null == queryTokens) { - throw new IllegalArgumentException("Query tokens cannot be null."); + @Override + protected boolean doEquals(NeuralSparseQueryBuilder obj) { + if (this == obj) { + return true; } - for (Map.Entry entry : queryTokens.entrySet()) { - if (entry.getValue() <= 0) { - throw new 
IllegalArgumentException( - "Feature weight must be larger than 0, feature [" + entry.getValue() + "] has negative weight." - ); - } + if (Objects.isNull(obj) || getClass() != obj.getClass()) { + return false; + } + if (Objects.isNull(queryTokensSupplier) && Objects.nonNull(obj.queryTokensSupplier)) { + return false; + } + if (Objects.nonNull(queryTokensSupplier) && Objects.isNull(obj.queryTokensSupplier)) { + return false; } - } - @Override - protected boolean doEquals(NeuralSparseQueryBuilder obj) { - if (this == obj) return true; - if (Objects.isNull(obj) || getClass() != obj.getClass()) return false; - if (Objects.isNull(queryTokensSupplier) && Objects.nonNull(obj.queryTokensSupplier)) return false; - if (Objects.nonNull(queryTokensSupplier) && Objects.isNull(obj.queryTokensSupplier)) return false; EqualsBuilder equalsBuilder = new EqualsBuilder().append(fieldName, obj.fieldName) .append(queryText, obj.queryText) .append(modelId, obj.modelId) - .append(maxTokenScore, obj.maxTokenScore); + .append(maxTokenScore, obj.maxTokenScore) + .append(twoPhasePruneRatio, obj.twoPhasePruneRatio) + .append(twoPhaseSharedQueryToken, obj.twoPhaseSharedQueryToken); if (Objects.nonNull(queryTokensSupplier)) { equalsBuilder.append(queryTokensSupplier.get(), obj.queryTokensSupplier.get()); } @@ -361,8 +423,13 @@ protected boolean doEquals(NeuralSparseQueryBuilder obj) { @Override protected int doHashCode() { - HashCodeBuilder builder = new HashCodeBuilder().append(fieldName).append(queryText).append(modelId).append(maxTokenScore); - if (queryTokensSupplier != null) { + HashCodeBuilder builder = new HashCodeBuilder().append(fieldName) + .append(queryText) + .append(modelId) + .append(maxTokenScore) + .append(twoPhasePruneRatio) + .append(twoPhaseSharedQueryToken); + if (Objects.nonNull(queryTokensSupplier)) { builder.append(queryTokensSupplier.get()); } return builder.toHashCode(); @@ -376,4 +443,5 @@ public String getWriteableName() { private static boolean 
isClusterOnOrAfterMinReqVersionForDefaultModelIdSupport() { return NeuralSearchClusterUtil.instance().getClusterMinVersion().onOrAfter(MINIMAL_SUPPORTED_VERSION_DEFAULT_MODEL_ID); } + } diff --git a/src/test/java/org/opensearch/neuralsearch/plugin/NeuralSearchTests.java b/src/test/java/org/opensearch/neuralsearch/plugin/NeuralSearchTests.java index cb3869868..d0d5b82be 100644 --- a/src/test/java/org/opensearch/neuralsearch/plugin/NeuralSearchTests.java +++ b/src/test/java/org/opensearch/neuralsearch/plugin/NeuralSearchTests.java @@ -17,6 +17,7 @@ import org.opensearch.ingest.IngestService; import org.opensearch.ingest.Processor; import org.opensearch.neuralsearch.processor.NeuralQueryEnricherProcessor; +import org.opensearch.neuralsearch.processor.NeuralSparseTwoPhaseProcessor; import org.opensearch.neuralsearch.processor.NormalizationProcessor; import org.opensearch.neuralsearch.processor.TextEmbeddingProcessor; import org.opensearch.neuralsearch.processor.factory.NormalizationProcessorFactory; @@ -102,5 +103,6 @@ public void testRequestProcessors() { ); assertNotNull(processors); assertNotNull(processors.get(NeuralQueryEnricherProcessor.TYPE)); + assertNotNull(processors.get(NeuralSparseTwoPhaseProcessor.TYPE)); } } diff --git a/src/test/java/org/opensearch/neuralsearch/processor/NeuralSparseTwoPhaseProcessorIT.java b/src/test/java/org/opensearch/neuralsearch/processor/NeuralSparseTwoPhaseProcessorIT.java new file mode 100644 index 000000000..3e4ed8844 --- /dev/null +++ b/src/test/java/org/opensearch/neuralsearch/processor/NeuralSparseTwoPhaseProcessorIT.java @@ -0,0 +1,588 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ +package org.opensearch.neuralsearch.processor; + +import lombok.SneakyThrows; +import org.junit.Before; +import org.opensearch.client.ResponseException; +import org.opensearch.common.settings.Settings; +import org.opensearch.index.query.BoolQueryBuilder; +import 
org.opensearch.index.query.ConstantScoreQueryBuilder; +import org.opensearch.index.query.DisMaxQueryBuilder; +import org.opensearch.index.query.MatchAllQueryBuilder; +import org.opensearch.index.query.QueryBuilder; +import org.opensearch.index.query.functionscore.FunctionScoreQueryBuilder; +import org.opensearch.neuralsearch.BaseNeuralSearchIT; +import org.opensearch.neuralsearch.query.NeuralSparseQueryBuilder; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.function.Supplier; + +import static org.opensearch.neuralsearch.util.TestUtils.createRandomTokenWeightMap; +import static org.opensearch.neuralsearch.util.TestUtils.objectToFloat; + +public class NeuralSparseTwoPhaseProcessorIT extends BaseNeuralSearchIT { + + private static final String index = "two-phase-index"; + private static final String search_pipeline = "two-phase-search-pipeline"; + private final String TYPE = "neural_sparse_two_phase_processor"; + private static final String TEST_TWO_PHASE_BASIC_INDEX_NAME = "test-sparse-basic-index-two-phase"; + private static final String TEST_BASIC_INDEX_NAME = "test-sparse-basic-index"; + private static final String TEST_MULTI_NEURAL_SPARSE_FIELD_INDEX_NAME = "test-sparse-multi-field-index"; + private static final String TEST_TEXT_AND_NEURAL_SPARSE_FIELD_INDEX_NAME = "test-sparse-text-and-field-index"; + private static final String TEST_NESTED_INDEX_NAME = "test-sparse-nested-index"; + private static final String TEST_QUERY_TEXT = "Hello world a b"; + private static final String TEST_NEURAL_SPARSE_FIELD_NAME_1 = "test-sparse-encoding-1"; + private static final String TEST_NEURAL_SPARSE_FIELD_NAME_2 = "test-sparse-encoding-2"; + private static final String TEST_TEXT_FIELD_NAME_1 = "test-text-field"; + private static final String TEST_NEURAL_SPARSE_FIELD_NAME_NESTED = "nested.neural_sparse.field"; + + private static final List TEST_TOKENS = List.of("hello", "world", "a", "b", "c"); + + private static final Float DELTA = 
1e-5f; + private final Map testRankFeaturesDoc = createRandomTokenWeightMap(TEST_TOKENS); + private static final List TWO_PHASE_TEST_TOKEN = List.of("hello", "world"); + + private static final Map testFixedQueryTokens = new HashMap<>(); + private static final Supplier> testFixedQueryTokenSupplier = () -> testFixedQueryTokens; + static { + testFixedQueryTokens.put("hello", 5.0f); + testFixedQueryTokens.put("world", 4.0f); + testFixedQueryTokens.put("a", 3.0f); + testFixedQueryTokens.put("b", 2.0f); + testFixedQueryTokens.put("c", 1.0f); + } + + @Before + public void setUp() throws Exception { + super.setUp(); + updateClusterSettings(); + } + + @SneakyThrows + public void testCreateOutOfRangePipeline_thenThrowsException() { + expectThrows(ResponseException.class, () -> createNeuralSparseTwoPhaseSearchProcessor(search_pipeline, 1.1f, 5.0f, 1000)); + expectThrows(ResponseException.class, () -> createNeuralSparseTwoPhaseSearchProcessor(search_pipeline, 0.1f, 0.0f, 1000)); + expectThrows(ResponseException.class, () -> createNeuralSparseTwoPhaseSearchProcessor(search_pipeline, 0.1f, 3.0f, 4)); + } + + @SneakyThrows + public void testBooleanQuery_withMultipleSparseEncodingQueries_whenTwoPhaseEnabled() { + try { + initializeTwoPhaseProcessor(); + initializeIndexIfNotExist(TEST_MULTI_NEURAL_SPARSE_FIELD_INDEX_NAME); + setDefaultSearchPipelineForIndex(TEST_MULTI_NEURAL_SPARSE_FIELD_INDEX_NAME); + BoolQueryBuilder boolQueryBuilder = new BoolQueryBuilder(); + Map randomTokenWeight = createRandomTokenWeightMap(TWO_PHASE_TEST_TOKEN); + Supplier> randomTokenWeightSupplier = () -> randomTokenWeight; + NeuralSparseQueryBuilder sparseEncodingQueryBuilder1 = new NeuralSparseQueryBuilder().fieldName(TEST_NEURAL_SPARSE_FIELD_NAME_1) + .queryTokensSupplier(randomTokenWeightSupplier); + NeuralSparseQueryBuilder sparseEncodingQueryBuilder2 = new NeuralSparseQueryBuilder().fieldName(TEST_NEURAL_SPARSE_FIELD_NAME_2) + .queryText(TEST_QUERY_TEXT) + 
.queryTokensSupplier(randomTokenWeightSupplier); + boolQueryBuilder.should(sparseEncodingQueryBuilder1).should(sparseEncodingQueryBuilder2); + + Map searchResponseAsMap = search(TEST_MULTI_NEURAL_SPARSE_FIELD_INDEX_NAME, boolQueryBuilder, 1); + Map firstInnerHit = getFirstInnerHit(searchResponseAsMap); + + assertEquals("1", firstInnerHit.get("_id")); + float expectedScore = 2 * computeExpectedScore(testRankFeaturesDoc, randomTokenWeightSupplier.get()); + assertEquals(expectedScore, objectToFloat(firstInnerHit.get("_score")), DELTA); + } finally { + wipeOfTestResources(TEST_MULTI_NEURAL_SPARSE_FIELD_INDEX_NAME, null, null, search_pipeline); + } + } + + @SneakyThrows + private void initializeTwoPhaseProcessor() { + createNeuralSparseTwoPhaseSearchProcessor(search_pipeline); + } + + @SneakyThrows + private void setDefaultSearchPipelineForIndex(String indexName) { + updateIndexSettings(indexName, Settings.builder().put("index.search.default_pipeline", search_pipeline)); + } + + /** + * Tests the neuralSparseQuery when twoPhase enabled with DSL query: + * { + * "query": { + * "bool": { + * "should": [ + * { + * "neural_sparse": { + * "field": "test-sparse-encoding-1", + * "query_text": "TEST_QUERY_TEXT", + * "model_id": "dcsdcasd", + * "boost": 2.0 + * } + * } + * ] + * } + * } + * } + */ + @SneakyThrows + public void testBasicQueryUsingQueryText_whenTwoPhaseEnabled_thenGetExpectedScore() { + try { + initializeIndexIfNotExist(TEST_BASIC_INDEX_NAME); + initializeTwoPhaseProcessor(); + setDefaultSearchPipelineForIndex(TEST_BASIC_INDEX_NAME); + NeuralSparseQueryBuilder sparseEncodingQueryBuilder = new NeuralSparseQueryBuilder().fieldName(TEST_NEURAL_SPARSE_FIELD_NAME_1) + .queryText(TEST_QUERY_TEXT) + .queryTokensSupplier(testFixedQueryTokenSupplier) + .boost(2.0f); + Map searchResponseAsMap = search(TEST_BASIC_INDEX_NAME, sparseEncodingQueryBuilder, 1); + Map firstInnerHit = getFirstInnerHit(searchResponseAsMap); + + assertEquals("1", firstInnerHit.get("_id")); + float 
expectedScore = 2 * computeExpectedScore(testRankFeaturesDoc, testFixedQueryTokens); + assertEquals(expectedScore, objectToFloat(firstInnerHit.get("_score")), DELTA); + } finally { + wipeOfTestResources(TEST_BASIC_INDEX_NAME, null, null, search_pipeline); + } + } + + @SneakyThrows + public void testBasicQueryUsingQueryText_whenTwoPhaseEnabledAndDisabled_thenGetSameScore() { + try { + initializeIndexIfNotExist(TEST_BASIC_INDEX_NAME); + initializeTwoPhaseProcessor(); + + setDefaultSearchPipelineForIndex(TEST_BASIC_INDEX_NAME); + NeuralSparseQueryBuilder sparseEncodingQueryBuilder = new NeuralSparseQueryBuilder().fieldName(TEST_NEURAL_SPARSE_FIELD_NAME_1) + .queryText(TEST_QUERY_TEXT) + .queryTokensSupplier(testFixedQueryTokenSupplier) + .boost(2.0f); + Map searchResponseAsMap = search(TEST_BASIC_INDEX_NAME, sparseEncodingQueryBuilder, 1); + Map firstInnerHit = getFirstInnerHit(searchResponseAsMap); + assertEquals("1", firstInnerHit.get("_id")); + float scoreWithoutTwoPhase = objectToFloat(firstInnerHit.get("_score")); + + sparseEncodingQueryBuilder = new NeuralSparseQueryBuilder().fieldName(TEST_NEURAL_SPARSE_FIELD_NAME_1) + .queryText(TEST_QUERY_TEXT) + .queryTokensSupplier(testFixedQueryTokenSupplier) + .boost(2.0f); + searchResponseAsMap = search(TEST_BASIC_INDEX_NAME, sparseEncodingQueryBuilder, 1); + firstInnerHit = getFirstInnerHit(searchResponseAsMap); + assertEquals("1", firstInnerHit.get("_id")); + float scoreWithTwoPhase = objectToFloat(firstInnerHit.get("_score")); + assertEquals(scoreWithTwoPhase, scoreWithoutTwoPhase, DELTA); + } finally { + wipeOfTestResources(TEST_BASIC_INDEX_NAME, null, null, search_pipeline); + } + } + + /** + * Tests neuralSparseQuery as rescoreQuery with DSL query: + * { + * "query": { + * "match_all": {} + * }, + * "rescore": { + * "query": { + * "bool": { + * "should": [ + * { + * "neural_sparse": { + * "field": "test-sparse-encoding-1", + * "query_text": "Hello world a b", + * "model_id": "dcsdcasd", + * "boost": 2.0 + * } + * } 
+ * ] + * } + * } + * } + * } + */ + @SneakyThrows + public void testNeuralSparseQueryAsRescoreQuery_whenTwoPhase_thenGetExpectedScore() { + try { + initializeIndexIfNotExist(TEST_BASIC_INDEX_NAME); + initializeTwoPhaseProcessor(); + + setDefaultSearchPipelineForIndex(TEST_BASIC_INDEX_NAME); + NeuralSparseQueryBuilder sparseEncodingQueryBuilder = new NeuralSparseQueryBuilder().fieldName(TEST_NEURAL_SPARSE_FIELD_NAME_1) + .queryText(TEST_QUERY_TEXT) + .queryTokensSupplier(testFixedQueryTokenSupplier) + .boost(2.0f); + QueryBuilder queryBuilder = new MatchAllQueryBuilder(); + Map searchResponseAsMap = search(TEST_BASIC_INDEX_NAME, queryBuilder, sparseEncodingQueryBuilder, 1); + Map firstInnerHit = getFirstInnerHit(searchResponseAsMap); + assertEquals("1", firstInnerHit.get("_id")); + float expectedScore = 2 * computeExpectedScore(testRankFeaturesDoc, testFixedQueryTokens); + assertEquals(expectedScore, objectToFloat(firstInnerHit.get("_score")), DELTA); + } finally { + wipeOfTestResources(TEST_BASIC_INDEX_NAME, null, null, search_pipeline); + } + } + + /** + * Tests multi neuralSparseQuery in BooleanQuery with DSL query: + * { + * "query": { + * "bool": { + * "should": [ + * { + * "neural_sparse": { + * "field": "test-sparse-encoding-1", + * "query_text": "Hello world a b", + * "model_id": "dcsdcasd", + * "boost": 2.0 + * } + * }, + * { + * "neural_sparse": { + * "field": "test-sparse-encoding-1", + * "query_text": "Hello world a b", + * "model_id": "dcsdcasd", + * "boost": 2.0 + * } + * } + * ] + * } + * } + * } + */ + @SneakyThrows + public void testMultiNeuralSparseQuery_whenTwoPhase_thenGetExpectedScore() { + try { + initializeIndexIfNotExist(TEST_BASIC_INDEX_NAME); + initializeTwoPhaseProcessor(); + setDefaultSearchPipelineForIndex(TEST_BASIC_INDEX_NAME); + BoolQueryBuilder boolQueryBuilder = new BoolQueryBuilder(); + NeuralSparseQueryBuilder sparseEncodingQueryBuilder = new NeuralSparseQueryBuilder().fieldName(TEST_NEURAL_SPARSE_FIELD_NAME_1) + 
.queryTokensSupplier(testFixedQueryTokenSupplier) + .boost(2.0f); + boolQueryBuilder.should(sparseEncodingQueryBuilder); + boolQueryBuilder.should(sparseEncodingQueryBuilder); + Map searchResponseAsMap = search(TEST_BASIC_INDEX_NAME, boolQueryBuilder, 1); + Map firstInnerHit = getFirstInnerHit(searchResponseAsMap); + assertEquals("1", firstInnerHit.get("_id")); + float expectedScore = 4 * computeExpectedScore(testRankFeaturesDoc, testFixedQueryTokens); + assertEquals(expectedScore, objectToFloat(firstInnerHit.get("_score")), DELTA); + } finally { + wipeOfTestResources(TEST_BASIC_INDEX_NAME, null, null, search_pipeline); + } + } + + /** + * This test case aim to test different score caused by different two-phase parameters. + * First, with a default parameter, two-phase get same score at most times. + * Second, With a high ratio, there may some docs including lots of low score tokens are missed. + * And then, lower ratio or higher windows size can improve accuracy. + */ + @SneakyThrows + public void testNeuralSparseQuery_whenDifferentTwoPhaseParameter_thenGetDifferentResult() { + try { + initializeIndexIfNotExist(TEST_TWO_PHASE_BASIC_INDEX_NAME); + Map queryToken = new HashMap<>(); + for (int i = 1; i < 6; i++) { + queryToken.put(String.valueOf(i + 10), (float) i); + } + for (int i = 1; i < 8; i++) { + queryToken.put(String.valueOf(i), (float) i); + } + Supplier> queryTokenSupplier = () -> queryToken; + + NeuralSparseQueryBuilder sparseEncodingQueryBuilder = new NeuralSparseQueryBuilder().fieldName(TEST_NEURAL_SPARSE_FIELD_NAME_1) + .queryTokensSupplier(queryTokenSupplier); + assertSearchScore(sparseEncodingQueryBuilder, TEST_TWO_PHASE_BASIC_INDEX_NAME, 110); + createNeuralSparseTwoPhaseSearchProcessor(search_pipeline, 0.3f, 1f, 10000); + setDefaultSearchPipelineForIndex(TEST_TWO_PHASE_BASIC_INDEX_NAME); + assertSearchScore(sparseEncodingQueryBuilder, TEST_TWO_PHASE_BASIC_INDEX_NAME, 110); + createNeuralSparseTwoPhaseSearchProcessor(search_pipeline, 0.7f, 1f, 
10000); + setDefaultSearchPipelineForIndex(TEST_TWO_PHASE_BASIC_INDEX_NAME); + assertSearchScore(sparseEncodingQueryBuilder, TEST_TWO_PHASE_BASIC_INDEX_NAME, 61); + createNeuralSparseTwoPhaseSearchProcessor(search_pipeline, 0.7f, 30f, 10000); + setDefaultSearchPipelineForIndex(TEST_TWO_PHASE_BASIC_INDEX_NAME); + assertSearchScore(sparseEncodingQueryBuilder, TEST_TWO_PHASE_BASIC_INDEX_NAME, 110); + } finally { + wipeOfTestResources(TEST_TWO_PHASE_BASIC_INDEX_NAME, null, null, search_pipeline); + } + } + + @SneakyThrows + public void testMultiNeuralSparseQuery_whenTwoPhaseAndFilter_thenGetExpectedScore() { + try { + initializeIndexIfNotExist(TEST_BASIC_INDEX_NAME); + createNeuralSparseTwoPhaseSearchProcessor(search_pipeline, .8f, 5f, 1000); + setDefaultSearchPipelineForIndex(TEST_BASIC_INDEX_NAME); + BoolQueryBuilder boolQueryBuilder = new BoolQueryBuilder(); + NeuralSparseQueryBuilder sparseEncodingQueryBuilder = new NeuralSparseQueryBuilder().fieldName(TEST_NEURAL_SPARSE_FIELD_NAME_1) + .queryText(TEST_QUERY_TEXT) + .queryTokensSupplier(testFixedQueryTokenSupplier) + .boost(2.0f); + boolQueryBuilder.should(sparseEncodingQueryBuilder); + boolQueryBuilder.filter(sparseEncodingQueryBuilder); + Map searchResponseAsMap = search(TEST_BASIC_INDEX_NAME, boolQueryBuilder, 1); + Map firstInnerHit = getFirstInnerHit(searchResponseAsMap); + assertEquals("1", firstInnerHit.get("_id")); + float expectedScore = 2 * computeExpectedScore(testRankFeaturesDoc, testFixedQueryTokens); + assertEquals(expectedScore, objectToFloat(firstInnerHit.get("_score")), DELTA); + } finally { + wipeOfTestResources(TEST_BASIC_INDEX_NAME, null, null, search_pipeline); + } + } + + @SneakyThrows + public void testMultiNeuralSparseQuery_whenTwoPhaseAndMultiBoolean_thenGetExpectedScore() { + String modelId = null; + try { + initializeIndexIfNotExist(TEST_BASIC_INDEX_NAME); + + createNeuralSparseTwoPhaseSearchProcessor(search_pipeline, .6f, 5f, 1000); + 
setDefaultSearchPipelineForIndex(TEST_BASIC_INDEX_NAME); + modelId = prepareSparseEncodingModel(); + BoolQueryBuilder boolQueryBuilder = new BoolQueryBuilder(); + NeuralSparseQueryBuilder sparseEncodingQueryBuilder1 = new NeuralSparseQueryBuilder().fieldName(TEST_NEURAL_SPARSE_FIELD_NAME_1) + .queryText(TEST_QUERY_TEXT) + .modelId(modelId) + .boost(1.0f); + boolQueryBuilder.should(sparseEncodingQueryBuilder1); + boolQueryBuilder.should(sparseEncodingQueryBuilder1); + BoolQueryBuilder subBoolQueryBuilder = new BoolQueryBuilder(); + NeuralSparseQueryBuilder sparseEncodingQueryBuilder2 = new NeuralSparseQueryBuilder().fieldName(TEST_NEURAL_SPARSE_FIELD_NAME_1) + .queryText(TEST_QUERY_TEXT) + .modelId(modelId) + .boost(2.0f); + NeuralSparseQueryBuilder sparseEncodingQueryBuilder3 = new NeuralSparseQueryBuilder().fieldName(TEST_NEURAL_SPARSE_FIELD_NAME_1) + .queryText(TEST_QUERY_TEXT) + .modelId(modelId) + .boost(3.0f); + subBoolQueryBuilder.should(sparseEncodingQueryBuilder2); + subBoolQueryBuilder.should(sparseEncodingQueryBuilder3); + subBoolQueryBuilder.boost(2.0f); + boolQueryBuilder.should(subBoolQueryBuilder); + Map searchResponseAsMap = search(TEST_BASIC_INDEX_NAME, boolQueryBuilder, 1); + Map firstInnerHit = getFirstInnerHit(searchResponseAsMap); + assertEquals("1", firstInnerHit.get("_id")); + float expectedScore = 12 * computeExpectedScore(modelId, testRankFeaturesDoc, TEST_QUERY_TEXT); + assertEquals(expectedScore, objectToFloat(firstInnerHit.get("_score")), DELTA); + } finally { + wipeOfTestResources(TEST_BASIC_INDEX_NAME, null, modelId, null); + } + } + + @SneakyThrows + public void testMultiNeuralSparseQuery_whenTwoPhaseAndNoLowScoreToken_thenGetExpectedScore() { + try { + initializeIndexIfNotExist(TEST_BASIC_INDEX_NAME); + createNeuralSparseTwoPhaseSearchProcessor(search_pipeline, .6f, 5f, 1000); + setDefaultSearchPipelineForIndex(TEST_BASIC_INDEX_NAME); + Map queryTokens = new HashMap<>(); + queryTokens.put("hello", 10.0f); + queryTokens.put("world", 
10.0f); + queryTokens.put("a", 10.0f); + queryTokens.put("b", 10.0f); + NeuralSparseQueryBuilder sparseEncodingQueryBuilder = new NeuralSparseQueryBuilder().fieldName(TEST_NEURAL_SPARSE_FIELD_NAME_1) + .queryTokensSupplier(() -> queryTokens) + .boost(2.0f); + Map searchResponseAsMap = search(TEST_BASIC_INDEX_NAME, sparseEncodingQueryBuilder, 1); + Map firstInnerHit = getFirstInnerHit(searchResponseAsMap); + assertEquals("1", firstInnerHit.get("_id")); + float expectedScore = 2 * computeExpectedScore(testRankFeaturesDoc, sparseEncodingQueryBuilder.queryTokensSupplier().get()); + assertEquals(expectedScore, objectToFloat(firstInnerHit.get("_score")), DELTA); + } finally { + wipeOfTestResources(TEST_BASIC_INDEX_NAME, null, null, null); + } + } + + @SneakyThrows + public void testNeuralSParseQuery_whenTwoPhaseAndNestedInConstantScoreQuery_thenSuccess() { + try { + initializeIndexIfNotExist(TEST_BASIC_INDEX_NAME); + createNeuralSparseTwoPhaseSearchProcessor(search_pipeline, 0.6f, 5f, 10000); + setDefaultSearchPipelineForIndex(TEST_BASIC_INDEX_NAME); + NeuralSparseQueryBuilder sparseEncodingQueryBuilder = new NeuralSparseQueryBuilder().fieldName(TEST_NEURAL_SPARSE_FIELD_NAME_1) + .queryText(TEST_QUERY_TEXT) + .queryTokensSupplier(testFixedQueryTokenSupplier) + .boost(1.0f); + ConstantScoreQueryBuilder constantScoreQueryBuilder = new ConstantScoreQueryBuilder(sparseEncodingQueryBuilder); + Map searchResponseAsMap = search(TEST_BASIC_INDEX_NAME, constantScoreQueryBuilder, 1); + Map firstInnerHit = getFirstInnerHit(searchResponseAsMap); + assertEquals("1", firstInnerHit.get("_id")); + assertEquals(1.0f, objectToFloat(firstInnerHit.get("_score")), DELTA); + } finally { + wipeOfTestResources(TEST_BASIC_INDEX_NAME, null, null, search_pipeline); + } + } + + @SneakyThrows + public void testNeuralSParseQuery_whenTwoPhaseAndNestedInDisjunctionMaxQuery_thenSuccess() { + try { + initializeIndexIfNotExist(TEST_BASIC_INDEX_NAME); + 
createNeuralSparseTwoPhaseSearchProcessor(search_pipeline, 0.6f, 5f, 10000); + setDefaultSearchPipelineForIndex(TEST_BASIC_INDEX_NAME); + NeuralSparseQueryBuilder sparseEncodingQueryBuilder = new NeuralSparseQueryBuilder().fieldName(TEST_NEURAL_SPARSE_FIELD_NAME_1) + .queryText(TEST_QUERY_TEXT) + .queryTokensSupplier(testFixedQueryTokenSupplier) + .boost(5.0f); + DisMaxQueryBuilder disMaxQueryBuilder = new DisMaxQueryBuilder(); + disMaxQueryBuilder.add(sparseEncodingQueryBuilder); + disMaxQueryBuilder.add(new MatchAllQueryBuilder()); + Map searchResponseAsMap = search(TEST_BASIC_INDEX_NAME, disMaxQueryBuilder, 1); + Map firstInnerHit = getFirstInnerHit(searchResponseAsMap); + assertEquals("1", firstInnerHit.get("_id")); + float expectedScore = 5f * computeExpectedScore(testRankFeaturesDoc, testFixedQueryTokens); + assertEquals(expectedScore, objectToFloat(firstInnerHit.get("_score")), DELTA); + } finally { + wipeOfTestResources(TEST_BASIC_INDEX_NAME, null, null, search_pipeline); + } + } + + @SneakyThrows + public void testNeuralSparseQuery_whenTwoPhaseAndNestedInFunctionScoreQuery_thenSuccess() { + try { + initializeIndexIfNotExist(TEST_BASIC_INDEX_NAME); + createNeuralSparseTwoPhaseSearchProcessor(search_pipeline, 0.6f, 5f, 10000); + setDefaultSearchPipelineForIndex(TEST_BASIC_INDEX_NAME); + NeuralSparseQueryBuilder sparseEncodingQueryBuilder = new NeuralSparseQueryBuilder().fieldName(TEST_NEURAL_SPARSE_FIELD_NAME_1) + .queryText(TEST_QUERY_TEXT) + .queryTokensSupplier(testFixedQueryTokenSupplier) + .boost(5.0f); + FunctionScoreQueryBuilder functionScoreQueryBuilder = new FunctionScoreQueryBuilder(sparseEncodingQueryBuilder); + functionScoreQueryBuilder.boost(2.0f); + Map searchResponseAsMap = search(TEST_BASIC_INDEX_NAME, functionScoreQueryBuilder, 1); + Map firstInnerHit = getFirstInnerHit(searchResponseAsMap); + assertEquals("1", firstInnerHit.get("_id")); + float expectedScore = 10f * computeExpectedScore(testRankFeaturesDoc, testFixedQueryTokens); + 
assertEquals(expectedScore, objectToFloat(firstInnerHit.get("_score")), DELTA); + } finally { + wipeOfTestResources(TEST_BASIC_INDEX_NAME, null, null, search_pipeline); + } + } + + /** + * Tests multi neuralSparseQuery in BooleanQuery with DSL query: + * { + * "query": { + * "bool": { + * "should": [ + * { + * "neural_sparse": { + * "field": "test-sparse-encoding-1", + * "query_text": "Hello world a b", + * "model_id": "dcsdcasd", + * "boost": 2.0 + * } + * }, + * { + * "neural_sparse": { + * "field": "test-sparse-encoding-1", + * "query_text": "Hello world a b", + * "model_id": "dcsdcasd", + * "boost": 2.0 + * } + * } + * ] + * } + * } + * } + */ + @SneakyThrows + public void testMultiNeuralSparseQuery_whenTwoPhaseAndModelInference_thenGetExpectedScore() { + String modelId = null; + try { + initializeIndexIfNotExist(TEST_BASIC_INDEX_NAME); + initializeTwoPhaseProcessor(); + setDefaultSearchPipelineForIndex(TEST_BASIC_INDEX_NAME); + modelId = prepareSparseEncodingModel(); + BoolQueryBuilder boolQueryBuilder = new BoolQueryBuilder(); + NeuralSparseQueryBuilder sparseEncodingQueryBuilder = new NeuralSparseQueryBuilder().fieldName(TEST_NEURAL_SPARSE_FIELD_NAME_1) + .queryText(TEST_QUERY_TEXT) + .modelId(modelId) + .boost(3.0f); + boolQueryBuilder.should(sparseEncodingQueryBuilder); + boolQueryBuilder.should(sparseEncodingQueryBuilder); + Map searchResponseAsMap = search(TEST_BASIC_INDEX_NAME, boolQueryBuilder, 1); + Map firstInnerHit = getFirstInnerHit(searchResponseAsMap); + assertEquals("1", firstInnerHit.get("_id")); + float expectedScore = 6 * computeExpectedScore(modelId, testRankFeaturesDoc, TEST_QUERY_TEXT); + assertEquals(expectedScore, objectToFloat(firstInnerHit.get("_score")), DELTA); + } finally { + wipeOfTestResources(TEST_BASIC_INDEX_NAME, null, modelId, search_pipeline); + } + } + + @SneakyThrows + protected void initializeIndexIfNotExist(String indexName) { + if (TEST_BASIC_INDEX_NAME.equals(indexName) && !indexExists(indexName)) { + 
prepareSparseEncodingIndex(indexName, List.of(TEST_NEURAL_SPARSE_FIELD_NAME_1)); + addSparseEncodingDoc(indexName, "1", List.of(TEST_NEURAL_SPARSE_FIELD_NAME_1), List.of(testRankFeaturesDoc)); + assertEquals(1, getDocCount(indexName)); + } + + if (TEST_MULTI_NEURAL_SPARSE_FIELD_INDEX_NAME.equals(indexName) && !indexExists(indexName)) { + prepareSparseEncodingIndex(indexName, List.of(TEST_NEURAL_SPARSE_FIELD_NAME_1, TEST_NEURAL_SPARSE_FIELD_NAME_2)); + addSparseEncodingDoc( + indexName, + "1", + List.of(TEST_NEURAL_SPARSE_FIELD_NAME_1, TEST_NEURAL_SPARSE_FIELD_NAME_2), + List.of(testRankFeaturesDoc, testRankFeaturesDoc) + ); + assertEquals(1, getDocCount(indexName)); + } + + if (TEST_TEXT_AND_NEURAL_SPARSE_FIELD_INDEX_NAME.equals(indexName) && !indexExists(indexName)) { + prepareSparseEncodingIndex(indexName, List.of(TEST_NEURAL_SPARSE_FIELD_NAME_1)); + addSparseEncodingDoc( + indexName, + "1", + List.of(TEST_NEURAL_SPARSE_FIELD_NAME_1), + List.of(testRankFeaturesDoc), + List.of(TEST_TEXT_FIELD_NAME_1), + List.of(TEST_QUERY_TEXT) + ); + assertEquals(1, getDocCount(indexName)); + } + + if (TEST_NESTED_INDEX_NAME.equals(indexName) && !indexExists(indexName)) { + prepareSparseEncodingIndex(indexName, List.of(TEST_NEURAL_SPARSE_FIELD_NAME_NESTED)); + addSparseEncodingDoc(indexName, "1", List.of(TEST_NEURAL_SPARSE_FIELD_NAME_NESTED), List.of(testRankFeaturesDoc)); + assertEquals(1, getDocCount(TEST_NESTED_INDEX_NAME)); + } + + if (TEST_TWO_PHASE_BASIC_INDEX_NAME.equals(indexName) && !indexExists(indexName)) { + Map twoPhaseRandFeatures = new HashMap<>(); + Map normalRandFeatures = new HashMap<>(); + prepareSparseEncodingIndex(indexName, List.of(TEST_NEURAL_SPARSE_FIELD_NAME_1)); + // put [(5,5.0), (6,6.0)] into twoPhaseRandFeatures + for (int i = 5; i < 7; i++) { + twoPhaseRandFeatures.put(String.valueOf(i), (float) i); + } + + // put 10 token [(1,1.0),(11,1.0),....(5,5.0),(55,5.0)] into normalRandFeatures + for (int i = 1; i < 6; i++) { + 
normalRandFeatures.put(String.valueOf(i), (float) i); + normalRandFeatures.put(String.valueOf(10 + i), (float) i); + + } + + for (int i = 0; i < 10; i++) { + addSparseEncodingDoc(indexName, String.valueOf(i), List.of(TEST_NEURAL_SPARSE_FIELD_NAME_1), List.of(normalRandFeatures)); + addSparseEncodingDoc( + indexName, + String.valueOf(i + 10), + List.of(TEST_NEURAL_SPARSE_FIELD_NAME_1), + List.of(twoPhaseRandFeatures) + ); + ; + } + assertEquals(20, getDocCount(indexName)); + } + } + + private void assertSearchScore(NeuralSparseQueryBuilder builder, String indexName, float expectedScore) { + Map searchResponse = search(indexName, builder, 10); + Map firstInnerHit = getFirstInnerHit(searchResponse); + assertEquals(expectedScore, objectToFloat(firstInnerHit.get("_score")), DELTA); + } + +} diff --git a/src/test/java/org/opensearch/neuralsearch/processor/NeuralSparseTwoPhaseProcessorTests.java b/src/test/java/org/opensearch/neuralsearch/processor/NeuralSparseTwoPhaseProcessorTests.java new file mode 100644 index 000000000..2ce7c7b96 --- /dev/null +++ b/src/test/java/org/opensearch/neuralsearch/processor/NeuralSparseTwoPhaseProcessorTests.java @@ -0,0 +1,202 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ +package org.opensearch.neuralsearch.processor; + +import lombok.SneakyThrows; +import org.opensearch.action.search.SearchRequest; +import org.opensearch.common.collect.Tuple; +import org.opensearch.index.query.BoolQueryBuilder; +import org.opensearch.index.query.MatchAllQueryBuilder; +import org.opensearch.neuralsearch.query.NeuralSparseQueryBuilder; +import org.opensearch.search.builder.SearchSourceBuilder; +import org.opensearch.search.rescore.QueryRescorerBuilder; +import org.opensearch.test.OpenSearchTestCase; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +public class NeuralSparseTwoPhaseProcessorTests extends OpenSearchTestCase { + static final private String PARAMETER_KEY = 
"two_phase_parameter"; + static final private String RATIO_KEY = "prune_ratio"; + static final private String ENABLE_KEY = "enabled"; + static final private String EXPANSION_KEY = "expansion_rate"; + static final private String MAX_WINDOW_SIZE_KEY = "max_window_size"; + + public void testFactory_whenCreateDefaultPipeline_thenSuccess() throws Exception { + NeuralSparseTwoPhaseProcessor.Factory factory = new NeuralSparseTwoPhaseProcessor.Factory(); + NeuralSparseTwoPhaseProcessor processor = createTestProcessor(factory); + assertEquals(0.3f, processor.getRatio(), 1e-3); + assertEquals(4.0f, processor.getWindowExpansion(), 1e-3); + assertEquals(10000, processor.getMaxWindowSize()); + + NeuralSparseTwoPhaseProcessor defaultProcessor = factory.create( + Collections.emptyMap(), + null, + null, + false, + Collections.emptyMap(), + null + ); + assertEquals(0.4f, defaultProcessor.getRatio(), 1e-3); + assertEquals(5.0f, defaultProcessor.getWindowExpansion(), 1e-3); + assertEquals(10000, defaultProcessor.getMaxWindowSize()); + } + + public void testFactory_whenRatioOutOfRange_thenThrowException() { + NeuralSparseTwoPhaseProcessor.Factory factory = new NeuralSparseTwoPhaseProcessor.Factory(); + expectThrows(IllegalArgumentException.class, () -> createTestProcessor(factory, 1.1f, true, 5.0f, 10000)); + } + + public void testFactory_whenWindowExpansionOutOfRange_thenThrowException() { + NeuralSparseTwoPhaseProcessor.Factory factory = new NeuralSparseTwoPhaseProcessor.Factory(); + expectThrows(IllegalArgumentException.class, () -> createTestProcessor(factory, 0.1f, true, 0.5f, 10000)); + expectThrows(IllegalArgumentException.class, () -> createTestProcessor(factory, 0.1f, true, -0.5f, 10000)); + } + + public void testFactory_whenMaxWindowSizeOutOfRange_thenThrowException() { + NeuralSparseTwoPhaseProcessor.Factory factory = new NeuralSparseTwoPhaseProcessor.Factory(); + expectThrows(IllegalArgumentException.class, () -> createTestProcessor(factory, 0.1f, true, 5.5f, -1)); + } + + 
public void testProcessRequest_whenTwoPhaseEnabled_thenSuccess() throws Exception { + NeuralSparseTwoPhaseProcessor.Factory factory = new NeuralSparseTwoPhaseProcessor.Factory(); + NeuralSparseQueryBuilder neuralQueryBuilder = new NeuralSparseQueryBuilder(); + SearchRequest searchRequest = new SearchRequest(); + searchRequest.source(new SearchSourceBuilder().query(neuralQueryBuilder)); + NeuralSparseTwoPhaseProcessor processor = createTestProcessor(factory, 0.5f, true, 4.0f, 10000); + processor.processRequest(searchRequest); + NeuralSparseQueryBuilder queryBuilder = (NeuralSparseQueryBuilder) searchRequest.source().query(); + assertEquals(queryBuilder.twoPhasePruneRatio(), 0.5f, 1e-3); + assertNotNull(searchRequest.source().rescores()); + } + + public void testProcessRequest_whenTwoPhaseEnabledAndNestedBoolean_thenSuccess() throws Exception { + NeuralSparseTwoPhaseProcessor.Factory factory = new NeuralSparseTwoPhaseProcessor.Factory(); + NeuralSparseQueryBuilder neuralQueryBuilder = new NeuralSparseQueryBuilder(); + BoolQueryBuilder boolQueryBuilder = new BoolQueryBuilder(); + boolQueryBuilder.should(neuralQueryBuilder); + SearchRequest searchRequest = new SearchRequest(); + searchRequest.source(new SearchSourceBuilder().query(boolQueryBuilder)); + NeuralSparseTwoPhaseProcessor processor = createTestProcessor(factory, 0.5f, true, 4.0f, 10000); + processor.processRequest(searchRequest); + BoolQueryBuilder queryBuilder = (BoolQueryBuilder) searchRequest.source().query(); + NeuralSparseQueryBuilder neuralSparseQueryBuilder = (NeuralSparseQueryBuilder) queryBuilder.should().get(0); + assertEquals(neuralSparseQueryBuilder.twoPhasePruneRatio(), 0.5f, 1e-3); + assertNotNull(searchRequest.source().rescores()); + } + + public void testProcessRequestWithRescorer_whenTwoPhaseEnabled_thenSuccess() throws Exception { + NeuralSparseTwoPhaseProcessor.Factory factory = new NeuralSparseTwoPhaseProcessor.Factory(); + NeuralSparseQueryBuilder neuralQueryBuilder = new 
NeuralSparseQueryBuilder(); + SearchRequest searchRequest = new SearchRequest(); + searchRequest.source(new SearchSourceBuilder().query(neuralQueryBuilder)); + QueryRescorerBuilder queryRescorerBuilder = new QueryRescorerBuilder(new MatchAllQueryBuilder()); + queryRescorerBuilder.setRescoreQueryWeight(0f); + searchRequest.source().addRescorer(queryRescorerBuilder); + NeuralSparseTwoPhaseProcessor processor = createTestProcessor(factory, 0.5f, true, 4.0f, 10000); + processor.processRequest(searchRequest); + NeuralSparseQueryBuilder queryBuilder = (NeuralSparseQueryBuilder) searchRequest.source().query(); + assertEquals(queryBuilder.twoPhasePruneRatio(), 0.5f, 1e-3); + assertNotNull(searchRequest.source().rescores()); + } + + public void testProcessRequest_whenTwoPhaseDisabled_thenSuccess() throws Exception { + NeuralSparseTwoPhaseProcessor.Factory factory = new NeuralSparseTwoPhaseProcessor.Factory(); + NeuralSparseQueryBuilder neuralQueryBuilder = new NeuralSparseQueryBuilder(); + SearchRequest searchRequest = new SearchRequest(); + searchRequest.source(new SearchSourceBuilder().query(neuralQueryBuilder)); + NeuralSparseTwoPhaseProcessor processor = createTestProcessor(factory, 0.5f, false, 4.0f, 10000); + processor.processRequest(searchRequest); + NeuralSparseQueryBuilder queryBuilder = (NeuralSparseQueryBuilder) searchRequest.source().query(); + assertEquals(queryBuilder.twoPhasePruneRatio(), 0f, 1e-3); + assertNull(searchRequest.source().rescores()); + } + + @SneakyThrows + public void testProcessRequest_whenTwoPhaseEnabledAndOutOfWindowSize_thenThrowException() { + NeuralSparseTwoPhaseProcessor.Factory factory = new NeuralSparseTwoPhaseProcessor.Factory(); + NeuralSparseQueryBuilder neuralQueryBuilder = new NeuralSparseQueryBuilder(); + SearchRequest searchRequest = new SearchRequest(); + searchRequest.source(new SearchSourceBuilder().query(neuralQueryBuilder)); + QueryRescorerBuilder queryRescorerBuilder = new QueryRescorerBuilder(new MatchAllQueryBuilder()); 
+ queryRescorerBuilder.setRescoreQueryWeight(0f); + searchRequest.source().addRescorer(queryRescorerBuilder); + NeuralSparseTwoPhaseProcessor processor = createTestProcessor(factory, 0.5f, true, 400.0f, 100); + expectThrows(IllegalArgumentException.class, () -> processor.processRequest(searchRequest)); + } + + @SneakyThrows + public void testProcessRequest_whenTwoPhaseEnabledAndWithOutNeuralSparseQuery_thenReturnRequest() { + NeuralSparseTwoPhaseProcessor.Factory factory = new NeuralSparseTwoPhaseProcessor.Factory(); + BoolQueryBuilder boolQueryBuilder = new BoolQueryBuilder(); + boolQueryBuilder.should(new MatchAllQueryBuilder()); + SearchRequest searchRequest = new SearchRequest(); + searchRequest.source(new SearchSourceBuilder().query(boolQueryBuilder)); + NeuralSparseTwoPhaseProcessor processor = createTestProcessor(factory, 0.5f, true, 400.0f, 100); + SearchRequest returnRequest = processor.processRequest(searchRequest); + assertNull(returnRequest.source().rescores()); + } + + @SneakyThrows + public void testGetSplitSetOnceByScoreThreshold() { + Map queryTokens = new HashMap<>(); + for (int i = 0; i < 10; i++) { + queryTokens.put(String.valueOf(i), (float) i); + } + Tuple, Map> splitQueryTokens = NeuralSparseTwoPhaseProcessor + .splitQueryTokensByRatioedMaxScoreAsThreshold(queryTokens, 0.4f); + assertNotNull(splitQueryTokens); + Map upSet = splitQueryTokens.v1(); + Map downSet = splitQueryTokens.v2(); + assertNotNull(upSet); + assertEquals(6, upSet.size()); + assertNotNull(downSet); + assertEquals(4, downSet.size()); + } + + @SneakyThrows + public void testGetSplitSetOnceByScoreThreshold_whenNullQueryToken_thenThrowException() { + Map queryTokens = null; + expectThrows( + IllegalArgumentException.class, + () -> NeuralSparseTwoPhaseProcessor.splitQueryTokensByRatioedMaxScoreAsThreshold(queryTokens, 0.4f) + ); + } + + public void testType() throws Exception { + NeuralSparseTwoPhaseProcessor.Factory factory = new NeuralSparseTwoPhaseProcessor.Factory(); + 
NeuralSparseTwoPhaseProcessor processor = createTestProcessor(factory); + assertEquals(NeuralSparseTwoPhaseProcessor.TYPE, processor.getType()); + } + + private NeuralSparseTwoPhaseProcessor createTestProcessor( + NeuralSparseTwoPhaseProcessor.Factory factory, + float ratio, + boolean enabled, + float expand, + int max_window + ) throws Exception { + Map configMap = new HashMap<>(); + configMap.put(ENABLE_KEY, enabled); + Map twoPhaseParaMap = new HashMap<>(); + twoPhaseParaMap.put(RATIO_KEY, ratio); + twoPhaseParaMap.put(EXPANSION_KEY, expand); + twoPhaseParaMap.put(MAX_WINDOW_SIZE_KEY, max_window); + configMap.put(PARAMETER_KEY, twoPhaseParaMap); + return factory.create(Collections.emptyMap(), null, null, false, configMap, null); + } + + private NeuralSparseTwoPhaseProcessor createTestProcessor(NeuralSparseTwoPhaseProcessor.Factory factory) throws Exception { + Map configMap = new HashMap<>(); + configMap.put(ENABLE_KEY, true); + Map twoPhaseParaMap = new HashMap<>(); + twoPhaseParaMap.put(RATIO_KEY, 0.3f); + twoPhaseParaMap.put(EXPANSION_KEY, 4.0f); + twoPhaseParaMap.put(MAX_WINDOW_SIZE_KEY, 10000); + configMap.put(PARAMETER_KEY, twoPhaseParaMap); + return factory.create(Collections.emptyMap(), null, null, false, configMap, null); + } +} diff --git a/src/test/java/org/opensearch/neuralsearch/query/NeuralSparseQueryBuilderTests.java b/src/test/java/org/opensearch/neuralsearch/query/NeuralSparseQueryBuilderTests.java index 4d2fe540d..7509efd42 100644 --- a/src/test/java/org/opensearch/neuralsearch/query/NeuralSparseQueryBuilderTests.java +++ b/src/test/java/org/opensearch/neuralsearch/query/NeuralSparseQueryBuilderTests.java @@ -17,6 +17,7 @@ import static org.opensearch.neuralsearch.query.NeuralSparseQueryBuilder.QUERY_TOKENS_FIELD; import java.io.IOException; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.concurrent.CountDownLatch; @@ -685,4 +686,19 @@ public void testDoToQuery_successfulDoToQuery() { 
assertEquals(sparseEncodingQueryBuilder.doToQuery(mockedQueryShardContext), targetQueryBuilder.build()); } + + @SneakyThrows + public void testDoToQuery_whenEmptyQueryToken_thenThrowException() { + NeuralSparseQueryBuilder sparseEncodingQueryBuilder = new NeuralSparseQueryBuilder().fieldName(FIELD_NAME) + .maxTokenScore(MAX_TOKEN_SCORE) + .queryText(QUERY_TEXT) + .modelId(MODEL_ID) + .queryTokensSupplier(() -> Collections.emptyMap()); + QueryShardContext mockedQueryShardContext = mock(QueryShardContext.class); + MappedFieldType mockedMappedFieldType = mock(MappedFieldType.class); + doAnswer(invocation -> "rank_features").when(mockedMappedFieldType).typeName(); + doAnswer(invocation -> mockedMappedFieldType).when(mockedQueryShardContext).fieldMapper(any()); + expectThrows(IllegalArgumentException.class, () -> sparseEncodingQueryBuilder.doToQuery(mock(QueryShardContext.class))); + } + } diff --git a/src/test/resources/processor/NeuralSparseTwoPhaseProcessorConfiguration.json b/src/test/resources/processor/NeuralSparseTwoPhaseProcessorConfiguration.json new file mode 100644 index 000000000..45e435268 --- /dev/null +++ b/src/test/resources/processor/NeuralSparseTwoPhaseProcessorConfiguration.json @@ -0,0 +1,16 @@ +{ + "request_processors": [ + { + "neural_sparse_two_phase_processor": { + "tag": "neural-sparse", + "description": "This processor is making two-phase rescorer.", + "enabled": true, + "two_phase_parameter": { + "prune_ratio": %f, + "expansion_rate": %f, + "max_window_size": %d + } + } + } + ] +} diff --git a/src/testFixtures/java/org/opensearch/neuralsearch/BaseNeuralSearchIT.java b/src/testFixtures/java/org/opensearch/neuralsearch/BaseNeuralSearchIT.java index 516e9fe8c..2682ee7c7 100644 --- a/src/testFixtures/java/org/opensearch/neuralsearch/BaseNeuralSearchIT.java +++ b/src/testFixtures/java/org/opensearch/neuralsearch/BaseNeuralSearchIT.java @@ -310,6 +310,36 @@ protected void createPipelineProcessor(final String requestBody, final String pi 
assertEquals("true", node.get("acknowledged").toString()); } + protected void createNeuralSparseTwoPhaseSearchProcessor(final String pipelineName) throws Exception { + createNeuralSparseTwoPhaseSearchProcessor(pipelineName, 0.4f, 5.0f, 10000); + } + + protected void createNeuralSparseTwoPhaseSearchProcessor( + final String pipelineName, + float pruneRatio, + float expansionRate, + int maxWindowSize + ) throws Exception { + String jsonTemplate = Files.readString( + Path.of(Objects.requireNonNull(classLoader.getResource("processor/NeuralSparseTwoPhaseProcessorConfiguration.json")).toURI()) + ); + String customizedJson = String.format(Locale.ROOT, jsonTemplate, pruneRatio, expansionRate, maxWindowSize); + Response pipelineCreateResponse = makeRequest( + client(), + "PUT", + "/_search/pipeline/" + pipelineName, + null, + toHttpEntity(customizedJson), + ImmutableList.of(new BasicHeader(HttpHeaders.USER_AGENT, DEFAULT_USER_AGENT)) + ); + Map node = XContentHelper.convertToMap( + XContentType.JSON.xContent(), + EntityUtils.toString(pipelineCreateResponse.getEntity()), + false + ); + assertEquals("true", node.get("acknowledged").toString()); + } + protected void createSearchRequestProcessor(final String modelId, final String pipelineName) throws Exception { Response pipelineCreateResponse = makeRequest( client(),