Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Treat . as a nested field in field_map of text embedding processor #488

Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package org.opensearch.neuralsearch.processor;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
Expand All @@ -27,8 +28,10 @@
import com.google.common.collect.ImmutableMap;

/**
* The abstract class for text processing use cases. Users provide a field name map and a model id.
* During ingestion, the processor will use the corresponding model to inference the input texts,
* The abstract class for text processing use cases. Users provide a field name
* map and a model id.
* During ingestion, the processor will use the corresponding model to inference
* the input texts,
* and set the target fields according to the field name map.
*/
@Log4j2
Expand All @@ -39,7 +42,8 @@ public abstract class InferenceProcessor extends AbstractProcessor {

private final String type;

// This field is used for nested knn_vector/rank_features field. The value of the field will be used as the
// This field is used for nested knn_vector/rank_features field. The value of
// the field will be used as the
// default key for the nested object.
private final String listTypeNestedMapKey;

Expand All @@ -52,18 +56,18 @@ public abstract class InferenceProcessor extends AbstractProcessor {
private final Environment environment;

public InferenceProcessor(
String tag,
String description,
String type,
String listTypeNestedMapKey,
String modelId,
Map<String, Object> fieldMap,
MLCommonsClientAccessor clientAccessor,
Environment environment
) {
String tag,
String description,
String type,
String listTypeNestedMapKey,
String modelId,
Map<String, Object> fieldMap,
MLCommonsClientAccessor clientAccessor,
Environment environment) {
super(tag, description);
this.type = type;
if (StringUtils.isBlank(modelId)) throw new IllegalArgumentException("model_id is null or empty, cannot process it");
if (StringUtils.isBlank(modelId))

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why did the formatting change? did you run ./gradlew :spotlessApply prior?

throw new IllegalArgumentException("model_id is null or empty, cannot process it");
validateEmbeddingConfiguration(fieldMap);

this.listTypeNestedMapKey = listTypeNestedMapKey;
Expand All @@ -75,33 +79,38 @@ public InferenceProcessor(

private void validateEmbeddingConfiguration(Map<String, Object> fieldMap) {
if (fieldMap == null
|| fieldMap.size() == 0
|| fieldMap.entrySet()
.stream()
.anyMatch(
x -> StringUtils.isBlank(x.getKey()) || Objects.isNull(x.getValue()) || StringUtils.isBlank(x.getValue().toString())
)) {
|| fieldMap.size() == 0
|| fieldMap.entrySet()
.stream()
.anyMatch(
x -> StringUtils.startsWith(x.getKey(), ".") || StringUtils.endsWith(x.getKey(), ".")
|| Arrays.stream(x.getKey().split("\\.")).anyMatch(y -> StringUtils.isBlank(y))
|| Objects.isNull(x.getValue())
|| StringUtils.isBlank(x.getValue().toString()))) {
throw new IllegalArgumentException("Unable to create the processor as field_map has invalid key or value");
}
}

public abstract void doExecute(
IngestDocument ingestDocument,
Map<String, Object> ProcessMap,
List<String> inferenceList,
BiConsumer<IngestDocument, Exception> handler
);
IngestDocument ingestDocument,
Map<String, Object> ProcessMap,
List<String> inferenceList,
BiConsumer<IngestDocument, Exception> handler);

@Override
public IngestDocument execute(IngestDocument ingestDocument) throws Exception {
return ingestDocument;
}

/**
* This method will be invoked by PipelineService to make async inference and then delegate the handler to
* This method will be invoked by PipelineService to make async inference and
* then delegate the handler to
* process the inference response or failure.
* @param ingestDocument {@link IngestDocument} which is the document passed to processor.
* @param handler {@link BiConsumer} which is the handler which can be used after the inference task is done.
*
* @param ingestDocument {@link IngestDocument} which is the document passed to
* processor.
* @param handler {@link BiConsumer} which is the handler which can be
* used after the inference task is done.
*/
@Override
public void execute(IngestDocument ingestDocument, BiConsumer<IngestDocument, Exception> handler) {
Expand Down Expand Up @@ -142,7 +151,8 @@ private void createInferenceListForMapTypeInput(Object sourceValue, List<String>
} else if (sourceValue instanceof List) {
texts.addAll(((List<String>) sourceValue));
} else {
if (sourceValue == null) return;
if (sourceValue == null)
return;
texts.add(sourceValue.toString());
}
}
Expand All @@ -154,9 +164,20 @@ Map<String, Object> buildMapWithProcessorKeyAndOriginalValue(IngestDocument inge
for (Map.Entry<String, Object> fieldMapEntry : fieldMap.entrySet()) {
String originalKey = fieldMapEntry.getKey();
Object targetKey = fieldMapEntry.getValue();

int nestedDotIndex = originalKey.indexOf('.');
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a user-provided info, can we add basic validation if it's not already done as part of the processor/pipeline definition.

if multiple levels of nested fields are needed this code may need a rework

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is some basic validation done in validateEmbeddingConfiguration which is run on fieldMap in the constructor in this file. However, I will add some extra validation for nested fields.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: use a static constant and avoid magic characters in code.
e.g.

private static final char FIELD_SEPARATOR = '.';

if (nestedDotIndex != -1) {
Map<String, Object> newTargetKey = new LinkedHashMap<>();
newTargetKey.put(originalKey.substring(nestedDotIndex + 1), targetKey);
targetKey = newTargetKey;

originalKey = originalKey.substring(0, nestedDotIndex);
}
Comment on lines +169 to +175
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Sanjana679 can you please provide details how we are handling multiple level of nesting here?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

At the moment, I'm not currently handling multiple levels of nesting, as I initially thought it was only for one level. However, I will work on handling multiple levels of nesting.


if (targetKey instanceof Map) {
Map<String, Object> treeRes = new LinkedHashMap<>();
buildMapWithProcessorKeyAndOriginalValueForMapType(originalKey, targetKey, sourceAndMetadataMap, treeRes);
buildMapWithProcessorKeyAndOriginalValueForMapType(originalKey, targetKey, sourceAndMetadataMap,
treeRes);
mapWithProcessorKeys.put(originalKey, treeRes.get(originalKey));
} else {
mapWithProcessorKeys.put(String.valueOf(targetKey), sourceAndMetadataMap.get(originalKey));
Expand All @@ -166,21 +187,20 @@ Map<String, Object> buildMapWithProcessorKeyAndOriginalValue(IngestDocument inge
}

private void buildMapWithProcessorKeyAndOriginalValueForMapType(
String parentKey,
Object processorKey,
Map<String, Object> sourceAndMetadataMap,
Map<String, Object> treeRes
) {
if (processorKey == null || sourceAndMetadataMap == null) return;
String parentKey,
Object processorKey,
Map<String, Object> sourceAndMetadataMap,
Map<String, Object> treeRes) {
Comment on lines +190 to +193
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please remove these indents.

if (processorKey == null || sourceAndMetadataMap == null)
return;
if (processorKey instanceof Map) {
Map<String, Object> next = new LinkedHashMap<>();
for (Map.Entry<String, Object> nestedFieldMapEntry : ((Map<String, Object>) processorKey).entrySet()) {
buildMapWithProcessorKeyAndOriginalValueForMapType(
nestedFieldMapEntry.getKey(),
nestedFieldMapEntry.getValue(),
(Map<String, Object>) sourceAndMetadataMap.get(parentKey),
next
);
nestedFieldMapEntry.getKey(),
nestedFieldMapEntry.getValue(),
(Map<String, Object>) sourceAndMetadataMap.get(parentKey),
next);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Keep ) consistent

}
treeRes.put(parentKey, next);
} else {
Expand All @@ -199,9 +219,11 @@ private void validateEmbeddingFieldsValue(IngestDocument ingestDocument) {
if (List.class.isAssignableFrom(sourceValueClass) || Map.class.isAssignableFrom(sourceValueClass)) {
validateNestedTypeValue(sourceKey, sourceValue, () -> 1);
} else if (!String.class.isAssignableFrom(sourceValueClass)) {
throw new IllegalArgumentException("field [" + sourceKey + "] is neither string nor nested type, cannot process it");
throw new IllegalArgumentException(
"field [" + sourceKey + "] is neither string nor nested type, cannot process it");
} else if (StringUtils.isBlank(sourceValue.toString())) {
throw new IllegalArgumentException("field [" + sourceKey + "] has empty string value, cannot process it");
throw new IllegalArgumentException(
"field [" + sourceKey + "] has empty string value, cannot process it");
}
}
}
Expand All @@ -211,18 +233,21 @@ private void validateEmbeddingFieldsValue(IngestDocument ingestDocument) {
private void validateNestedTypeValue(String sourceKey, Object sourceValue, Supplier<Integer> maxDepthSupplier) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we replace Supplier<Integer> with a simple int?

int maxDepth = maxDepthSupplier.get();
if (maxDepth > MapperService.INDEX_MAPPING_DEPTH_LIMIT_SETTING.get(environment.settings())) {
throw new IllegalArgumentException("map type field [" + sourceKey + "] reached max depth limit, cannot process it");
throw new IllegalArgumentException(
"map type field [" + sourceKey + "] reached max depth limit, cannot process it");
} else if ((List.class.isAssignableFrom(sourceValue.getClass()))) {
validateListTypeValue(sourceKey, sourceValue);
} else if (Map.class.isAssignableFrom(sourceValue.getClass())) {
((Map) sourceValue).values()
.stream()
.filter(Objects::nonNull)
.forEach(x -> validateNestedTypeValue(sourceKey, x, () -> maxDepth + 1));
.stream()
.filter(Objects::nonNull)
.forEach(x -> validateNestedTypeValue(sourceKey, x, () -> maxDepth + 1));
Comment on lines +242 to +244
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove these indents

} else if (!String.class.isAssignableFrom(sourceValue.getClass())) {
throw new IllegalArgumentException("map type field [" + sourceKey + "] has non-string type, cannot process it");
throw new IllegalArgumentException(
"map type field [" + sourceKey + "] has non-string type, cannot process it");
} else if (StringUtils.isBlank(sourceValue.toString())) {
throw new IllegalArgumentException("map type field [" + sourceKey + "] has empty string, cannot process it");
throw new IllegalArgumentException(
"map type field [" + sourceKey + "] has empty string, cannot process it");
}
}

Expand All @@ -232,14 +257,17 @@ private void validateListTypeValue(String sourceKey, Object sourceValue) {
if (value == null) {
throw new IllegalArgumentException("list type field [" + sourceKey + "] has null, cannot process it");
} else if (!(value instanceof String)) {
throw new IllegalArgumentException("list type field [" + sourceKey + "] has non string value, cannot process it");
throw new IllegalArgumentException(
"list type field [" + sourceKey + "] has non string value, cannot process it");
} else if (StringUtils.isBlank(value.toString())) {
throw new IllegalArgumentException("list type field [" + sourceKey + "] has empty string, cannot process it");
throw new IllegalArgumentException(
"list type field [" + sourceKey + "] has empty string, cannot process it");
}
}
}

protected void setVectorFieldsToDocument(IngestDocument ingestDocument, Map<String, Object> processorMap, List<?> results) {
protected void setVectorFieldsToDocument(IngestDocument ingestDocument, Map<String, Object> processorMap,
List<?> results) {
Objects.requireNonNull(results, "embedding failed, inference returns null result!");
log.debug("Model inference result fetched, starting build vector output!");
Map<String, Object> nlpResult = buildNLPResult(processorMap, results, ingestDocument.getSourceAndMetadata());
Expand All @@ -248,7 +276,8 @@ protected void setVectorFieldsToDocument(IngestDocument ingestDocument, Map<Stri

@SuppressWarnings({ "unchecked" })
@VisibleForTesting
Map<String, Object> buildNLPResult(Map<String, Object> processorMap, List<?> results, Map<String, Object> sourceAndMetadataMap) {
Map<String, Object> buildNLPResult(Map<String, Object> processorMap, List<?> results,
Map<String, Object> sourceAndMetadataMap) {
IndexWrapper indexWrapper = new IndexWrapper(0);
Map<String, Object> result = new LinkedHashMap<>();
for (Map.Entry<String, Object> knnMapEntry : processorMap.entrySet()) {
Expand All @@ -267,34 +296,36 @@ Map<String, Object> buildNLPResult(Map<String, Object> processorMap, List<?> res

@SuppressWarnings({ "unchecked" })
private void putNLPResultToSourceMapForMapType(
String processorKey,
Object sourceValue,
List<?> results,
IndexWrapper indexWrapper,
Map<String, Object> sourceAndMetadataMap
) {
if (processorKey == null || sourceAndMetadataMap == null || sourceValue == null) return;
String processorKey,
Object sourceValue,
List<?> results,
IndexWrapper indexWrapper,
Map<String, Object> sourceAndMetadataMap) {
if (processorKey == null || sourceAndMetadataMap == null || sourceValue == null)
return;
if (sourceValue instanceof Map) {
for (Map.Entry<String, Object> inputNestedMapEntry : ((Map<String, Object>) sourceValue).entrySet()) {
putNLPResultToSourceMapForMapType(
inputNestedMapEntry.getKey(),
inputNestedMapEntry.getValue(),
results,
indexWrapper,
(Map<String, Object>) sourceAndMetadataMap.get(processorKey)
);
inputNestedMapEntry.getKey(),
inputNestedMapEntry.getValue(),
results,
indexWrapper,
(Map<String, Object>) sourceAndMetadataMap.get(processorKey));
}
} else if (sourceValue instanceof String) {
sourceAndMetadataMap.put(processorKey, results.get(indexWrapper.index++));
} else if (sourceValue instanceof List) {
sourceAndMetadataMap.put(processorKey, buildNLPResultForListType((List<String>) sourceValue, results, indexWrapper));
sourceAndMetadataMap.put(processorKey,
buildNLPResultForListType((List<String>) sourceValue, results, indexWrapper));
}
}

private List<Map<String, Object>> buildNLPResultForListType(List<String> sourceValue, List<?> results, IndexWrapper indexWrapper) {
private List<Map<String, Object>> buildNLPResultForListType(List<String> sourceValue, List<?> results,
IndexWrapper indexWrapper) {
List<Map<String, Object>> keyToResult = new ArrayList<>();
IntStream.range(0, sourceValue.size())
.forEachOrdered(x -> keyToResult.add(ImmutableMap.of(listTypeNestedMapKey, results.get(indexWrapper.index++))));
.forEachOrdered(
x -> keyToResult.add(ImmutableMap.of(listTypeNestedMapKey, results.get(indexWrapper.index++))));
return keyToResult;
}

Expand All @@ -304,10 +335,14 @@ public String getType() {
}

/**
* Since we need to build a {@link List<String>} as the input for text embedding, and the result type is {@link List<Float>} of {@link List},
* we need to map the result back to the input one by one with exactly order. For nested map type input, we're performing a pre-order
* traversal to extract the input strings, so when mapping back to the nested map, we still need a pre-order traversal to ensure the
* order. And we also need to ensure the index pointer goes forward in the recursive, so here the IndexWrapper is to store and increase
* Since we need to build a {@link List<String>} as the input for text
* embedding, and the result type is {@link List<Float>} of {@link List},
* we need to map the result back to the input one by one with exactly order.
* For nested map type input, we're performing a pre-order
* traversal to extract the input strings, so when mapping back to the nested
* map, we still need a pre-order traversal to ensure the
* order. And we also need to ensure the index pointer goes forward in the
* recursive, so here the IndexWrapper is to store and increase
* the index pointer during the recursive.
* index: the index pointer of the text embedding result.
*/
Expand Down
Loading