Skip to content

Commit

Permalink
Reworked feature flag usage, add logs (#246)
Browse files Browse the repository at this point in the history
Signed-off-by: Martin Gaievski <[email protected]>
  • Loading branch information
martin-gaievski committed Aug 8, 2023
1 parent b9c86a8 commit 5819df6
Show file tree
Hide file tree
Showing 12 changed files with 83 additions and 25 deletions.
4 changes: 1 addition & 3 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -256,9 +256,7 @@ testClusters.integTest {

// enable features for testing
// hybrid search
systemProperty('neural_search_hybrid_search_enabled', 'true')
// search pipelines
systemProperty('opensearch.experimental.feature.search_pipeline.enabled', 'true')
systemProperty('plugins.ml_commons.hybrid_search_enabled', 'true')
}

// Remote Integration Tests
Expand Down
26 changes: 14 additions & 12 deletions src/main/java/org/opensearch/neuralsearch/plugin/NeuralSearch.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@

package org.opensearch.neuralsearch.plugin;

import static org.opensearch.neuralsearch.settings.NeuralSearchSettings.NEURAL_SEARCH_HYBRID_SEARCH_ENABLED;

import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
Expand All @@ -18,6 +20,7 @@
import org.opensearch.client.Client;
import org.opensearch.cluster.metadata.IndexNameExpressionResolver;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.core.common.io.stream.NamedWriteableRegistry;
import org.opensearch.core.xcontent.NamedXContentRegistry;
Expand Down Expand Up @@ -51,20 +54,11 @@
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.watcher.ResourceWatcherService;

import com.google.common.annotations.VisibleForTesting;

/**
* Neural Search plugin class
*/
@Log4j2
public class NeuralSearch extends Plugin implements ActionPlugin, SearchPlugin, IngestPlugin, ExtensiblePlugin, SearchPipelinePlugin {
/**
* Gates the functionality of hybrid search
* Currently query phase searcher added with hybrid search will conflict with concurrent search in core.
* Once that problem is resolved this feature flag can be removed.
*/
@VisibleForTesting
public static final String NEURAL_SEARCH_HYBRID_SEARCH_ENABLED = "neural_search_hybrid_search_enabled";
private MLCommonsClientAccessor clientAccessor;
private NormalizationProcessorWorkflow normalizationProcessorWorkflow;
private final ScoreNormalizationFactory scoreNormalizationFactory = new ScoreNormalizationFactory();
Expand Down Expand Up @@ -105,11 +99,14 @@ public Map<String, Processor.Factory> getProcessors(Processor.Parameters paramet

@Override
public Optional<QueryPhaseSearcher> getQueryPhaseSearcher() {
if (FeatureFlags.isEnabled(NEURAL_SEARCH_HYBRID_SEARCH_ENABLED)) {
log.info("Registering hybrid query phase searcher with feature flag [%]", NEURAL_SEARCH_HYBRID_SEARCH_ENABLED);
if (FeatureFlags.isEnabled(NEURAL_SEARCH_HYBRID_SEARCH_ENABLED.getKey())) {
log.info("Registering hybrid query phase searcher with feature flag [{}]", NEURAL_SEARCH_HYBRID_SEARCH_ENABLED.getKey());
return Optional.of(new HybridQueryPhaseSearcher());
}
log.info("Not registering hybrid query phase searcher because feature flag [%] is disabled", NEURAL_SEARCH_HYBRID_SEARCH_ENABLED);
log.info(
"Not registering hybrid query phase searcher because feature flag [{}] is disabled",
NEURAL_SEARCH_HYBRID_SEARCH_ENABLED.getKey()
);
// we want feature be disabled by default due to risk of colliding and breaking concurrent search in core
return Optional.empty();
}
Expand All @@ -123,4 +120,9 @@ public Map<String, org.opensearch.search.pipeline.Processor.Factory<SearchPhaseR
new NormalizationProcessorFactory(normalizationProcessorWorkflow, scoreNormalizationFactory, scoreCombinationFactory)
);
}

@Override
public List<Setting<?>> getSettings() {
return List.of(NEURAL_SEARCH_HYBRID_SEARCH_ENABLED);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,8 @@ public <Result extends SearchPhaseResult> void process(
final SearchPhaseResults<Result> searchPhaseResult,
final SearchPhaseContext searchPhaseContext
) {
if (shouldRunProcessor(searchPhaseResult)) {
if (shouldSkipProcessor(searchPhaseResult)) {
log.debug("Query results are not compatible with normalization processor");
return;
}
List<QuerySearchResult> querySearchResults = getQueryPhaseSearchResults(searchPhaseResult);
Expand Down Expand Up @@ -88,7 +89,7 @@ public boolean isIgnoreFailure() {
return false;
}

private <Result extends SearchPhaseResult> boolean shouldRunProcessor(SearchPhaseResults<Result> searchPhaseResult) {
private <Result extends SearchPhaseResult> boolean shouldSkipProcessor(SearchPhaseResults<Result> searchPhaseResult) {
if (Objects.isNull(searchPhaseResult) || !(searchPhaseResult instanceof QueryPhaseResultConsumer)) {
return true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import java.util.stream.Collectors;

import lombok.AllArgsConstructor;
import lombok.extern.log4j.Log4j2;

import org.opensearch.common.lucene.search.TopDocsAndMaxScore;
import org.opensearch.neuralsearch.processor.combination.ScoreCombinationTechnique;
Expand All @@ -24,6 +25,7 @@
* and post-processing of final results
*/
@AllArgsConstructor
@Log4j2
public class NormalizationProcessorWorkflow {

private final ScoreNormalizer scoreNormalizer;
Expand All @@ -41,15 +43,19 @@ public void execute(
final ScoreCombinationTechnique combinationTechnique
) {
// pre-process data
log.debug("Pre-process query results");
List<CompoundTopDocs> queryTopDocs = getQueryTopDocs(querySearchResults);

// normalize
log.debug("Do score normalization");
scoreNormalizer.normalizeScores(queryTopDocs, normalizationTechnique);

// combine
log.debug("Do score combination");
scoreCombiner.combineScores(queryTopDocs, combinationTechnique);

// post-process data
log.debug("Post-process query results after score normalization and combination");
updateOriginalQueryResults(querySearchResults, queryTopDocs);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,14 @@
import java.util.Map;
import java.util.Set;

import lombok.ToString;

/**
* Abstracts combination of scores based on arithmetic mean method
*/
@ToString(onlyExplicitlyIncluded = true)
public class ArithmeticMeanScoreCombinationTechnique implements ScoreCombinationTechnique {

@ToString.Include
public static final String TECHNIQUE_NAME = "arithmetic_mean";
public static final String PARAM_NAME_WEIGHTS = "weights";
private static final Set<String> SUPPORTED_PARAMS = Set.of(PARAM_NAME_WEIGHTS);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,14 @@
import java.util.Map;
import java.util.Set;

import lombok.ToString;

/**
* Abstracts combination of scores based on geometrical mean method
*/
@ToString(onlyExplicitlyIncluded = true)
public class GeometricMeanScoreCombinationTechnique implements ScoreCombinationTechnique {

@ToString.Include
public static final String TECHNIQUE_NAME = "geometric_mean";
public static final String PARAM_NAME_WEIGHTS = "weights";
private static final Set<String> SUPPORTED_PARAMS = Set.of(PARAM_NAME_WEIGHTS);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,14 @@
import java.util.Map;
import java.util.Set;

import lombok.ToString;

/**
* Abstracts combination of scores based on harmonic mean method
*/
@ToString(onlyExplicitlyIncluded = true)
public class HarmonicMeanScoreCombinationTechnique implements ScoreCombinationTechnique {

@ToString.Include
public static final String TECHNIQUE_NAME = "harmonic_mean";
public static final String PARAM_NAME_WEIGHTS = "weights";
private static final Set<String> SUPPORTED_PARAMS = Set.of(PARAM_NAME_WEIGHTS);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import java.util.Objects;

import lombok.AllArgsConstructor;
import lombok.extern.log4j.Log4j2;

import org.opensearch.neuralsearch.processor.NormalizationProcessor;
import org.opensearch.neuralsearch.processor.NormalizationProcessorWorkflow;
Expand All @@ -28,6 +29,7 @@
* Factory for query results normalization processor for search pipeline. Instantiates processor based on user provided input.
*/
@AllArgsConstructor
@Log4j2
public class NormalizationProcessorFactory implements Processor.Factory<SearchPhaseResultsProcessor> {
public static final String NORMALIZATION_CLAUSE = "normalization";
public static final String COMBINATION_CLAUSE = "combination";
Expand Down Expand Up @@ -75,7 +77,12 @@ public SearchPhaseResultsProcessor create(
Map<String, Object> combinationParams = readOptionalMap(NormalizationProcessor.TYPE, tag, combinationClause, PARAMETERS);
scoreCombinationTechnique = scoreCombinationFactory.createCombination(combinationTechnique, combinationParams);
}

log.info(
"Creating search phase results processor of type [{}] with normalization [{}] and combination [{}]",
NormalizationProcessor.TYPE,
normalizationTechnique,
scoreCombinationTechnique
);
return new NormalizationProcessor(
tag,
description,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,18 @@
import java.util.List;
import java.util.Objects;

import lombok.ToString;

import org.apache.lucene.search.ScoreDoc;
import org.apache.lucene.search.TopDocs;
import org.opensearch.neuralsearch.search.CompoundTopDocs;

/**
* Abstracts normalization of scores based on L2 method
*/
@ToString(onlyExplicitlyIncluded = true)
public class L2ScoreNormalizationTechnique implements ScoreNormalizationTechnique {

@ToString.Include
public static final String TECHNIQUE_NAME = "l2";
private static final float MIN_SCORE = 0.001f;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
import java.util.List;
import java.util.Objects;

import lombok.ToString;

import org.apache.lucene.search.ScoreDoc;
import org.apache.lucene.search.TopDocs;
import org.opensearch.neuralsearch.search.CompoundTopDocs;
Expand All @@ -18,8 +20,9 @@
/**
* Abstracts normalization of scores based on min-max method
*/
@ToString(onlyExplicitlyIncluded = true)
public class MinMaxScoreNormalizationTechnique implements ScoreNormalizationTechnique {

@ToString.Include
public static final String TECHNIQUE_NAME = "min_max";
private static final float MIN_SCORE = 0.001f;
private static final float SINGLE_RESULT_SCORE = 1.0f;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.neuralsearch.settings;

import lombok.AccessLevel;
import lombok.NoArgsConstructor;

import org.opensearch.common.settings.Setting;

/**
* Class defines settings specific to neural-search plugin
*/
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public final class NeuralSearchSettings {

/**
* Gates the functionality of hybrid search
* Currently query phase searcher added with hybrid search will conflict with concurrent search in core.
* Once that problem is resolved this feature flag can be removed.
*/
public static final Setting<Boolean> NEURAL_SEARCH_HYBRID_SEARCH_ENABLED = Setting.boolSetting(
"plugins.ml_commons.hybrid_search_enabled",
false,
Setting.Property.NodeScope
);
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import static java.util.Collections.emptyMap;
import static java.util.Collections.singletonMap;
import static java.util.stream.Collectors.toList;
import static org.opensearch.neuralsearch.plugin.NeuralSearch.NEURAL_SEARCH_HYBRID_SEARCH_ENABLED;
import static org.opensearch.neuralsearch.settings.NeuralSearchSettings.NEURAL_SEARCH_HYBRID_SEARCH_ENABLED;

import java.io.IOException;
import java.util.Arrays;
Expand Down Expand Up @@ -227,6 +227,6 @@ public float getMaxScore(int upTo) {

@SuppressForbidden(reason = "manipulates system properties for testing")
protected static void initFeatureFlags() {
System.setProperty(NEURAL_SEARCH_HYBRID_SEARCH_ENABLED, "true");
System.setProperty(NEURAL_SEARCH_HYBRID_SEARCH_ENABLED.getKey(), "true");
}
}

0 comments on commit 5819df6

Please sign in to comment.