Skip to content

Commit

Permalink
Removed IterateOn otpion (#3050)
Browse files Browse the repository at this point in the history
Signed-off-by: Vishal Boinapalli <[email protected]>
  • Loading branch information
vishalboin committed Jul 28, 2023
1 parent 8c9c9ab commit 385dc33
Show file tree
Hide file tree
Showing 5 changed files with 321 additions and 44 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.processor.translate;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;

import static com.google.common.base.Preconditions.checkNotNull;

/**
* JsonExtractor class is a utility for handling JSON paths and extracting specific fields or objects from
* JSON structures represented as Java objects. It provides a way to work with nested JSON structures and
* retrieve the relevant data based on the provided paths.
*/
public class JsonExtractor {

/**
* Default delimiter between the fields when providing the path
*/
private static final String DELIMITER = "/";

/**
* @param fullPath full path to the leaf field
* @return the first field from the full path, returns empty string "" if the path is empty
*/
public String getRootField(String fullPath) {
final List<String> fieldsInPath = getFieldsInPath(fullPath);
return fieldsInPath.isEmpty() ? "" : fieldsInPath.get(0);
}

/**
* @param fullPath full path to the leaf field
* @return the last field from the full path, returns empty string "" if the path is empty
*/
public String getLeafField(String fullPath) {
String strippedPath = getStrippedPath(fullPath);
final String[] fields = strippedPath.split(DELIMITER);
return fields.length==0 ? "" : fields[fields.length - 1].strip();
}

/**
* @param fullPath full path to the leaf field
* @return the path leading up to the lead field, returns empty string "" if there is no parent path
*/
public String getParentPath(String fullPath) {
String strippedPath = getStrippedPath(fullPath);
final String[] fields = strippedPath.split(DELIMITER);
if (fields.length <= 1) {
return "";
}
return Arrays.stream(fields, 0, fields.length - 1).collect(Collectors.joining(DELIMITER));
}

/**
* @param path full path to the leaf field
* @param rootObject Java Object in which root field is located. Can be either a List or Map
* @return all the Java Objects that are associated with the provided path .
* Path : field1/field2/field3 gives the value of field3.
*/
public List<Object> getObjectFromPath(String path, Object rootObject) {
final List<String> fieldsInPath = getFieldsInPath(path);
if (fieldsInPath.isEmpty()) {
return List.of(rootObject);
}
return getLeafObjects(fieldsInPath, 0, rootObject);
}

/**
* @param path path from one field to another
* @return the list of fields in the provided path
*/
private List<String> getFieldsInPath(String path) {
String strippedPath = getStrippedPath(path);
if (strippedPath.isEmpty()) {
return List.of();
}
return new ArrayList<>(Arrays.asList(strippedPath.split(DELIMITER)));
}

/**
* @param fieldsInPath list of fields in a path
* @param level current level inside the nested object with reference to the root level
* @param rootObject Java Object in which root field is located. Can be either a List or Map
* @return all the Java Objects that satisfy the fields hierarchy in fieldsInPath
*/
private List<Object> getLeafObjects(List<String> fieldsInPath, int level, Object rootObject) {
if (Objects.isNull(rootObject)) {
return List.of();
}

if (rootObject instanceof List) {
return ((List<?>) rootObject).stream()
.flatMap(arrayObject -> getLeafObjects(fieldsInPath, level, arrayObject).stream())
.collect(Collectors.toList());
} else if (rootObject instanceof Map) {
if (level >= fieldsInPath.size()) {
return List.of(rootObject);
} else {
String field = fieldsInPath.get(level);
Object outObj = ((Map<?, ?>) rootObject).get(field);
return getLeafObjects(fieldsInPath, level + 1, outObj);
}
}
return List.of();
}

/**
* @param path path from one field to another
* @return path stripped of whitespaces
*/
private String getStrippedPath(String path){
checkNotNull(path, "path cannot be null");
return path.strip();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,6 @@ public class MappingsParameterConfig {
@NotNull
private Object source;

@JsonProperty("iterate_on")
private String iterateOn;

@JsonProperty("targets")
@Valid
private List<TargetsParameterConfig> targetsParameterConfigs = new ArrayList<>();
Expand All @@ -26,10 +23,6 @@ public Object getSource() {
return source;
}

public String getIterateOn() {
return iterateOn;
}

public List<TargetsParameterConfig> getTargetsParameterConfigs() {
return targetsParameterConfigs;
}
Expand All @@ -44,7 +37,7 @@ public boolean isTargetsPresent(){
return Objects.nonNull(targetsParameterConfigs) && !targetsParameterConfigs.isEmpty();
}

@AssertTrue(message = "source field must be a string or list of strings")
@AssertTrue(message = "The \"source\" field should either be a string or a list of strings sharing the same parent path.")
public boolean isSourceFieldValid() {
if(Objects.isNull(source)){
return true;
Expand All @@ -54,11 +47,29 @@ public boolean isSourceFieldValid() {
}
if (source instanceof List<?>) {
List<?> sourceList = (List<?>) source;
return sourceList.stream().allMatch(sourceItem -> sourceItem instanceof String);
if(sourceList.isEmpty()){
return false;
}
return sourceList.stream().allMatch(sourceItem -> sourceItem instanceof String)
&& commonRootPath(sourceList);
}
return false;
}

public boolean commonRootPath(List<?> sourceList){
List<String> sources = (List<String>) sourceList;

JsonExtractor jsonExtractor = new JsonExtractor();
String firstSource = sources.get(0);
String parentPath = jsonExtractor.getParentPath(firstSource);
for (String source : sources) {
if (!jsonExtractor.getParentPath(source).equals(parentPath)) {
return false;
}
}
return true;
}

public void parseMappings(){
if(Objects.isNull(targetsParameterConfigs)){
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin;
import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.JacksonEvent;
import org.opensearch.dataprepper.model.plugin.InvalidPluginConfigurationException;
import org.opensearch.dataprepper.model.processor.AbstractProcessor;
import org.opensearch.dataprepper.model.processor.Processor;
Expand Down Expand Up @@ -39,6 +40,8 @@ public class TranslateProcessor extends AbstractProcessor<Record<Event>, Record<
private static final Logger LOG = LoggerFactory.getLogger(TranslateProcessor.class);
private final ExpressionEvaluator expressionEvaluator;
private final List<MappingsParameterConfig> mappingsConfig;
private final JacksonEvent.Builder eventBuilder= JacksonEvent.builder();
private final JsonExtractor jsonExtractor = new JsonExtractor();

@DataPrepperPluginConstructor
public TranslateProcessor(PluginMetrics pluginMetrics, final TranslateProcessorConfig translateProcessorConfig, final ExpressionEvaluator expressionEvaluator) {
Expand All @@ -58,21 +61,10 @@ public Collection<Record<Event>> doExecute(Collection<Record<Event>> records) {
final Event recordEvent = record.getData();
for (MappingsParameterConfig mappingConfig : mappingsConfig) {
try {
String iterateOn = mappingConfig.getIterateOn();
List<TargetsParameterConfig> targetsConfig = mappingConfig.getTargetsParameterConfigs();
for (TargetsParameterConfig targetConfig : targetsConfig) {
String translateWhen = targetConfig.getTranslateWhen();
Object sourceObject = mappingConfig.getSource();
if (Objects.nonNull(translateWhen) && !expressionEvaluator.evaluateConditional(translateWhen, recordEvent)) {
continue;
}
if (Objects.nonNull(iterateOn)) {
List<Map<String, Object>> objectsToIterate = recordEvent.get(iterateOn, List.class);
objectsToIterate.forEach(recordObject -> performMappings(recordObject, sourceObject, targetConfig));
recordEvent.put(iterateOn, objectsToIterate);
} else {
performMappings(recordEvent, sourceObject, targetConfig);
}
translateSource(sourceObject, recordEvent, targetConfig);
}
} catch (Exception ex) {
LOG.error(EVENT, "Error mapping the source [{}] of entry [{}]", mappingConfig.getSource(),
Expand All @@ -83,12 +75,57 @@ public Collection<Record<Event>> doExecute(Collection<Record<Event>> records) {
return records;
}

private List<String> getSourceKeys(Object sourceObject){
List<String> sourceKeys;
if (sourceObject instanceof List<?>) {
sourceKeys = (ArrayList<String>) sourceObject;
} else if (sourceObject instanceof String) {
sourceKeys = List.of((String) sourceObject);
} else {
String exceptionMsg = "source option configured incorrectly. source can only be a String or list of Strings";
throw new InvalidPluginConfigurationException(exceptionMsg);
}
return sourceKeys;
}

private void translateSource(Object sourceObject, Event recordEvent, TargetsParameterConfig targetConfig) {
Map<String, Object> recordObject = recordEvent.toMap();
List<String> sourceKeysPaths = getSourceKeys(sourceObject);
if(Objects.isNull(recordObject) || sourceKeysPaths.isEmpty()){
return;
}

List<String> sourceKeys = new ArrayList<>();
for(String sourceKeyPath: sourceKeysPaths){
sourceKeys.add(jsonExtractor.getLeafField(sourceKeyPath));
}

String commonPath = jsonExtractor.getParentPath(sourceKeysPaths.get(0));
if(commonPath.isEmpty()) {
performMappings(recordEvent, sourceKeys, sourceObject, targetConfig);
return;
}

String rootField = jsonExtractor.getRootField(commonPath);
if(!recordObject.containsKey(rootField)){
return;
}

List<Object> targetObjects = jsonExtractor.getObjectFromPath(commonPath, recordObject);
if(!targetObjects.isEmpty()) {
targetObjects.forEach(targetObj -> performMappings(targetObj, sourceKeys, sourceObject, targetConfig));
recordEvent.put(rootField, recordObject.get(rootField));
}
}

private String getSourceValue(Object recordObject, String sourceKey) {
Optional<Object> sourceValue;
if (recordObject instanceof Map) {
return (String) ((Map<?, ?>) recordObject).get(sourceKey);
sourceValue = Optional.ofNullable(((Map<?, ?>) recordObject).get(sourceKey));
} else {
return ((Event) recordObject).get(sourceKey, String.class);
sourceValue = Optional.ofNullable(((Event) recordObject).get(sourceKey, String.class));
}
return sourceValue.map(Object::toString).orElse(null);
}

private Object getTargetValue(Object sourceObject, List<Object> targetValues, TargetsParameterConfig targetConfig) {
Expand All @@ -102,17 +139,18 @@ private Object getTargetValue(Object sourceObject, List<Object> targetValues, Ta
.collect(Collectors.toList());
}

private void performMappings(Object recordObject, Object sourceObject, TargetsParameterConfig targetConfig) {
List<Object> targetValues = new ArrayList<>();
List<String> sourceKeys;
if (sourceObject instanceof List<?>) {
sourceKeys = (ArrayList<String>) sourceObject;
} else if (sourceObject instanceof String) {
sourceKeys = List.of((String) sourceObject);
} else {
String exceptionMsg = "source option configured incorrectly. source can only be a String or list of Strings";
throw new InvalidPluginConfigurationException(exceptionMsg);
private void performMappings(Object recordObject, List<String> sourceKeys, Object sourceObject, TargetsParameterConfig targetConfig) {
if (Objects.isNull(recordObject) ||
Objects.isNull(sourceObject) ||
Objects.isNull(targetConfig) ||
sourceKeys.isEmpty()) {
return;
}
String translateWhen = targetConfig.getTranslateWhen();
if(!isExpressionValid(translateWhen, recordObject)){
return;
}
List<Object> targetValues = new ArrayList<>();
for (String sourceKey : sourceKeys) {
String sourceValue = getSourceValue(recordObject, sourceKey);
if(sourceValue!=null){
Expand All @@ -123,6 +161,16 @@ private void performMappings(Object recordObject, Object sourceObject, TargetsPa
addTargetToRecords(sourceObject, targetValues, recordObject, targetConfig);
}

private boolean isExpressionValid(String translateWhen, Object recordObject){
Event recordEvent;
if (recordObject instanceof Map) {
recordEvent = eventBuilder.withData(recordObject).withEventType("event").build();
} else {
recordEvent = (Event)recordObject;
}
return (translateWhen == null) || expressionEvaluator.evaluateConditional(translateWhen, recordEvent);
}

private Optional<Object> getTargetValueForSource(final String sourceValue, TargetsParameterConfig targetConfig) {
Optional<Object> targetValue = Optional.empty();
targetValue = targetValue
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@

import static org.hamcrest.core.Is.is;
import static org.opensearch.dataprepper.test.helper.ReflectivelySetField.setField;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.hamcrest.MatcherAssert.assertThat;
Expand All @@ -21,13 +20,6 @@ void setup() throws NoSuchFieldException, IllegalAccessException{
setField(MappingsParameterConfig.class, mappingsParameterConfig, "source", "sourceKey");
}

@Test
void test_get_iterate_on() throws NoSuchFieldException, IllegalAccessException{
assertNull(mappingsParameterConfig.getIterateOn());
setField(MappingsParameterConfig.class, mappingsParameterConfig, "iterateOn", "iteratorField");
assertThat(mappingsParameterConfig.getIterateOn(),is("iteratorField"));
}

@Test
void test_get_source() {
assertThat(mappingsParameterConfig.getSource(),is("sourceKey"));
Expand Down Expand Up @@ -65,4 +57,31 @@ void test_source_field_invalid_types() throws NoSuchFieldException, IllegalAcces
assertFalse(mappingsParameterConfig.isSourceFieldValid());
}

@Test
void test_valid_source_array() throws NoSuchFieldException, IllegalAccessException {
List<String> sourceList = List.of("sourceField1", "sourceField2");
setField(MappingsParameterConfig.class, mappingsParameterConfig, "source", sourceList);
assertTrue(mappingsParameterConfig.isSourceFieldValid());
}

@Test
void test_invalid_source_array_not_string_type() throws NoSuchFieldException, IllegalAccessException {
List<Object> sourceList = List.of("sourceField1", 1);
setField(MappingsParameterConfig.class, mappingsParameterConfig, "source", sourceList);
assertFalse(mappingsParameterConfig.isSourceFieldValid());
}

@Test
void test_valid_source_array_valid_common_path() throws NoSuchFieldException, IllegalAccessException {
List<String> sourceList = List.of("field1/field2/sourceField1", "field1/field2/sourceField2");
setField(MappingsParameterConfig.class, mappingsParameterConfig, "source", sourceList);
assertTrue(mappingsParameterConfig.isSourceFieldValid());
}
@Test
void test_invalid_source_array_invalid_common_path() throws NoSuchFieldException, IllegalAccessException {
List<String> sourceList = List.of("field1/field2/sourceField1", "field1/sourceField2");
setField(MappingsParameterConfig.class, mappingsParameterConfig, "source", sourceList);
assertFalse(mappingsParameterConfig.isSourceFieldValid());
}

}
Loading

0 comments on commit 385dc33

Please sign in to comment.