Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Translate Plugin - Added functionality to provide full path to the source field #3050

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.processor.translate;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;

import static com.google.common.base.Preconditions.checkNotNull;

/**
 * Utility for working with {@code /}-delimited paths into JSON structures that are represented as
 * plain Java objects (nested {@link Map}s and {@link List}s).
 * <p>
 * Empty path segments, such as the one produced by a leading delimiter in {@code /field1/field2}
 * or by a doubled delimiter, are ignored by every method of this class.
 */
public class JsonExtractor {

    /**
     * Default delimiter between the fields when providing the path
     */
    private static final String DELIMITER = "/";

    /**
     * @param fullPath full path to the leaf field
     * @return the first field from the full path, returns empty string "" if the path is empty
     * @throws NullPointerException if {@code fullPath} is null
     */
    public String getRootField(String fullPath) {
        final List<String> fieldsInPath = getFieldsInPath(fullPath);
        return fieldsInPath.isEmpty() ? "" : fieldsInPath.get(0);
    }

    /**
     * @param fullPath full path to the leaf field
     * @return the last field from the full path with surrounding whitespace removed,
     * returns empty string "" if the path is empty
     * @throws NullPointerException if {@code fullPath} is null
     */
    public String getLeafField(String fullPath) {
        final List<String> fieldsInPath = getFieldsInPath(fullPath);
        return fieldsInPath.isEmpty() ? "" : fieldsInPath.get(fieldsInPath.size() - 1).strip();
    }

    /**
     * @param fullPath full path to the leaf field
     * @return the path leading up to the leaf field, returns empty string "" if there is no parent path
     * @throws NullPointerException if {@code fullPath} is null
     */
    public String getParentPath(String fullPath) {
        final List<String> fieldsInPath = getFieldsInPath(fullPath);
        if (fieldsInPath.size() <= 1) {
            return "";
        }
        return String.join(DELIMITER, fieldsInPath.subList(0, fieldsInPath.size() - 1));
    }

    /**
     * @param path full path to the field whose objects are requested
     * @param rootObject Java Object in which root field is located. Can be either a List or Map
     * @return all the {@link Map} objects reached by following the path. Lists met on the way are
     * flattened, and values that are neither a List nor a Map are dropped.
     * An empty path returns the root object itself, or an empty list if the root object is null.
     * @throws NullPointerException if {@code path} is null
     */
    public List<Object> getObjectFromPath(String path, Object rootObject) {
        final List<String> fieldsInPath = getFieldsInPath(path);
        if (fieldsInPath.isEmpty()) {
            // List.of(null) would throw, so a null root is reported as "nothing found",
            // which matches how a null object is treated deeper in the structure.
            return Objects.isNull(rootObject) ? List.of() : List.of(rootObject);
        }
        return getLeafObjects(fieldsInPath, 0, rootObject);
    }

    /**
     * @param path path from one field to another
     * @return the non-empty fields of the provided path, in order
     */
    private List<String> getFieldsInPath(String path) {
        // split() already drops trailing empty strings; filtering also removes leading and
        // inner ones so that "/a/b" and "a//b" do not yield a field named "".
        return Arrays.stream(getStrippedPath(path).split(DELIMITER))
                .filter(field -> !field.isEmpty())
                .collect(Collectors.toCollection(ArrayList::new));
    }

    /**
     * @param fieldsInPath list of fields in a path
     * @param level current level inside the nested object with reference to the root level
     * @param rootObject Java Object in which root field is located. Can be either a List or Map
     * @return all the Java Objects that satisfy the fields hierarchy in fieldsInPath
     */
    private List<Object> getLeafObjects(List<String> fieldsInPath, int level, Object rootObject) {
        if (rootObject instanceof List) {
            // A list does not consume a path field: every element is resolved at the same level.
            return ((List<?>) rootObject).stream()
                    .flatMap(element -> getLeafObjects(fieldsInPath, level, element).stream())
                    .collect(Collectors.toList());
        }
        if (!(rootObject instanceof Map)) {
            // null (missing field) and scalar values cannot be traversed or returned.
            return List.of();
        }
        if (level >= fieldsInPath.size()) {
            return List.of(rootObject);
        }
        final Object child = ((Map<?, ?>) rootObject).get(fieldsInPath.get(level));
        return getLeafObjects(fieldsInPath, level + 1, child);
    }

    /**
     * @param path path from one field to another
     * @return path stripped of leading and trailing whitespaces
     * @throws NullPointerException if {@code path} is null
     */
    private String getStrippedPath(String path) {
        return Objects.requireNonNull(path, "path cannot be null").strip();
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,6 @@ public class MappingsParameterConfig {
@NotNull
private Object source;

@JsonProperty("iterate_on")
private String iterateOn;

@JsonProperty("targets")
@Valid
private List<TargetsParameterConfig> targetsParameterConfigs = new ArrayList<>();
Expand All @@ -26,10 +23,6 @@ public Object getSource() {
return source;
}

public String getIterateOn() {
return iterateOn;
}

public List<TargetsParameterConfig> getTargetsParameterConfigs() {
return targetsParameterConfigs;
}
Expand All @@ -44,7 +37,7 @@ public boolean isTargetsPresent(){
return Objects.nonNull(targetsParameterConfigs) && !targetsParameterConfigs.isEmpty();
}

@AssertTrue(message = "source field must be a string or list of strings")
@AssertTrue(message = "The \"source\" field should either be a string or a list of strings sharing the same parent path.")
public boolean isSourceFieldValid() {
if(Objects.isNull(source)){
return true;
Expand All @@ -54,11 +47,29 @@ public boolean isSourceFieldValid() {
}
if (source instanceof List<?>) {
List<?> sourceList = (List<?>) source;
return sourceList.stream().allMatch(sourceItem -> sourceItem instanceof String);
if(sourceList.isEmpty()){
return false;
}
return sourceList.stream().allMatch(sourceItem -> sourceItem instanceof String)
&& commonRootPath(sourceList);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a reason all the source keys have to share a common root path? If not, we should allow different source keys. This can be done in a future PR.

}
return false;
}

/**
 * Checks that every entry of the source list has the same parent path as the first entry.
 *
 * @param sourceList source paths; the elements are treated as Strings and the first element is
 *                   read unconditionally, so the list is expected to be non-empty
 * @return true if all entries share the parent path of the first entry, false otherwise
 */
public boolean commonRootPath(List<?> sourceList){
    final JsonExtractor pathExtractor = new JsonExtractor();
    final List<String> sourcePaths = (List<String>) sourceList;

    // The first entry defines the parent path every other entry has to match.
    final String expectedParentPath = pathExtractor.getParentPath(sourcePaths.get(0));
    return sourcePaths.stream()
            .map(pathExtractor::getParentPath)
            .allMatch(expectedParentPath::equals);
}

public void parseMappings(){
if(Objects.isNull(targetsParameterConfigs)){
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin;
import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.JacksonEvent;
import org.opensearch.dataprepper.model.plugin.InvalidPluginConfigurationException;
import org.opensearch.dataprepper.model.processor.AbstractProcessor;
import org.opensearch.dataprepper.model.processor.Processor;
Expand Down Expand Up @@ -39,6 +40,8 @@ public class TranslateProcessor extends AbstractProcessor<Record<Event>, Record<
private static final Logger LOG = LoggerFactory.getLogger(TranslateProcessor.class);
private final ExpressionEvaluator expressionEvaluator;
private final List<MappingsParameterConfig> mappingsConfig;
private final JacksonEvent.Builder eventBuilder= JacksonEvent.builder();
private final JsonExtractor jsonExtractor = new JsonExtractor();

@DataPrepperPluginConstructor
public TranslateProcessor(PluginMetrics pluginMetrics, final TranslateProcessorConfig translateProcessorConfig, final ExpressionEvaluator expressionEvaluator) {
Expand All @@ -58,21 +61,10 @@ public Collection<Record<Event>> doExecute(Collection<Record<Event>> records) {
final Event recordEvent = record.getData();
for (MappingsParameterConfig mappingConfig : mappingsConfig) {
try {
String iterateOn = mappingConfig.getIterateOn();
List<TargetsParameterConfig> targetsConfig = mappingConfig.getTargetsParameterConfigs();
for (TargetsParameterConfig targetConfig : targetsConfig) {
String translateWhen = targetConfig.getTranslateWhen();
Object sourceObject = mappingConfig.getSource();
if (Objects.nonNull(translateWhen) && !expressionEvaluator.evaluateConditional(translateWhen, recordEvent)) {
continue;
}
if (Objects.nonNull(iterateOn)) {
List<Map<String, Object>> objectsToIterate = recordEvent.get(iterateOn, List.class);
objectsToIterate.forEach(recordObject -> performMappings(recordObject, sourceObject, targetConfig));
recordEvent.put(iterateOn, objectsToIterate);
} else {
performMappings(recordEvent, sourceObject, targetConfig);
}
translateSource(sourceObject, recordEvent, targetConfig);
}
} catch (Exception ex) {
LOG.error(EVENT, "Error mapping the source [{}] of entry [{}]", mappingConfig.getSource(),
Expand All @@ -83,12 +75,57 @@ public Collection<Record<Event>> doExecute(Collection<Record<Event>> records) {
return records;
}

/**
 * Normalizes the configured source option into a list of source key paths.
 *
 * @param sourceObject the configured source; either a String or a List of Strings
 * @return the list itself when a List was configured, or a single-element list for a String
 * @throws InvalidPluginConfigurationException if the source is neither a String nor a List
 */
private List<String> getSourceKeys(Object sourceObject){
    if (sourceObject instanceof List<?>) {
        // Cast to the interface that was actually checked: casting to ArrayList fails with a
        // ClassCastException for any other List implementation (e.g. List.of(...)).
        @SuppressWarnings("unchecked")
        final List<String> sourceKeys = (List<String>) sourceObject;
        return sourceKeys;
    }
    if (sourceObject instanceof String) {
        return List.of((String) sourceObject);
    }
    String exceptionMsg = "source option configured incorrectly. source can only be a String or list of Strings";
    throw new InvalidPluginConfigurationException(exceptionMsg);
}

/**
 * Resolves the objects addressed by the configured source path(s) and performs the mappings on
 * each of them.
 * <p>
 * The parent path is taken from the first source path only. When there is no parent path the
 * mappings are performed directly on the event; otherwise they are performed on every object
 * found under the parent path and the root field is then written back to the event.
 *
 * @param sourceObject the configured source; a String or a List of Strings
 * @param recordEvent  the event being processed
 * @param targetConfig the target configuration to apply
 */
private void translateSource(Object sourceObject, Event recordEvent, TargetsParameterConfig targetConfig) {
    final Map<String, Object> eventData = recordEvent.toMap();
    final List<String> sourcePaths = getSourceKeys(sourceObject);
    if (Objects.isNull(eventData) || sourcePaths.isEmpty()) {
        return;
    }

    // Only the leaf names are looked up on the resolved objects.
    final List<String> leafKeys = sourcePaths.stream()
            .map(jsonExtractor::getLeafField)
            .collect(Collectors.toList());

    final String parentPath = jsonExtractor.getParentPath(sourcePaths.get(0));
    if (parentPath.isEmpty()) {
        performMappings(recordEvent, leafKeys, sourceObject, targetConfig);
        return;
    }

    final String rootField = jsonExtractor.getRootField(parentPath);
    if (!eventData.containsKey(rootField)) {
        return;
    }

    final List<Object> parentObjects = jsonExtractor.getObjectFromPath(parentPath, eventData);
    if (parentObjects.isEmpty()) {
        return;
    }
    for (Object parentObject : parentObjects) {
        performMappings(parentObject, leafKeys, sourceObject, targetConfig);
    }
    // NOTE(review): presumably toMap() returns data detached from the event, which is why the
    // root field is put back after the mappings - confirm against Event#toMap.
    recordEvent.put(rootField, eventData.get(rootField));
}

private String getSourceValue(Object recordObject, String sourceKey) {
Optional<Object> sourceValue;
if (recordObject instanceof Map) {
return (String) ((Map<?, ?>) recordObject).get(sourceKey);
sourceValue = Optional.ofNullable(((Map<?, ?>) recordObject).get(sourceKey));
} else {
return ((Event) recordObject).get(sourceKey, String.class);
sourceValue = Optional.ofNullable(((Event) recordObject).get(sourceKey, String.class));
}
return sourceValue.map(Object::toString).orElse(null);
}

private Object getTargetValue(Object sourceObject, List<Object> targetValues, TargetsParameterConfig targetConfig) {
Expand All @@ -102,17 +139,18 @@ private Object getTargetValue(Object sourceObject, List<Object> targetValues, Ta
.collect(Collectors.toList());
}

private void performMappings(Object recordObject, Object sourceObject, TargetsParameterConfig targetConfig) {
List<Object> targetValues = new ArrayList<>();
List<String> sourceKeys;
if (sourceObject instanceof List<?>) {
sourceKeys = (ArrayList<String>) sourceObject;
} else if (sourceObject instanceof String) {
sourceKeys = List.of((String) sourceObject);
} else {
String exceptionMsg = "source option configured incorrectly. source can only be a String or list of Strings";
throw new InvalidPluginConfigurationException(exceptionMsg);
private void performMappings(Object recordObject, List<String> sourceKeys, Object sourceObject, TargetsParameterConfig targetConfig) {
if (Objects.isNull(recordObject) ||
Objects.isNull(sourceObject) ||
Objects.isNull(targetConfig) ||
sourceKeys.isEmpty()) {
return;
}
String translateWhen = targetConfig.getTranslateWhen();
if(!isExpressionValid(translateWhen, recordObject)){
return;
}
List<Object> targetValues = new ArrayList<>();
for (String sourceKey : sourceKeys) {
String sourceValue = getSourceValue(recordObject, sourceKey);
if(sourceValue!=null){
Expand All @@ -123,6 +161,16 @@ private void performMappings(Object recordObject, Object sourceObject, TargetsPa
addTargetToRecords(sourceObject, targetValues, recordObject, targetConfig);
}

/**
 * Evaluates the translate_when condition against the given record object.
 *
 * @param translateWhen the conditional expression; null means no condition is configured
 * @param recordObject  either a Map, which is wrapped into an Event of type "event" for the
 *                      evaluation, or an Event
 * @return true if no condition is configured or the condition evaluates to true
 */
private boolean isExpressionValid(String translateWhen, Object recordObject){
    // Without a condition there is nothing to evaluate, so skip building an Event altogether.
    if (translateWhen == null) {
        return true;
    }
    final Event recordEvent;
    if (recordObject instanceof Map) {
        recordEvent = eventBuilder.withData(recordObject).withEventType("event").build();
    } else {
        recordEvent = (Event) recordObject;
    }
    return expressionEvaluator.evaluateConditional(translateWhen, recordEvent);
}

private Optional<Object> getTargetValueForSource(final String sourceValue, TargetsParameterConfig targetConfig) {
Optional<Object> targetValue = Optional.empty();
targetValue = targetValue
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@

import static org.hamcrest.core.Is.is;
import static org.opensearch.dataprepper.test.helper.ReflectivelySetField.setField;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.hamcrest.MatcherAssert.assertThat;
Expand All @@ -21,13 +20,6 @@ void setup() throws NoSuchFieldException, IllegalAccessException{
setField(MappingsParameterConfig.class, mappingsParameterConfig, "source", "sourceKey");
}

@Test
void test_get_iterate_on() throws NoSuchFieldException, IllegalAccessException{
assertNull(mappingsParameterConfig.getIterateOn());
setField(MappingsParameterConfig.class, mappingsParameterConfig, "iterateOn", "iteratorField");
assertThat(mappingsParameterConfig.getIterateOn(),is("iteratorField"));
}

@Test
void test_get_source() {
assertThat(mappingsParameterConfig.getSource(),is("sourceKey"));
Expand Down Expand Up @@ -65,4 +57,31 @@ void test_source_field_invalid_types() throws NoSuchFieldException, IllegalAcces
assertFalse(mappingsParameterConfig.isSourceFieldValid());
}

// A list of plain field names (no parent path) is a valid source.
@Test
void test_valid_source_array() throws NoSuchFieldException, IllegalAccessException {
    setField(MappingsParameterConfig.class, mappingsParameterConfig, "source",
            List.of("sourceField1", "sourceField2"));

    final boolean sourceValid = mappingsParameterConfig.isSourceFieldValid();

    assertTrue(sourceValid);
}

// A list containing a non-String element is rejected.
@Test
void test_invalid_source_array_not_string_type() throws NoSuchFieldException, IllegalAccessException {
    final List<Object> mixedTypeSources = List.of("sourceField1", 1);
    setField(MappingsParameterConfig.class, mappingsParameterConfig, "source", mixedTypeSources);

    final boolean sourceValid = mappingsParameterConfig.isSourceFieldValid();

    assertFalse(sourceValid);
}

// Source paths that share the same parent path are accepted.
@Test
void test_valid_source_array_valid_common_path() throws NoSuchFieldException, IllegalAccessException {
    setField(MappingsParameterConfig.class, mappingsParameterConfig, "source",
            List.of("field1/field2/sourceField1", "field1/field2/sourceField2"));

    final boolean sourceValid = mappingsParameterConfig.isSourceFieldValid();

    assertTrue(sourceValid);
}
// Source paths with different parent paths are rejected.
@Test
void test_invalid_source_array_invalid_common_path() throws NoSuchFieldException, IllegalAccessException {
    setField(MappingsParameterConfig.class, mappingsParameterConfig, "source",
            List.of("field1/field2/sourceField1", "field1/sourceField2"));

    final boolean sourceValid = mappingsParameterConfig.isSourceFieldValid();

    assertFalse(sourceValid);
}

}
Loading
Loading