Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Translate Plugin - Target Type implementation #2979

Merged
merged 2 commits into from
Jul 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions data-prepper-plugins/translate-processor/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ dependencies {
implementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-yaml:2.15.0'
implementation 'io.micrometer:micrometer-core'
implementation project(path: ':data-prepper-api')
implementation project(path: ':data-prepper-plugins:mutate-event-processors')
testImplementation project(':data-prepper-plugins:log-generator-source')
testImplementation project(':data-prepper-test-common')
implementation 'org.apache.commons:commons-lang3:3.12.0'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,12 @@ public class RegexParameterConfiguration {
final boolean DEFAULT_EXACT = true;
@NotNull
@JsonProperty("patterns")
private Map<String, String> patterns;
private Map<String, Object> patterns;

@JsonProperty("exact")
private Boolean exact = DEFAULT_EXACT;

public Map<String, String> getPatterns() {
public Map<String, Object> getPatterns() {
return patterns;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import org.opensearch.dataprepper.model.processor.AbstractProcessor;
import org.opensearch.dataprepper.model.processor.Processor;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.typeconverter.TypeConverter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -30,6 +31,7 @@
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

import static org.opensearch.dataprepper.logging.DataPrepperMarkers.EVENT;

Expand All @@ -40,15 +42,17 @@ public class TranslateProcessor extends AbstractProcessor<Record<Event>, Record<
private static final Logger LOG = LoggerFactory.getLogger(TranslateProcessor.class);
private final ExpressionEvaluator expressionEvaluator;
private final TranslateProcessorConfig translateProcessorConfig;
private final LinkedHashMap<Range<Float>, String> rangeMappings;
private final Map<String, String> individualMappings;
private final Map<Pattern, String> compiledPatterns;
private final LinkedHashMap<Range<Float>, Object> rangeMappings;
private final Map<String, Object> individualMappings;
private final Map<Pattern, Object> compiledPatterns;
private final TypeConverter converter;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Another option to not take a dependency on all mutate-events processors would be for us to just move the convert processor out to its own module

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will do it in the next PR

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this translate processor needs to be moved into mutate-event-processors


@DataPrepperPluginConstructor
public TranslateProcessor(PluginMetrics pluginMetrics, final TranslateProcessorConfig translateProcessorConfig, final ExpressionEvaluator expressionEvaluator) {
super(pluginMetrics);
this.translateProcessorConfig = translateProcessorConfig;
this.expressionEvaluator = expressionEvaluator;
this.converter = translateProcessorConfig.getTargetType().getTargetConverter();
individualMappings = new HashMap<>();
rangeMappings = new LinkedHashMap<>();
compiledPatterns = new HashMap<>();
Expand All @@ -62,22 +66,22 @@ public TranslateProcessor(PluginMetrics pluginMetrics, final TranslateProcessorC
checkOverlappingKeys();
}

private void compilePatterns(Map<String, String> mappings) {
private void compilePatterns(Map<String, Object> mappings) {
for (String pattern : mappings.keySet()) {
Pattern compiledPattern = Pattern.compile(pattern);
compiledPatterns.put(compiledPattern, mappings.get(pattern));
}
}

private void processMapField(Map<String, String> map) {
private void processMapField(Map<String, Object> map) {
if (Objects.nonNull(map)) {
for (Map.Entry<String, String> mapEntry : map.entrySet()) {
for (Map.Entry<String, Object> mapEntry : map.entrySet()) {
parseIndividualKeys(mapEntry);
}
}
}

private void parseIndividualKeys(Map.Entry<String, String> mapEntry){
private void parseIndividualKeys(Map.Entry<String, Object> mapEntry){
String[] commaSeparatedKeys = mapEntry.getKey().split(",");
for(String individualKey : commaSeparatedKeys){
if(individualKey.contains("-")){
Expand All @@ -88,7 +92,7 @@ private void parseIndividualKeys(Map.Entry<String, String> mapEntry){
}
}

private void addRangeMapping(Map.Entry<String, String> mapEntry){
private void addRangeMapping(Map.Entry<String, Object> mapEntry){
String[] rangeKeys = mapEntry.getKey().split("-");
if(rangeKeys.length!=2 || !StringUtils.isNumericSpace(rangeKeys[0]) || !StringUtils.isNumericSpace(rangeKeys[1])){
addIndividualMapping(mapEntry.getKey(), mapEntry.getValue());
Expand All @@ -105,7 +109,7 @@ private void addRangeMapping(Map.Entry<String, String> mapEntry){
}
}

private void addIndividualMapping(final String key, final String value){
private void addIndividualMapping(final String key, final Object value){
if(individualMappings.containsKey(key)){
String exceptionMsg = "map option contains duplicate entries of "+key;
throw new InvalidPluginConfigurationException(exceptionMsg);
Expand Down Expand Up @@ -174,15 +178,15 @@ private String getSourceValue(Object recordObject, String sourceKey) {
}
}

private Object getTargetValue(Object sourceObject, List<String> targetValues){
if(sourceObject instanceof String){
return targetValues.get(0);
private Object getTargetValue(Object sourceObject, List<Object> targetValues){
if(sourceObject instanceof String) {
return converter.convert(targetValues.get(0));
}
return targetValues;
return targetValues.stream().map(converter::convert).collect(Collectors.toList());
}

private void performMappings(Object recordObject) {
List<String> targetValues = new ArrayList<>();
List<Object> targetValues = new ArrayList<>();
Object sourceObject = translateProcessorConfig.getSource();
List<String> sourceKeys;
if (sourceObject instanceof List<?>) {
Expand All @@ -195,14 +199,14 @@ private void performMappings(Object recordObject) {
}
for (String sourceKey : sourceKeys) {
String sourceValue = getSourceValue(recordObject, sourceKey);
Optional<String> targetValue = getTargetValueForSource(sourceValue);
Optional<Object> targetValue = getTargetValueForSource(sourceValue);
targetValue.ifPresent(targetValues::add);
}
addTargetToRecords(sourceObject, targetValues, recordObject);
}

private Optional<String> getTargetValueForSource(final String sourceValue) {
Optional<String> targetValue = Optional.empty();
private Optional<Object> getTargetValueForSource(final String sourceValue) {
Optional<Object> targetValue = Optional.empty();
targetValue = targetValue
.or(() -> matchesIndividualEntry(sourceValue))
.or(() -> matchesRangeEntry(sourceValue))
Expand All @@ -211,19 +215,19 @@ private Optional<String> getTargetValueForSource(final String sourceValue) {
return targetValue;
}

private Optional<String> matchesIndividualEntry(final String sourceValue) {
private Optional<Object> matchesIndividualEntry(final String sourceValue) {
if (individualMappings.containsKey(sourceValue)) {
return Optional.of(individualMappings.get(sourceValue));
}
return Optional.empty();
}

private Optional<String> matchesRangeEntry(final String sourceValue) {
private Optional<Object> matchesRangeEntry(final String sourceValue) {
if (!NumberUtils.isParsable(sourceValue)) {
return Optional.empty();
}
Float floatKey = Float.parseFloat(sourceValue);
for (Map.Entry<Range<Float>, String> rangeEntry : rangeMappings.entrySet()) {
for (Map.Entry<Range<Float>, Object> rangeEntry : rangeMappings.entrySet()) {
Range<Float> range = rangeEntry.getKey();
if (range.contains(floatKey)) {
return Optional.of(rangeEntry.getValue());
Expand All @@ -232,7 +236,7 @@ private Optional<String> matchesRangeEntry(final String sourceValue) {
return Optional.empty();
}

private Optional<String> matchesPatternEntry(final String sourceValue) {
private Optional<Object> matchesPatternEntry(final String sourceValue) {
if (compiledPatterns.isEmpty()) {
return Optional.empty();
}
Expand All @@ -246,7 +250,7 @@ private Optional<String> matchesPatternEntry(final String sourceValue) {
return Optional.empty();
}

private void addTargetToRecords(Object sourceObject, List<String> targetValues, Object recordObject) {
private void addTargetToRecords(Object sourceObject, List<Object> targetValues, Object recordObject) {
if (targetValues.isEmpty()) {
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,18 @@
import jakarta.validation.constraints.AssertTrue;
import jakarta.validation.constraints.NotEmpty;
import jakarta.validation.constraints.NotNull;
import org.opensearch.dataprepper.plugins.processor.mutateevent.TargetType;
import org.opensearch.dataprepper.typeconverter.TypeConverter;

import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Stream;


public class TranslateProcessorConfig {


@JsonProperty("source")
@NotNull
private Object source;
Expand All @@ -28,7 +32,7 @@ public class TranslateProcessorConfig {
private String target;

@JsonProperty("map")
private Map<String, String> map;
private Map<String, Object> map;

@JsonProperty("file_path")
private String filePath;
Expand All @@ -45,12 +49,15 @@ public class TranslateProcessorConfig {
@JsonProperty("regex")
private RegexParameterConfiguration regexParameterConfiguration;

@JsonProperty("target_type")
private TargetType targetType = TargetType.STRING;


public Object getSource() { return source; }

public String getTarget() { return target; }

public Map<String, String> getMap() { return map; }
public Map<String, Object> getMap() { return map; }

public String getDefaultValue() { return defaultValue; }

Expand All @@ -60,6 +67,8 @@ public class TranslateProcessorConfig {

public String getIterateOn() { return iterateOn; }

public TargetType getTargetType() { return targetType; }

public RegexParameterConfiguration getRegexParameterConfiguration(){ return regexParameterConfiguration; }


Expand All @@ -85,4 +94,30 @@ public boolean isPatternPresent(){
return regexParameterConfiguration == null || regexParameterConfiguration.getPatterns() != null;
}

@AssertTrue(message = "The mapped values do not match the target type provided")
public boolean isMapTypeValid() {
return map.keySet().stream().allMatch(key -> checkTargetValueType(map.get(key)));
}

@AssertTrue(message = "The pattern values do not match the target type provided")
public boolean isPatternTypeValid() {
if (Objects.isNull(regexParameterConfiguration) || Objects.isNull(regexParameterConfiguration.getPatterns())) {
return true;
}
Map<String, Object> patterns = regexParameterConfiguration.getPatterns();
return patterns.keySet().stream().allMatch(key -> checkTargetValueType(patterns.get(key)));
}

private boolean checkTargetValueType(Object val) throws NumberFormatException {
if (Objects.isNull(targetType)) {
return true;
}
try {
final TypeConverter converter = targetType.getTargetConverter();
converter.convert(val);
} catch (Exception ex) {
return false;
}
return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.opensearch.dataprepper.plugins.processor.mutateevent.TargetType;

import java.util.Collections;
import java.util.List;
Expand Down Expand Up @@ -97,4 +98,15 @@ void test_get_iterate_on() throws NoSuchFieldException, IllegalAccessException{
setField(TranslateProcessorConfig.class, translateProcessorConfig, "iterateOn", "iteratorField");
assertThat(translateProcessorConfig.getIterateOn(),is("iteratorField"));
}

@Test
void test_target_type_default(){
assertThat(translateProcessorConfig.getTargetType(), is(TargetType.STRING));
}

@Test
void test_get_target_type() throws NoSuchFieldException, IllegalAccessException{
setField(TranslateProcessorConfig.class, translateProcessorConfig, "targetType", TargetType.INTEGER);
assertThat(translateProcessorConfig.getTargetType(), is(TargetType.INTEGER));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import org.opensearch.dataprepper.model.event.JacksonEvent;
import org.opensearch.dataprepper.model.plugin.InvalidPluginConfigurationException;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.plugins.processor.mutateevent.TargetType;

import java.util.AbstractMap;
import java.util.ArrayList;
Expand Down Expand Up @@ -47,6 +48,7 @@ class TranslateProcessorTest {
void setup() {
lenient().when(mockConfig.getSource()).thenReturn("sourceField");
lenient().when(mockConfig.getTarget()).thenReturn("targetField");
lenient().when(mockConfig.getTargetType()).thenReturn(TargetType.STRING);
lenient().when(mockRegexConfig.getExact()).thenReturn(mockRegexConfig.DEFAULT_EXACT);
}

Expand Down Expand Up @@ -412,6 +414,52 @@ void test_nested_records_no_match() {
assertThat(translatedRecords.get(0).getData().get("collection", ArrayList.class), is(outputJson));
}

@Test
void test_target_type_default(){
when(mockConfig.getMap()).thenReturn(createMapEntries(createMapping("key1", "200")));
final TranslateProcessor processor = createObjectUnderTest();
final Record<Event> record = getEvent("key1");
final List<Record<Event>> translatedRecords = (List<Record<Event>>) processor.doExecute(Collections.singletonList(record));

assertTrue(translatedRecords.get(0).getData().containsKey("targetField"));
assertThat(translatedRecords.get(0).getData().get("targetField", String.class), is("200"));
}

@Test
void test_target_type_integer(){
when(mockConfig.getMap()).thenReturn(createMapEntries(createMapping("key1", "200")));
when(mockConfig.getTargetType()).thenReturn(TargetType.INTEGER);
final TranslateProcessor processor = createObjectUnderTest();
final Record<Event> record = getEvent("key1");
final List<Record<Event>> translatedRecords = (List<Record<Event>>) processor.doExecute(Collections.singletonList(record));

assertTrue(translatedRecords.get(0).getData().containsKey("targetField"));
assertThat(translatedRecords.get(0).getData().get("targetField", Integer.class), is(200));
}

@Test
void test_target_type_boolean(){
when(mockConfig.getMap()).thenReturn(createMapEntries(createMapping("key1", "false")));
when(mockConfig.getTargetType()).thenReturn(TargetType.BOOLEAN);
final TranslateProcessor processor = createObjectUnderTest();
final Record<Event> record = getEvent("key1");
final List<Record<Event>> translatedRecords = (List<Record<Event>>) processor.doExecute(Collections.singletonList(record));

assertTrue(translatedRecords.get(0).getData().containsKey("targetField"));
assertThat(translatedRecords.get(0).getData().get("targetField", Boolean.class), is(false));
}

@Test
void test_target_type_double(){
when(mockConfig.getMap()).thenReturn(createMapEntries(createMapping("key1", "20.3")));
when(mockConfig.getTargetType()).thenReturn(TargetType.DOUBLE);
final TranslateProcessor processor = createObjectUnderTest();
final Record<Event> record = getEvent("key1");
final List<Record<Event>> translatedRecords = (List<Record<Event>>) processor.doExecute(Collections.singletonList(record));

assertTrue(translatedRecords.get(0).getData().containsKey("targetField"));
assertThat(translatedRecords.get(0).getData().get("targetField", Double.class), is(20.3));
}


private TranslateProcessor createObjectUnderTest() {
Expand Down Expand Up @@ -442,8 +490,8 @@ private Map.Entry<String, String> createMapping(String key, String value) {
return new AbstractMap.SimpleEntry<>(key, value);
}

private Map<String, String> createMapEntries(Map.Entry<String, String>... mappings) {
final Map<String, String> finalMap = new HashMap<>();
private Map<String, Object> createMapEntries(Map.Entry<String, String>... mappings) {
final Map<String, Object> finalMap = new HashMap<>();
for (Map.Entry<String, String> mapping : mappings) {
finalMap.put(mapping.getKey(), mapping.getValue());
}
Expand Down
Loading