Skip to content

Commit

Permalink
Translate Plugin - Target Type implementation (opensearch-project#2979)
Browse files Browse the repository at this point in the history
* Translate Plugin - Target Type implementation

Signed-off-by: Vishal Boinapalli <[email protected]>

* addressed review comments

Signed-off-by: Vishal Boinapalli <[email protected]>

---------

Signed-off-by: Vishal Boinapalli <[email protected]>
  • Loading branch information
vishalboin committed Jul 6, 2023
1 parent df19fc6 commit cc5ed41
Show file tree
Hide file tree
Showing 6 changed files with 128 additions and 28 deletions.
1 change: 1 addition & 0 deletions data-prepper-plugins/translate-processor/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ dependencies {
implementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-yaml:2.15.0'
implementation 'io.micrometer:micrometer-core'
implementation project(path: ':data-prepper-api')
implementation project(path: ':data-prepper-plugins:mutate-event-processors')
testImplementation project(':data-prepper-plugins:log-generator-source')
testImplementation project(':data-prepper-test-common')
implementation 'org.apache.commons:commons-lang3:3.12.0'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,12 @@ public class RegexParameterConfiguration {
final boolean DEFAULT_EXACT = true;
@NotNull
@JsonProperty("patterns")
private Map<String, String> patterns;
private Map<String, Object> patterns;

@JsonProperty("exact")
private Boolean exact = DEFAULT_EXACT;

public Map<String, String> getPatterns() {
public Map<String, Object> getPatterns() {
return patterns;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import org.opensearch.dataprepper.model.processor.AbstractProcessor;
import org.opensearch.dataprepper.model.processor.Processor;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.typeconverter.TypeConverter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -30,6 +31,7 @@
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

import static org.opensearch.dataprepper.logging.DataPrepperMarkers.EVENT;

Expand All @@ -40,15 +42,17 @@ public class TranslateProcessor extends AbstractProcessor<Record<Event>, Record<
private static final Logger LOG = LoggerFactory.getLogger(TranslateProcessor.class);
private final ExpressionEvaluator expressionEvaluator;
private final TranslateProcessorConfig translateProcessorConfig;
private final LinkedHashMap<Range<Float>, String> rangeMappings;
private final Map<String, String> individualMappings;
private final Map<Pattern, String> compiledPatterns;
private final LinkedHashMap<Range<Float>, Object> rangeMappings;
private final Map<String, Object> individualMappings;
private final Map<Pattern, Object> compiledPatterns;
private final TypeConverter converter;

@DataPrepperPluginConstructor
public TranslateProcessor(PluginMetrics pluginMetrics, final TranslateProcessorConfig translateProcessorConfig, final ExpressionEvaluator expressionEvaluator) {
super(pluginMetrics);
this.translateProcessorConfig = translateProcessorConfig;
this.expressionEvaluator = expressionEvaluator;
this.converter = translateProcessorConfig.getTargetType().getTargetConverter();
individualMappings = new HashMap<>();
rangeMappings = new LinkedHashMap<>();
compiledPatterns = new HashMap<>();
Expand All @@ -62,22 +66,22 @@ public TranslateProcessor(PluginMetrics pluginMetrics, final TranslateProcessorC
checkOverlappingKeys();
}

private void compilePatterns(Map<String, String> mappings) {
private void compilePatterns(Map<String, Object> mappings) {
for (String pattern : mappings.keySet()) {
Pattern compiledPattern = Pattern.compile(pattern);
compiledPatterns.put(compiledPattern, mappings.get(pattern));
}
}

private void processMapField(Map<String, String> map) {
private void processMapField(Map<String, Object> map) {
if (Objects.nonNull(map)) {
for (Map.Entry<String, String> mapEntry : map.entrySet()) {
for (Map.Entry<String, Object> mapEntry : map.entrySet()) {
parseIndividualKeys(mapEntry);
}
}
}

private void parseIndividualKeys(Map.Entry<String, String> mapEntry){
private void parseIndividualKeys(Map.Entry<String, Object> mapEntry){
String[] commaSeparatedKeys = mapEntry.getKey().split(",");
for(String individualKey : commaSeparatedKeys){
if(individualKey.contains("-")){
Expand All @@ -88,7 +92,7 @@ private void parseIndividualKeys(Map.Entry<String, String> mapEntry){
}
}

private void addRangeMapping(Map.Entry<String, String> mapEntry){
private void addRangeMapping(Map.Entry<String, Object> mapEntry){
String[] rangeKeys = mapEntry.getKey().split("-");
if(rangeKeys.length!=2 || !StringUtils.isNumericSpace(rangeKeys[0]) || !StringUtils.isNumericSpace(rangeKeys[1])){
addIndividualMapping(mapEntry.getKey(), mapEntry.getValue());
Expand All @@ -105,7 +109,7 @@ private void addRangeMapping(Map.Entry<String, String> mapEntry){
}
}

private void addIndividualMapping(final String key, final String value){
private void addIndividualMapping(final String key, final Object value){
if(individualMappings.containsKey(key)){
String exceptionMsg = "map option contains duplicate entries of "+key;
throw new InvalidPluginConfigurationException(exceptionMsg);
Expand Down Expand Up @@ -174,15 +178,15 @@ private String getSourceValue(Object recordObject, String sourceKey) {
}
}

private Object getTargetValue(Object sourceObject, List<String> targetValues){
if(sourceObject instanceof String){
return targetValues.get(0);
private Object getTargetValue(Object sourceObject, List<Object> targetValues){
if(sourceObject instanceof String) {
return converter.convert(targetValues.get(0));
}
return targetValues;
return targetValues.stream().map(converter::convert).collect(Collectors.toList());
}

private void performMappings(Object recordObject) {
List<String> targetValues = new ArrayList<>();
List<Object> targetValues = new ArrayList<>();
Object sourceObject = translateProcessorConfig.getSource();
List<String> sourceKeys;
if (sourceObject instanceof List<?>) {
Expand All @@ -195,14 +199,14 @@ private void performMappings(Object recordObject) {
}
for (String sourceKey : sourceKeys) {
String sourceValue = getSourceValue(recordObject, sourceKey);
Optional<String> targetValue = getTargetValueForSource(sourceValue);
Optional<Object> targetValue = getTargetValueForSource(sourceValue);
targetValue.ifPresent(targetValues::add);
}
addTargetToRecords(sourceObject, targetValues, recordObject);
}

private Optional<String> getTargetValueForSource(final String sourceValue) {
Optional<String> targetValue = Optional.empty();
private Optional<Object> getTargetValueForSource(final String sourceValue) {
Optional<Object> targetValue = Optional.empty();
targetValue = targetValue
.or(() -> matchesIndividualEntry(sourceValue))
.or(() -> matchesRangeEntry(sourceValue))
Expand All @@ -211,19 +215,19 @@ private Optional<String> getTargetValueForSource(final String sourceValue) {
return targetValue;
}

private Optional<String> matchesIndividualEntry(final String sourceValue) {
private Optional<Object> matchesIndividualEntry(final String sourceValue) {
if (individualMappings.containsKey(sourceValue)) {
return Optional.of(individualMappings.get(sourceValue));
}
return Optional.empty();
}

private Optional<String> matchesRangeEntry(final String sourceValue) {
private Optional<Object> matchesRangeEntry(final String sourceValue) {
if (!NumberUtils.isParsable(sourceValue)) {
return Optional.empty();
}
Float floatKey = Float.parseFloat(sourceValue);
for (Map.Entry<Range<Float>, String> rangeEntry : rangeMappings.entrySet()) {
for (Map.Entry<Range<Float>, Object> rangeEntry : rangeMappings.entrySet()) {
Range<Float> range = rangeEntry.getKey();
if (range.contains(floatKey)) {
return Optional.of(rangeEntry.getValue());
Expand All @@ -232,7 +236,7 @@ private Optional<String> matchesRangeEntry(final String sourceValue) {
return Optional.empty();
}

private Optional<String> matchesPatternEntry(final String sourceValue) {
private Optional<Object> matchesPatternEntry(final String sourceValue) {
if (compiledPatterns.isEmpty()) {
return Optional.empty();
}
Expand All @@ -246,7 +250,7 @@ private Optional<String> matchesPatternEntry(final String sourceValue) {
return Optional.empty();
}

private void addTargetToRecords(Object sourceObject, List<String> targetValues, Object recordObject) {
private void addTargetToRecords(Object sourceObject, List<Object> targetValues, Object recordObject) {
if (targetValues.isEmpty()) {
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,18 @@
import jakarta.validation.constraints.AssertTrue;
import jakarta.validation.constraints.NotEmpty;
import jakarta.validation.constraints.NotNull;
import org.opensearch.dataprepper.plugins.processor.mutateevent.TargetType;
import org.opensearch.dataprepper.typeconverter.TypeConverter;

import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Stream;


public class TranslateProcessorConfig {


@JsonProperty("source")
@NotNull
private Object source;
Expand All @@ -28,7 +32,7 @@ public class TranslateProcessorConfig {
private String target;

@JsonProperty("map")
private Map<String, String> map;
private Map<String, Object> map;

@JsonProperty("file_path")
private String filePath;
Expand All @@ -45,12 +49,15 @@ public class TranslateProcessorConfig {
@JsonProperty("regex")
private RegexParameterConfiguration regexParameterConfiguration;

@JsonProperty("target_type")
private TargetType targetType = TargetType.STRING;


public Object getSource() { return source; }

public String getTarget() { return target; }

public Map<String, String> getMap() { return map; }
public Map<String, Object> getMap() { return map; }

public String getDefaultValue() { return defaultValue; }

Expand All @@ -60,6 +67,8 @@ public class TranslateProcessorConfig {

public String getIterateOn() { return iterateOn; }

public TargetType getTargetType() { return targetType; }

public RegexParameterConfiguration getRegexParameterConfiguration(){ return regexParameterConfiguration; }


Expand All @@ -85,4 +94,30 @@ public boolean isPatternPresent(){
return regexParameterConfiguration == null || regexParameterConfiguration.getPatterns() != null;
}

@AssertTrue(message = "The mapped values do not match the target type provided")
public boolean isMapTypeValid() {
return map.keySet().stream().allMatch(key -> checkTargetValueType(map.get(key)));
}

@AssertTrue(message = "The pattern values do not match the target type provided")
public boolean isPatternTypeValid() {
if (Objects.isNull(regexParameterConfiguration) || Objects.isNull(regexParameterConfiguration.getPatterns())) {
return true;
}
Map<String, Object> patterns = regexParameterConfiguration.getPatterns();
return patterns.keySet().stream().allMatch(key -> checkTargetValueType(patterns.get(key)));
}

private boolean checkTargetValueType(Object val) throws NumberFormatException {
if (Objects.isNull(targetType)) {
return true;
}
try {
final TypeConverter converter = targetType.getTargetConverter();
converter.convert(val);
} catch (Exception ex) {
return false;
}
return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.opensearch.dataprepper.plugins.processor.mutateevent.TargetType;

import java.util.Collections;
import java.util.List;
Expand Down Expand Up @@ -97,4 +98,15 @@ void test_get_iterate_on() throws NoSuchFieldException, IllegalAccessException{
setField(TranslateProcessorConfig.class, translateProcessorConfig, "iterateOn", "iteratorField");
assertThat(translateProcessorConfig.getIterateOn(),is("iteratorField"));
}

@Test
void test_target_type_default(){
assertThat(translateProcessorConfig.getTargetType(), is(TargetType.STRING));
}

@Test
void test_get_target_type() throws NoSuchFieldException, IllegalAccessException{
setField(TranslateProcessorConfig.class, translateProcessorConfig, "targetType", TargetType.INTEGER);
assertThat(translateProcessorConfig.getTargetType(), is(TargetType.INTEGER));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import org.opensearch.dataprepper.model.event.JacksonEvent;
import org.opensearch.dataprepper.model.plugin.InvalidPluginConfigurationException;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.plugins.processor.mutateevent.TargetType;

import java.util.AbstractMap;
import java.util.ArrayList;
Expand Down Expand Up @@ -47,6 +48,7 @@ class TranslateProcessorTest {
void setup() {
lenient().when(mockConfig.getSource()).thenReturn("sourceField");
lenient().when(mockConfig.getTarget()).thenReturn("targetField");
lenient().when(mockConfig.getTargetType()).thenReturn(TargetType.STRING);
lenient().when(mockRegexConfig.getExact()).thenReturn(mockRegexConfig.DEFAULT_EXACT);
}

Expand Down Expand Up @@ -412,6 +414,52 @@ void test_nested_records_no_match() {
assertThat(translatedRecords.get(0).getData().get("collection", ArrayList.class), is(outputJson));
}

@Test
void test_target_type_default(){
when(mockConfig.getMap()).thenReturn(createMapEntries(createMapping("key1", "200")));
final TranslateProcessor processor = createObjectUnderTest();
final Record<Event> record = getEvent("key1");
final List<Record<Event>> translatedRecords = (List<Record<Event>>) processor.doExecute(Collections.singletonList(record));

assertTrue(translatedRecords.get(0).getData().containsKey("targetField"));
assertThat(translatedRecords.get(0).getData().get("targetField", String.class), is("200"));
}

@Test
void test_target_type_integer(){
when(mockConfig.getMap()).thenReturn(createMapEntries(createMapping("key1", "200")));
when(mockConfig.getTargetType()).thenReturn(TargetType.INTEGER);
final TranslateProcessor processor = createObjectUnderTest();
final Record<Event> record = getEvent("key1");
final List<Record<Event>> translatedRecords = (List<Record<Event>>) processor.doExecute(Collections.singletonList(record));

assertTrue(translatedRecords.get(0).getData().containsKey("targetField"));
assertThat(translatedRecords.get(0).getData().get("targetField", Integer.class), is(200));
}

@Test
void test_target_type_boolean(){
when(mockConfig.getMap()).thenReturn(createMapEntries(createMapping("key1", "false")));
when(mockConfig.getTargetType()).thenReturn(TargetType.BOOLEAN);
final TranslateProcessor processor = createObjectUnderTest();
final Record<Event> record = getEvent("key1");
final List<Record<Event>> translatedRecords = (List<Record<Event>>) processor.doExecute(Collections.singletonList(record));

assertTrue(translatedRecords.get(0).getData().containsKey("targetField"));
assertThat(translatedRecords.get(0).getData().get("targetField", Boolean.class), is(false));
}

@Test
void test_target_type_double(){
when(mockConfig.getMap()).thenReturn(createMapEntries(createMapping("key1", "20.3")));
when(mockConfig.getTargetType()).thenReturn(TargetType.DOUBLE);
final TranslateProcessor processor = createObjectUnderTest();
final Record<Event> record = getEvent("key1");
final List<Record<Event>> translatedRecords = (List<Record<Event>>) processor.doExecute(Collections.singletonList(record));

assertTrue(translatedRecords.get(0).getData().containsKey("targetField"));
assertThat(translatedRecords.get(0).getData().get("targetField", Double.class), is(20.3));
}


private TranslateProcessor createObjectUnderTest() {
Expand Down Expand Up @@ -442,8 +490,8 @@ private Map.Entry<String, String> createMapping(String key, String value) {
return new AbstractMap.SimpleEntry<>(key, value);
}

private Map<String, String> createMapEntries(Map.Entry<String, String>... mappings) {
final Map<String, String> finalMap = new HashMap<>();
private Map<String, Object> createMapEntries(Map.Entry<String, String>... mappings) {
final Map<String, Object> finalMap = new HashMap<>();
for (Map.Entry<String, String> mapping : mappings) {
finalMap.put(mapping.getKey(), mapping.getValue());
}
Expand Down

0 comments on commit cc5ed41

Please sign in to comment.