diff --git a/data-prepper-plugins/translate-processor/build.gradle b/data-prepper-plugins/translate-processor/build.gradle index 6b6526fda4..5139ebbb07 100644 --- a/data-prepper-plugins/translate-processor/build.gradle +++ b/data-prepper-plugins/translate-processor/build.gradle @@ -13,6 +13,7 @@ dependencies { implementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-yaml:2.15.0' implementation 'io.micrometer:micrometer-core' implementation project(path: ':data-prepper-api') + implementation project(path: ':data-prepper-plugins:mutate-event-processors') testImplementation project(':data-prepper-plugins:log-generator-source') testImplementation project(':data-prepper-test-common') implementation 'org.apache.commons:commons-lang3:3.12.0' diff --git a/data-prepper-plugins/translate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/translate/RegexParameterConfiguration.java b/data-prepper-plugins/translate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/translate/RegexParameterConfiguration.java index a370648c23..a7e3d494da 100644 --- a/data-prepper-plugins/translate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/translate/RegexParameterConfiguration.java +++ b/data-prepper-plugins/translate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/translate/RegexParameterConfiguration.java @@ -11,12 +11,12 @@ public class RegexParameterConfiguration { final boolean DEFAULT_EXACT = true; @NotNull @JsonProperty("patterns") - private Map patterns; + private Map patterns; @JsonProperty("exact") private Boolean exact = DEFAULT_EXACT; - public Map getPatterns() { + public Map getPatterns() { return patterns; } diff --git a/data-prepper-plugins/translate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/translate/TranslateProcessor.java b/data-prepper-plugins/translate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/translate/TranslateProcessor.java index d99ce82897..476bfa807b 100644 --- a/data-prepper-plugins/translate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/translate/TranslateProcessor.java +++ b/data-prepper-plugins/translate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/translate/TranslateProcessor.java @@ -17,6 +17,7 @@ import org.opensearch.dataprepper.model.processor.AbstractProcessor; import org.opensearch.dataprepper.model.processor.Processor; import org.opensearch.dataprepper.model.record.Record; +import org.opensearch.dataprepper.typeconverter.TypeConverter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -30,6 +31,7 @@ import java.util.Optional; import java.util.regex.Matcher; import java.util.regex.Pattern; +import java.util.stream.Collectors; import static org.opensearch.dataprepper.logging.DataPrepperMarkers.EVENT; @@ -40,15 +42,17 @@ public class TranslateProcessor extends AbstractProcessor, Record< private static final Logger LOG = LoggerFactory.getLogger(TranslateProcessor.class); private final ExpressionEvaluator expressionEvaluator; private final TranslateProcessorConfig translateProcessorConfig; - private final LinkedHashMap, String> rangeMappings; - private final Map individualMappings; - private final Map compiledPatterns; + private final LinkedHashMap, Object> rangeMappings; + private final Map individualMappings; + private final Map compiledPatterns; + private final TypeConverter converter; @DataPrepperPluginConstructor public TranslateProcessor(PluginMetrics pluginMetrics, final TranslateProcessorConfig translateProcessorConfig, final ExpressionEvaluator expressionEvaluator) { super(pluginMetrics); this.translateProcessorConfig = translateProcessorConfig; this.expressionEvaluator = expressionEvaluator; + this.converter = translateProcessorConfig.getTargetType().getTargetConverter(); individualMappings = new HashMap<>(); rangeMappings = new LinkedHashMap<>(); compiledPatterns = new HashMap<>(); @@ -62,22 +66,22 @@ public TranslateProcessor(PluginMetrics pluginMetrics, final TranslateProcessorC checkOverlappingKeys(); } - private void compilePatterns(Map mappings) { + private void compilePatterns(Map mappings) { for (String pattern : mappings.keySet()) { Pattern compiledPattern = Pattern.compile(pattern); compiledPatterns.put(compiledPattern, mappings.get(pattern)); } } - private void processMapField(Map map) { + private void processMapField(Map map) { if (Objects.nonNull(map)) { - for (Map.Entry mapEntry : map.entrySet()) { + for (Map.Entry mapEntry : map.entrySet()) { parseIndividualKeys(mapEntry); } } } - private void parseIndividualKeys(Map.Entry mapEntry){ + private void parseIndividualKeys(Map.Entry mapEntry){ String[] commaSeparatedKeys = mapEntry.getKey().split(","); for(String individualKey : commaSeparatedKeys){ if(individualKey.contains("-")){ @@ -88,7 +92,7 @@ private void parseIndividualKeys(Map.Entry mapEntry){ } } - private void addRangeMapping(Map.Entry mapEntry){ + private void addRangeMapping(Map.Entry mapEntry){ String[] rangeKeys = mapEntry.getKey().split("-"); if(rangeKeys.length!=2 || !StringUtils.isNumericSpace(rangeKeys[0]) || !StringUtils.isNumericSpace(rangeKeys[1])){ addIndividualMapping(mapEntry.getKey(), mapEntry.getValue()); @@ -105,7 +109,7 @@ private void addRangeMapping(Map.Entry mapEntry){ } } - private void addIndividualMapping(final String key, final String value){ + private void addIndividualMapping(final String key, final Object value){ if(individualMappings.containsKey(key)){ String exceptionMsg = "map option contains duplicate entries of "+key; throw new InvalidPluginConfigurationException(exceptionMsg); @@ -174,15 +178,15 @@ private String getSourceValue(Object recordObject, String sourceKey) { } } - private Object getTargetValue(Object sourceObject, List targetValues){ - if(sourceObject instanceof String){ - return targetValues.get(0); + private Object getTargetValue(Object sourceObject, List targetValues){ + if(sourceObject instanceof String) { + return converter.convert(targetValues.get(0)); } - return targetValues; + return targetValues.stream().map(converter::convert).collect(Collectors.toList()); } private void performMappings(Object recordObject) { - List targetValues = new ArrayList<>(); + List targetValues = new ArrayList<>(); Object sourceObject = translateProcessorConfig.getSource(); List sourceKeys; if (sourceObject instanceof List) { @@ -195,14 +199,14 @@ private void performMappings(Object recordObject) { } for (String sourceKey : sourceKeys) { String sourceValue = getSourceValue(recordObject, sourceKey); - Optional targetValue = getTargetValueForSource(sourceValue); + Optional targetValue = getTargetValueForSource(sourceValue); targetValue.ifPresent(targetValues::add); } addTargetToRecords(sourceObject, targetValues, recordObject); } - private Optional getTargetValueForSource(final String sourceValue) { - Optional targetValue = Optional.empty(); + private Optional getTargetValueForSource(final String sourceValue) { + Optional targetValue = Optional.empty(); targetValue = targetValue .or(() -> matchesIndividualEntry(sourceValue)) .or(() -> matchesRangeEntry(sourceValue)) @@ -211,19 +215,19 @@ private Optional getTargetValueForSource(final String sourceValue) { return targetValue; } - private Optional matchesIndividualEntry(final String sourceValue) { + private Optional matchesIndividualEntry(final String sourceValue) { if (individualMappings.containsKey(sourceValue)) { return Optional.of(individualMappings.get(sourceValue)); } return Optional.empty(); } - private Optional matchesRangeEntry(final String sourceValue) { + private Optional matchesRangeEntry(final String sourceValue) { if (!NumberUtils.isParsable(sourceValue)) { return Optional.empty(); } Float floatKey = Float.parseFloat(sourceValue); - for (Map.Entry, String> rangeEntry : rangeMappings.entrySet()) { + for (Map.Entry, Object> rangeEntry : rangeMappings.entrySet()) { Range range = rangeEntry.getKey(); if (range.contains(floatKey)) { return Optional.of(rangeEntry.getValue()); @@ -232,7 +236,7 @@ private Optional matchesRangeEntry(final String sourceValue) { return Optional.empty(); } - private Optional matchesPatternEntry(final String sourceValue) { + private Optional matchesPatternEntry(final String sourceValue) { if (compiledPatterns.isEmpty()) { return Optional.empty(); } @@ -246,7 +250,7 @@ private Optional matchesPatternEntry(final String sourceValue) { return Optional.empty(); } - private void addTargetToRecords(Object sourceObject, List targetValues, Object recordObject) { + private void addTargetToRecords(Object sourceObject, List targetValues, Object recordObject) { if (targetValues.isEmpty()) { return; } diff --git a/data-prepper-plugins/translate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/translate/TranslateProcessorConfig.java b/data-prepper-plugins/translate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/translate/TranslateProcessorConfig.java index ad46128335..845442bc40 100644 --- a/data-prepper-plugins/translate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/translate/TranslateProcessorConfig.java +++ b/data-prepper-plugins/translate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/translate/TranslateProcessorConfig.java @@ -10,14 +10,18 @@ import jakarta.validation.constraints.AssertTrue; import jakarta.validation.constraints.NotEmpty; import jakarta.validation.constraints.NotNull; +import org.opensearch.dataprepper.plugins.processor.mutateevent.TargetType; +import org.opensearch.dataprepper.typeconverter.TypeConverter; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.stream.Stream; public class TranslateProcessorConfig { + @JsonProperty("source") @NotNull private Object source; @@ -28,7 +32,7 @@ public class TranslateProcessorConfig { private String target; @JsonProperty("map") - private Map map; + private Map map; @JsonProperty("file_path") private String filePath; @@ -45,12 +49,15 @@ public class TranslateProcessorConfig { @JsonProperty("regex") private RegexParameterConfiguration regexParameterConfiguration; + @JsonProperty("target_type") + private TargetType targetType = TargetType.STRING; + public Object getSource() { return source; } public String getTarget() { return target; } - public Map getMap() { return map; } + public Map getMap() { return map; } public String getDefaultValue() { return defaultValue; } @@ -60,6 +67,8 @@ public class TranslateProcessorConfig { public String getIterateOn() { return iterateOn; } + public TargetType getTargetType() { return targetType; } + public RegexParameterConfiguration getRegexParameterConfiguration(){ return regexParameterConfiguration; } @@ -85,4 +94,30 @@ public boolean isPatternPresent(){ return regexParameterConfiguration == null || regexParameterConfiguration.getPatterns() != null; } + @AssertTrue(message = "The mapped values do not match the target type provided") + public boolean isMapTypeValid() { + return map.keySet().stream().allMatch(key -> checkTargetValueType(map.get(key))); + } + + @AssertTrue(message = "The pattern values do not match the target type provided") + public boolean isPatternTypeValid() { + if (Objects.isNull(regexParameterConfiguration) || Objects.isNull(regexParameterConfiguration.getPatterns())) { + return true; + } + Map patterns = regexParameterConfiguration.getPatterns(); + return patterns.keySet().stream().allMatch(key -> checkTargetValueType(patterns.get(key))); + } + + private boolean checkTargetValueType(Object val) throws NumberFormatException { + if (Objects.isNull(targetType)) { + return true; + } + try { + final TypeConverter converter = targetType.getTargetConverter(); + converter.convert(val); + } catch (Exception ex) { + return false; + } + return true; + } } diff --git a/data-prepper-plugins/translate-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/translate/TranslateProcessorConfigTest.java b/data-prepper-plugins/translate-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/translate/TranslateProcessorConfigTest.java index 998f93db0c..6af0f325eb 100644 --- a/data-prepper-plugins/translate-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/translate/TranslateProcessorConfigTest.java +++ b/data-prepper-plugins/translate-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/translate/TranslateProcessorConfigTest.java @@ -2,6 +2,7 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.opensearch.dataprepper.plugins.processor.mutateevent.TargetType; import java.util.Collections; import java.util.List; @@ -97,4 +98,15 @@ void test_get_iterate_on() throws NoSuchFieldException, IllegalAccessException{ setField(TranslateProcessorConfig.class, translateProcessorConfig, "iterateOn", "iteratorField"); assertThat(translateProcessorConfig.getIterateOn(),is("iteratorField")); } + + @Test + void test_target_type_default(){ + assertThat(translateProcessorConfig.getTargetType(), is(TargetType.STRING)); + } + + @Test + void test_get_target_type() throws NoSuchFieldException, IllegalAccessException{ + setField(TranslateProcessorConfig.class, translateProcessorConfig, "targetType", TargetType.INTEGER); + assertThat(translateProcessorConfig.getTargetType(), is(TargetType.INTEGER)); + } } \ No newline at end of file diff --git a/data-prepper-plugins/translate-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/translate/TranslateProcessorTest.java b/data-prepper-plugins/translate-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/translate/TranslateProcessorTest.java index 3add7708d3..394cb11ad8 100644 --- a/data-prepper-plugins/translate-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/translate/TranslateProcessorTest.java +++ b/data-prepper-plugins/translate-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/translate/TranslateProcessorTest.java @@ -11,6 +11,7 @@ import org.opensearch.dataprepper.model.event.JacksonEvent; import org.opensearch.dataprepper.model.plugin.InvalidPluginConfigurationException; import org.opensearch.dataprepper.model.record.Record; +import org.opensearch.dataprepper.plugins.processor.mutateevent.TargetType; import java.util.AbstractMap; import java.util.ArrayList; @@ -47,6 +48,7 @@ class TranslateProcessorTest { void setup() { lenient().when(mockConfig.getSource()).thenReturn("sourceField"); lenient().when(mockConfig.getTarget()).thenReturn("targetField"); + lenient().when(mockConfig.getTargetType()).thenReturn(TargetType.STRING); lenient().when(mockRegexConfig.getExact()).thenReturn(mockRegexConfig.DEFAULT_EXACT); } @@ -412,6 +414,52 @@ void test_nested_records_no_match() { assertThat(translatedRecords.get(0).getData().get("collection", ArrayList.class), is(outputJson)); } + @Test + void test_target_type_default(){ + when(mockConfig.getMap()).thenReturn(createMapEntries(createMapping("key1", "200"))); + final TranslateProcessor processor = createObjectUnderTest(); + final Record record = getEvent("key1"); + final List> translatedRecords = (List>) processor.doExecute(Collections.singletonList(record)); + + assertTrue(translatedRecords.get(0).getData().containsKey("targetField")); + assertThat(translatedRecords.get(0).getData().get("targetField", String.class), is("200")); + } + + @Test + void test_target_type_integer(){ + when(mockConfig.getMap()).thenReturn(createMapEntries(createMapping("key1", "200"))); + when(mockConfig.getTargetType()).thenReturn(TargetType.INTEGER); + final TranslateProcessor processor = createObjectUnderTest(); + final Record record = getEvent("key1"); + final List> translatedRecords = (List>) processor.doExecute(Collections.singletonList(record)); + + assertTrue(translatedRecords.get(0).getData().containsKey("targetField")); + assertThat(translatedRecords.get(0).getData().get("targetField", Integer.class), is(200)); + } + + @Test + void test_target_type_boolean(){ + when(mockConfig.getMap()).thenReturn(createMapEntries(createMapping("key1", "false"))); + when(mockConfig.getTargetType()).thenReturn(TargetType.BOOLEAN); + final TranslateProcessor processor = createObjectUnderTest(); + final Record record = getEvent("key1"); + final List> translatedRecords = (List>) processor.doExecute(Collections.singletonList(record)); + + assertTrue(translatedRecords.get(0).getData().containsKey("targetField")); + assertThat(translatedRecords.get(0).getData().get("targetField", Boolean.class), is(false)); + } + + @Test + void test_target_type_double(){ + when(mockConfig.getMap()).thenReturn(createMapEntries(createMapping("key1", "20.3"))); + when(mockConfig.getTargetType()).thenReturn(TargetType.DOUBLE); + final TranslateProcessor processor = createObjectUnderTest(); + final Record record = getEvent("key1"); + final List> translatedRecords = (List>) processor.doExecute(Collections.singletonList(record)); + + assertTrue(translatedRecords.get(0).getData().containsKey("targetField")); + assertThat(translatedRecords.get(0).getData().get("targetField", Double.class), is(20.3)); + } private TranslateProcessor createObjectUnderTest() { @@ -442,8 +490,8 @@ private Map.Entry createMapping(String key, String value) { return new AbstractMap.SimpleEntry<>(key, value); } - private Map createMapEntries(Map.Entry... mappings) { - final Map finalMap = new HashMap<>(); + private Map createMapEntries(Map.Entry... mappings) { + final Map finalMap = new HashMap<>(); for (Map.Entry mapping : mappings) { finalMap.put(mapping.getKey(), mapping.getValue()); }