diff --git a/data-prepper-plugins/translate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/translate/JsonExtractor.java b/data-prepper-plugins/translate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/translate/JsonExtractor.java new file mode 100644 index 0000000000..6c1837c96c --- /dev/null +++ b/data-prepper-plugins/translate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/translate/JsonExtractor.java @@ -0,0 +1,122 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.processor.translate; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.stream.Collectors; + +import static com.google.common.base.Preconditions.checkNotNull; + +/** + * JsonExtractor class is a utility for handling JSON paths and extracting specific fields or objects from + * JSON structures represented as Java objects. It provides a way to work with nested JSON structures and + * retrieve the relevant data based on the provided paths. + */ +public class JsonExtractor { + + /** + * Default delimiter between the fields when providing the path + */ + private static final String DELIMITER = "/"; + + /** + * @param fullPath full path to the leaf field + * @return the first field from the full path, returns empty string "" if the path is empty + */ + public String getRootField(String fullPath) { + final List fieldsInPath = getFieldsInPath(fullPath); + return fieldsInPath.isEmpty() ? "" : fieldsInPath.get(0); + } + + /** + * @param fullPath full path to the leaf field + * @return the last field from the full path, returns empty string "" if the path is empty + */ + public String getLeafField(String fullPath) { + String strippedPath = getStrippedPath(fullPath); + final String[] fields = strippedPath.split(DELIMITER); + return fields.length==0 ? "" : fields[fields.length - 1].strip(); + } + + /** + * @param fullPath full path to the leaf field + * @return the path leading up to the lead field, returns empty string "" if there is no parent path + */ + public String getParentPath(String fullPath) { + String strippedPath = getStrippedPath(fullPath); + final String[] fields = strippedPath.split(DELIMITER); + if (fields.length <= 1) { + return ""; + } + return Arrays.stream(fields, 0, fields.length - 1).collect(Collectors.joining(DELIMITER)); + } + + /** + * @param path full path to the leaf field + * @param rootObject Java Object in which root field is located. Can be either a List or Map + * @return all the Java Objects that are associated with the provided path . + * Path : field1/field2/field3 gives the value of field3. + */ + public List getObjectFromPath(String path, Object rootObject) { + final List fieldsInPath = getFieldsInPath(path); + if (fieldsInPath.isEmpty()) { + return List.of(rootObject); + } + return getLeafObjects(fieldsInPath, 0, rootObject); + } + + /** + * @param path path from one field to another + * @return the list of fields in the provided path + */ + private List getFieldsInPath(String path) { + String strippedPath = getStrippedPath(path); + if (strippedPath.isEmpty()) { + return List.of(); + } + return new ArrayList<>(Arrays.asList(strippedPath.split(DELIMITER))); + } + + /** + * @param fieldsInPath list of fields in a path + * @param level current level inside the nested object with reference to the root level + * @param rootObject Java Object in which root field is located. Can be either a List or Map + * @return all the Java Objects that satisfy the fields hierarchy in fieldsInPath + */ + private List getLeafObjects(List fieldsInPath, int level, Object rootObject) { + if (Objects.isNull(rootObject)) { + return List.of(); + } + + if (rootObject instanceof List) { + return ((List) rootObject).stream() + .flatMap(arrayObject -> getLeafObjects(fieldsInPath, level, arrayObject).stream()) + .collect(Collectors.toList()); + } else if (rootObject instanceof Map) { + if (level >= fieldsInPath.size()) { + return List.of(rootObject); + } else { + String field = fieldsInPath.get(level); + Object outObj = ((Map) rootObject).get(field); + return getLeafObjects(fieldsInPath, level + 1, outObj); + } + } + return List.of(); + } + + /** + * @param path path from one field to another + * @return path stripped of whitespaces + */ + private String getStrippedPath(String path){ + checkNotNull(path, "path cannot be null"); + return path.strip(); + } +} diff --git a/data-prepper-plugins/translate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/translate/MappingsParameterConfig.java b/data-prepper-plugins/translate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/translate/MappingsParameterConfig.java index 064a673f3a..7146f62a4c 100644 --- a/data-prepper-plugins/translate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/translate/MappingsParameterConfig.java +++ b/data-prepper-plugins/translate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/translate/MappingsParameterConfig.java @@ -15,9 +15,6 @@ public class MappingsParameterConfig { @NotNull private Object source; - @JsonProperty("iterate_on") - private String iterateOn; - @JsonProperty("targets") @Valid private List targetsParameterConfigs = new ArrayList<>(); @@ -26,10 +23,6 @@ public Object getSource() { return source; } - public String getIterateOn() { - return iterateOn; - } - public List getTargetsParameterConfigs() { return targetsParameterConfigs; } @@ -44,7 +37,7 @@ public boolean isTargetsPresent(){ return Objects.nonNull(targetsParameterConfigs) && !targetsParameterConfigs.isEmpty(); } - @AssertTrue(message = "source field must be a string or list of strings") + @AssertTrue(message = "The \"source\" field should either be a string or a list of strings sharing the same parent path.") public boolean isSourceFieldValid() { if(Objects.isNull(source)){ return true; @@ -54,11 +47,29 @@ public boolean isSourceFieldValid() { } if (source instanceof List) { List sourceList = (List) source; - return sourceList.stream().allMatch(sourceItem -> sourceItem instanceof String); + if(sourceList.isEmpty()){ + return false; + } + return sourceList.stream().allMatch(sourceItem -> sourceItem instanceof String) + && commonRootPath(sourceList); } return false; } + public boolean commonRootPath(List sourceList){ + List sources = (List) sourceList; + + JsonExtractor jsonExtractor = new JsonExtractor(); + String firstSource = sources.get(0); + String parentPath = jsonExtractor.getParentPath(firstSource); + for (String source : sources) { + if (!jsonExtractor.getParentPath(source).equals(parentPath)) { + return false; + } + } + return true; + } + public void parseMappings(){ if(Objects.isNull(targetsParameterConfigs)){ return; diff --git a/data-prepper-plugins/translate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/translate/TranslateProcessor.java b/data-prepper-plugins/translate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/translate/TranslateProcessor.java index d627f7a77e..ee9c78bbbe 100644 --- a/data-prepper-plugins/translate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/translate/TranslateProcessor.java +++ b/data-prepper-plugins/translate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/translate/TranslateProcessor.java @@ -12,6 +12,7 @@ import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin; import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor; import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.event.JacksonEvent; import org.opensearch.dataprepper.model.plugin.InvalidPluginConfigurationException; import org.opensearch.dataprepper.model.processor.AbstractProcessor; import org.opensearch.dataprepper.model.processor.Processor; @@ -39,6 +40,8 @@ public class TranslateProcessor extends AbstractProcessor, Record< private static final Logger LOG = LoggerFactory.getLogger(TranslateProcessor.class); private final ExpressionEvaluator expressionEvaluator; private final List mappingsConfig; + private final JacksonEvent.Builder eventBuilder= JacksonEvent.builder(); + private final JsonExtractor jsonExtractor = new JsonExtractor(); @DataPrepperPluginConstructor public TranslateProcessor(PluginMetrics pluginMetrics, final TranslateProcessorConfig translateProcessorConfig, final ExpressionEvaluator expressionEvaluator) { @@ -58,21 +61,10 @@ public Collection> doExecute(Collection> records) { final Event recordEvent = record.getData(); for (MappingsParameterConfig mappingConfig : mappingsConfig) { try { - String iterateOn = mappingConfig.getIterateOn(); List targetsConfig = mappingConfig.getTargetsParameterConfigs(); for (TargetsParameterConfig targetConfig : targetsConfig) { - String translateWhen = targetConfig.getTranslateWhen(); Object sourceObject = mappingConfig.getSource(); - if (Objects.nonNull(translateWhen) && !expressionEvaluator.evaluateConditional(translateWhen, recordEvent)) { - continue; - } - if (Objects.nonNull(iterateOn)) { - List> objectsToIterate = recordEvent.get(iterateOn, List.class); - objectsToIterate.forEach(recordObject -> performMappings(recordObject, sourceObject, targetConfig)); - recordEvent.put(iterateOn, objectsToIterate); - } else { - performMappings(recordEvent, sourceObject, targetConfig); - } + translateSource(sourceObject, recordEvent, targetConfig); } } catch (Exception ex) { LOG.error(EVENT, "Error mapping the source [{}] of entry [{}]", mappingConfig.getSource(), @@ -83,12 +75,57 @@ public Collection> doExecute(Collection> records) { return records; } + private List getSourceKeys(Object sourceObject){ + List sourceKeys; + if (sourceObject instanceof List) { + sourceKeys = (ArrayList) sourceObject; + } else if (sourceObject instanceof String) { + sourceKeys = List.of((String) sourceObject); + } else { + String exceptionMsg = "source option configured incorrectly. source can only be a String or list of Strings"; + throw new InvalidPluginConfigurationException(exceptionMsg); + } + return sourceKeys; + } + + private void translateSource(Object sourceObject, Event recordEvent, TargetsParameterConfig targetConfig) { + Map recordObject = recordEvent.toMap(); + List sourceKeysPaths = getSourceKeys(sourceObject); + if(Objects.isNull(recordObject) || sourceKeysPaths.isEmpty()){ + return; + } + + List sourceKeys = new ArrayList<>(); + for(String sourceKeyPath: sourceKeysPaths){ + sourceKeys.add(jsonExtractor.getLeafField(sourceKeyPath)); + } + + String commonPath = jsonExtractor.getParentPath(sourceKeysPaths.get(0)); + if(commonPath.isEmpty()) { + performMappings(recordEvent, sourceKeys, sourceObject, targetConfig); + return; + } + + String rootField = jsonExtractor.getRootField(commonPath); + if(!recordObject.containsKey(rootField)){ + return; + } + + List targetObjects = jsonExtractor.getObjectFromPath(commonPath, recordObject); + if(!targetObjects.isEmpty()) { + targetObjects.forEach(targetObj -> performMappings(targetObj, sourceKeys, sourceObject, targetConfig)); + recordEvent.put(rootField, recordObject.get(rootField)); + } + } + private String getSourceValue(Object recordObject, String sourceKey) { + Optional sourceValue; if (recordObject instanceof Map) { - return (String) ((Map) recordObject).get(sourceKey); + sourceValue = Optional.ofNullable(((Map) recordObject).get(sourceKey)); } else { - return ((Event) recordObject).get(sourceKey, String.class); + sourceValue = Optional.ofNullable(((Event) recordObject).get(sourceKey, String.class)); } + return sourceValue.map(Object::toString).orElse(null); } private Object getTargetValue(Object sourceObject, List targetValues, TargetsParameterConfig targetConfig) { @@ -102,17 +139,18 @@ private Object getTargetValue(Object sourceObject, List targetValues, Ta .collect(Collectors.toList()); } - private void performMappings(Object recordObject, Object sourceObject, TargetsParameterConfig targetConfig) { - List targetValues = new ArrayList<>(); - List sourceKeys; - if (sourceObject instanceof List) { - sourceKeys = (ArrayList) sourceObject; - } else if (sourceObject instanceof String) { - sourceKeys = List.of((String) sourceObject); - } else { - String exceptionMsg = "source option configured incorrectly. source can only be a String or list of Strings"; - throw new InvalidPluginConfigurationException(exceptionMsg); + private void performMappings(Object recordObject, List sourceKeys, Object sourceObject, TargetsParameterConfig targetConfig) { + if (Objects.isNull(recordObject) || + Objects.isNull(sourceObject) || + Objects.isNull(targetConfig) || + sourceKeys.isEmpty()) { + return; } + String translateWhen = targetConfig.getTranslateWhen(); + if(!isExpressionValid(translateWhen, recordObject)){ + return; + } + List targetValues = new ArrayList<>(); for (String sourceKey : sourceKeys) { String sourceValue = getSourceValue(recordObject, sourceKey); if(sourceValue!=null){ @@ -123,6 +161,16 @@ private void performMappings(Object recordObject, Object sourceObject, TargetsPa addTargetToRecords(sourceObject, targetValues, recordObject, targetConfig); } + private boolean isExpressionValid(String translateWhen, Object recordObject){ + Event recordEvent; + if (recordObject instanceof Map) { + recordEvent = eventBuilder.withData(recordObject).withEventType("event").build(); + } else { + recordEvent = (Event)recordObject; + } + return (translateWhen == null) || expressionEvaluator.evaluateConditional(translateWhen, recordEvent); + } + private Optional getTargetValueForSource(final String sourceValue, TargetsParameterConfig targetConfig) { Optional targetValue = Optional.empty(); targetValue = targetValue diff --git a/data-prepper-plugins/translate-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/translate/MappingsParameterConfigTest.java b/data-prepper-plugins/translate-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/translate/MappingsParameterConfigTest.java index 86d30602e1..04728a3531 100644 --- a/data-prepper-plugins/translate-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/translate/MappingsParameterConfigTest.java +++ b/data-prepper-plugins/translate-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/translate/MappingsParameterConfigTest.java @@ -5,7 +5,6 @@ import static org.hamcrest.core.Is.is; import static org.opensearch.dataprepper.test.helper.ReflectivelySetField.setField; -import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.hamcrest.MatcherAssert.assertThat; @@ -21,13 +20,6 @@ void setup() throws NoSuchFieldException, IllegalAccessException{ setField(MappingsParameterConfig.class, mappingsParameterConfig, "source", "sourceKey"); } - @Test - void test_get_iterate_on() throws NoSuchFieldException, IllegalAccessException{ - assertNull(mappingsParameterConfig.getIterateOn()); - setField(MappingsParameterConfig.class, mappingsParameterConfig, "iterateOn", "iteratorField"); - assertThat(mappingsParameterConfig.getIterateOn(),is("iteratorField")); - } - @Test void test_get_source() { assertThat(mappingsParameterConfig.getSource(),is("sourceKey")); @@ -65,4 +57,31 @@ void test_source_field_invalid_types() throws NoSuchFieldException, IllegalAcces assertFalse(mappingsParameterConfig.isSourceFieldValid()); } + @Test + void test_valid_source_array() throws NoSuchFieldException, IllegalAccessException { + List sourceList = List.of("sourceField1", "sourceField2"); + setField(MappingsParameterConfig.class, mappingsParameterConfig, "source", sourceList); + assertTrue(mappingsParameterConfig.isSourceFieldValid()); + } + + @Test + void test_invalid_source_array_not_string_type() throws NoSuchFieldException, IllegalAccessException { + List sourceList = List.of("sourceField1", 1); + setField(MappingsParameterConfig.class, mappingsParameterConfig, "source", sourceList); + assertFalse(mappingsParameterConfig.isSourceFieldValid()); + } + + @Test + void test_valid_source_array_valid_common_path() throws NoSuchFieldException, IllegalAccessException { + List sourceList = List.of("field1/field2/sourceField1", "field1/field2/sourceField2"); + setField(MappingsParameterConfig.class, mappingsParameterConfig, "source", sourceList); + assertTrue(mappingsParameterConfig.isSourceFieldValid()); + } + @Test + void test_invalid_source_array_invalid_common_path() throws NoSuchFieldException, IllegalAccessException { + List sourceList = List.of("field1/field2/sourceField1", "field1/sourceField2"); + setField(MappingsParameterConfig.class, mappingsParameterConfig, "source", sourceList); + assertFalse(mappingsParameterConfig.isSourceFieldValid()); + } + } \ No newline at end of file diff --git a/data-prepper-plugins/translate-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/translate/TranslateProcessorTest.java b/data-prepper-plugins/translate-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/translate/TranslateProcessorTest.java index fbefe75eeb..e0610771a0 100644 --- a/data-prepper-plugins/translate-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/translate/TranslateProcessorTest.java +++ b/data-prepper-plugins/translate-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/translate/TranslateProcessorTest.java @@ -428,7 +428,7 @@ void test_nested_records_with_default_value() { when(mockRegexConfig.getPatterns()).thenReturn(createMapEntries( createMapping("key1", "mappedValue1"), createMapping("key2", "mappedValue2"))); - when(mappingsParameterConfig.getIterateOn()).thenReturn("collection"); + when(mappingsParameterConfig.getSource()).thenReturn("collection/sourceField"); targetsParameterConfig = new TargetsParameterConfig(null, "targetField", mockRegexConfig, null, "No Match", null); when(mappingsParameterConfig.getTargetsParameterConfigs()).thenReturn(List.of(targetsParameterConfig)); @@ -454,7 +454,7 @@ void test_nested_records_without_default_value() { when(mockRegexConfig.getPatterns()).thenReturn(createMapEntries( createMapping("key1", "mappedValue1"), createMapping("key2", "mappedValue2"))); - when(mappingsParameterConfig.getIterateOn()).thenReturn("collection"); + when(mappingsParameterConfig.getSource()).thenReturn("collection/sourceField"); targetsParameterConfig = new TargetsParameterConfig(null, "targetField", mockRegexConfig, null, null, null); when(mappingsParameterConfig.getTargetsParameterConfigs()).thenReturn(List.of(targetsParameterConfig)); @@ -477,7 +477,83 @@ void test_nested_records_no_match() { Map.of("sourceField", "key3")); when(mockRegexConfig.getPatterns()).thenReturn(createMapEntries(createMapping("key4", "mappedValue1"))); - when(mappingsParameterConfig.getIterateOn()).thenReturn("collection"); + when(mappingsParameterConfig.getSource()).thenReturn("collection/sourceField"); + targetsParameterConfig = new TargetsParameterConfig(null, "targetField", mockRegexConfig, null, null, null); + when(mappingsParameterConfig.getTargetsParameterConfigs()).thenReturn(List.of(targetsParameterConfig)); + + final TranslateProcessor processor = createObjectUnderTest(); + final Record record = buildRecordWithEvent(testJson); + final List> translatedRecords = (List>) processor.doExecute(Collections.singletonList(record)); + + assertThat(translatedRecords.get(0).getData().get("collection", ArrayList.class), is(outputJson)); + } + + @Test + void test_nested_multiple_levels() { + final Map testJson = Map.of("collection", List.of( + Map.of("sourceField1", List.of(Map.of("sourceField2", "key1"))))); + final List> outputJson = List.of( + Map.of("sourceField1", List.of(Map.of("sourceField2", "key1", "targetField","mappedValue1")))); + + when(mockRegexConfig.getPatterns()).thenReturn(createMapEntries(createMapping("key1", "mappedValue1"))); + when(mappingsParameterConfig.getSource()).thenReturn("collection/sourceField1/sourceField2"); + targetsParameterConfig = new TargetsParameterConfig(null, "targetField", mockRegexConfig, null, null, null); + when(mappingsParameterConfig.getTargetsParameterConfigs()).thenReturn(List.of(targetsParameterConfig)); + + final TranslateProcessor processor = createObjectUnderTest(); + final Record record = buildRecordWithEvent(testJson); + final List> translatedRecords = (List>) processor.doExecute(Collections.singletonList(record)); + + assertThat(translatedRecords.get(0).getData().get("collection", ArrayList.class), is(outputJson)); + } + + @Test + void test_no_path_found_with_wrong_field() { + final Map testJson = Map.of("collection", List.of( + Map.of("sourceField1", List.of(Map.of("sourceField2", "key1"))))); + final List> outputJson = List.of( + Map.of("sourceField1", List.of(Map.of("sourceField2", "key1")))); + + when(mockRegexConfig.getPatterns()).thenReturn(createMapEntries(createMapping("key1", "mappedValue1"))); + when(mappingsParameterConfig.getSource()).thenReturn("collection/noSource/sourceField2"); + targetsParameterConfig = new TargetsParameterConfig(null, "targetField", mockRegexConfig, null, null, null); + when(mappingsParameterConfig.getTargetsParameterConfigs()).thenReturn(List.of(targetsParameterConfig)); + + final TranslateProcessor processor = createObjectUnderTest(); + final Record record = buildRecordWithEvent(testJson); + final List> translatedRecords = (List>) processor.doExecute(Collections.singletonList(record)); + + assertThat(translatedRecords.get(0).getData().get("collection", ArrayList.class), is(outputJson)); + } + + @Test + void test_no_path_found_with_no_list() { + final Map testJson = Map.of("collection", List.of( + Map.of("sourceField1", "key1","sourceField2", "key1"))); + final List> outputJson = List.of( + Map.of("sourceField1", "key1","sourceField2", "key1")); + + when(mockRegexConfig.getPatterns()).thenReturn(createMapEntries(createMapping("key1", "mappedValue1"))); + when(mappingsParameterConfig.getSource()).thenReturn("collection/sourceField1/sourceField2"); + targetsParameterConfig = new TargetsParameterConfig(null, "targetField", mockRegexConfig, null, null, null); + when(mappingsParameterConfig.getTargetsParameterConfigs()).thenReturn(List.of(targetsParameterConfig)); + + final TranslateProcessor processor = createObjectUnderTest(); + final Record record = buildRecordWithEvent(testJson); + final List> translatedRecords = (List>) processor.doExecute(Collections.singletonList(record)); + + assertThat(translatedRecords.get(0).getData().get("collection", ArrayList.class), is(outputJson)); + } + + @Test + void test_path_with_whitespaces() { + final Map testJson = Map.of("collection", List.of( + Map.of("sourceField1", List.of(Map.of("sourceField2", "key1"))))); + final List> outputJson = List.of( + Map.of("sourceField1", List.of(Map.of("sourceField2", "key1", "targetField","mappedValue1")))); + + when(mockRegexConfig.getPatterns()).thenReturn(createMapEntries(createMapping("key1", "mappedValue1"))); + when(mappingsParameterConfig.getSource()).thenReturn(" collection/sourceField1/sourceField2 "); targetsParameterConfig = new TargetsParameterConfig(null, "targetField", mockRegexConfig, null, null, null); when(mappingsParameterConfig.getTargetsParameterConfigs()).thenReturn(List.of(targetsParameterConfig)); @@ -539,6 +615,7 @@ void test_target_type_double() { assertTrue(translatedRecords.get(0).getData().containsKey("targetField")); assertThat(translatedRecords.get(0).getData().get("targetField", Double.class), is(20.3)); } + @Nested class FilePathTests { private File testMappingsFile;