Skip to content

Commit

Permalink
Added IterateOn functionality, default, exact and testcases
Browse files Browse the repository at this point in the history
Signed-off-by: Vishal Boinapalli <[email protected]>
  • Loading branch information
vishalboin committed Jun 29, 2023
1 parent 241489f commit 32f85f1
Show file tree
Hide file tree
Showing 6 changed files with 388 additions and 109 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

public class RegexParameterConfiguration {

private static final boolean DEFAULT_EXACT = true;
public final boolean DEFAULT_EXACT = true;
@NotNull
@JsonProperty("patterns")
private Map<String, String> patterns;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,14 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.LinkedHashMap;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.regex.Pattern;

import static org.opensearch.dataprepper.logging.DataPrepperMarkers.EVENT;
Expand All @@ -40,6 +42,7 @@ public class TranslateProcessor extends AbstractProcessor<Record<Event>, Record<
private final LinkedHashMap<Range<Float>, String> rangeMappings;
private final Map<String, String> individualMappings;
private final Map<String, String> patternMappings;
private final Map<Pattern, String> compiledPatterns;

@DataPrepperPluginConstructor
public TranslateProcessor(PluginMetrics pluginMetrics, final TranslateProcessorConfig translateProcessorConfig, final ExpressionEvaluator expressionEvaluator) {
Expand All @@ -48,20 +51,28 @@ public TranslateProcessor(PluginMetrics pluginMetrics, final TranslateProcessorC
this.expressionEvaluator = expressionEvaluator;
individualMappings = new HashMap<>();
rangeMappings = new LinkedHashMap<>();
if(this.translateProcessorConfig.getRegexParameterConfiguration()!=null) {
patternMappings = translateProcessorConfig.getRegexParameterConfiguration().getPatterns();
}
else{
patternMappings = Collections.emptyMap();
patternMappings = new HashMap<>();
compiledPatterns = new HashMap<>();
if (Objects.nonNull(this.translateProcessorConfig.getRegexParameterConfiguration())) {
compilePatterns(translateProcessorConfig
.getRegexParameterConfiguration()
.getPatterns());
}

processMapField(translateProcessorConfig.getMap());
parseFile(translateProcessorConfig.getFilePath());
checkOverlappingKeys();
}

private void processMapField(Map<String, String> map){
if(Objects.nonNull(map)) {
private void compilePatterns(Map<String, String> mappings) {
patternMappings.putAll(mappings);
for (String pattern : mappings.keySet()) {
Pattern compiledPattern = Pattern.compile(pattern);
compiledPatterns.put(compiledPattern, patternMappings.get(pattern));
}
}

private void processMapField(Map<String, String> map) {
if (Objects.nonNull(map)) {
for (Map.Entry<String, String> mapEntry : map.entrySet()) {
parseIndividualKeys(mapEntry);
}
Expand All @@ -73,8 +84,7 @@ private void parseIndividualKeys(Map.Entry<String, String> mapEntry){
for(String individualKey : commaSeparatedKeys){
if(individualKey.contains("-")){
addRangeMapping(Map.entry(individualKey, mapEntry.getValue()));
}
else {
} else {
addIndividualMapping(individualKey, mapEntry.getValue());
}
}
Expand All @@ -98,7 +108,7 @@ private void addRangeMapping(Map.Entry<String, String> mapEntry){
}
}

private void addIndividualMapping(String key, String value){
private void addIndividualMapping(final String key, final String value){
if(individualMappings.containsKey(key)){
String exceptionMsg = "map option contains duplicate entries of "+key;
throw new InvalidPluginConfigurationException(exceptionMsg);
Expand All @@ -108,22 +118,22 @@ private void addIndividualMapping(String key, String value){
}
}

private boolean isRangeOverlapping(Range<Float> rangeEntry){
for(Range<Float> range : rangeMappings.keySet()){
if(range.isOverlappedBy(rangeEntry)){
private boolean isRangeOverlapping(Range<Float> rangeEntry) {
for (Range<Float> range : rangeMappings.keySet()) {
if (range.isOverlappedBy(rangeEntry)) {
return true;
}
}
return false;
}

private void checkOverlappingKeys(){
for(String individualKey : individualMappings.keySet()){
if(NumberUtils.isParsable(individualKey)){
private void checkOverlappingKeys() {
for (String individualKey : individualMappings.keySet()) {
if (NumberUtils.isParsable(individualKey)) {
Float floatKey = Float.parseFloat(individualKey);
Range<Float> range = Range.between(floatKey, floatKey);
if(isRangeOverlapping(range)){
String exceptionMsg = "map option contains key "+individualKey+" that overlaps with other range entries";
if (isRangeOverlapping(range)) {
String exceptionMsg = "map option contains key " + individualKey + " that overlaps with other range entries";
throw new InvalidPluginConfigurationException(exceptionMsg);
}
}
Expand All @@ -136,66 +146,124 @@ private void parseFile(String filePath){

@Override
public Collection<Record<Event>> doExecute(Collection<Record<Event>> records) {
//todo
for(final Record<Event> record : records) {
for (final Record<Event> record : records) {
final Event recordEvent = record.getData();
if (Objects.nonNull(translateProcessorConfig.getMapWhen()) && !expressionEvaluator.evaluateConditional(translateProcessorConfig.getMapWhen(), recordEvent)) {
if (Objects.nonNull(translateProcessorConfig.getTranslateWhen()) && !expressionEvaluator.evaluateConditional(translateProcessorConfig.getTranslateWhen(), recordEvent)) {
continue;
}
try {
String matchKey = record.getData().get(translateProcessorConfig.getSource(), String.class);
if(matchesIndividualEntry(record, matchKey) || matchesRangeEntry(record, matchKey) || matchesPatternEntry(record, matchKey)){
continue;
if (Objects.nonNull(translateProcessorConfig.getIterateOn())) {
List<Map<String, Object>> nestedObjects = recordEvent.get(translateProcessorConfig.getIterateOn(), List.class);
for (Map<String, Object> nextedObject : nestedObjects) {
performMappings(nextedObject);
}
recordEvent.put(translateProcessorConfig.getIterateOn(), nestedObjects);
} else {
performMappings(recordEvent);
}
else{
} catch (Exception ex) {
LOG.error(EVENT, "Error mapping the source [{}] of entry [{}]", translateProcessorConfig.getSource(),
record.getData(), ex);
}
}
return records;
}

// todo : add default, increment metrics, and/or add_tags
private String getSourceValue(Object nestedObject, String sourceKey) {
if (nestedObject instanceof Map) {
return (String) ((Map<?, ?>) nestedObject).get(sourceKey);
} else {
return ((Event) nestedObject).get(sourceKey, String.class);
}
}

}
} catch (Exception ex){
LOG.error(EVENT, "Error mapping the source [{}] of entry [{}]",
translateProcessorConfig.getSource(), record.getData(), ex);
private Object getTargetValue(Object sourceObject, List<String> targetValues){
if(sourceObject instanceof String){
return targetValues.get(0);
}
return targetValues;
}

private void performMappings(Object recordObject) {
List<String> targetValues = new ArrayList<>();
Object sourceObject = translateProcessorConfig.getSource();
if (sourceObject instanceof List<?>) {
List<String> sourceKeys = (ArrayList<String>) sourceObject;
for (String sourceKey : sourceKeys) {
String sourceValue = getSourceValue(recordObject, sourceKey);
populateTarget(sourceValue, targetValues);
}
} else if (sourceObject instanceof String) {
String sourceKey = (String) sourceObject;
String sourceValue = getSourceValue(recordObject, sourceKey);
populateTarget(sourceValue, targetValues);
}
return records;
addTargetToRecords(sourceObject, targetValues, recordObject);
}

private void populateTarget(final String sourceKey, List<String> targetValues) {
Optional<String> targetValue = Optional.empty();
targetValue = targetValue
.or(() -> matchesIndividualEntry(sourceKey))
.or(() -> matchesRangeEntry(sourceKey))
.or(() -> matchesPatternEntry(sourceKey))
.or(() -> Optional.ofNullable(translateProcessorConfig.getDefaultValue()));
targetValue.ifPresent(targetValues::add);
}

public boolean matchesIndividualEntry(Record<Event> record, String matchKey){
if(individualMappings.containsKey(matchKey)){
record.getData().put(translateProcessorConfig.getTarget(), individualMappings.get(matchKey));
return true;
private Optional<String> matchesIndividualEntry(final String sourceKey) {
if (individualMappings.containsKey(sourceKey)) {
return Optional.of(individualMappings.get(sourceKey));
}
return false;
return Optional.empty();
}

public boolean matchesRangeEntry(Record<Event> record, String matchKey){
if(!NumberUtils.isParsable(matchKey)){
return false;
private Optional<String> matchesRangeEntry(final String sourceKey) {
if (!NumberUtils.isParsable(sourceKey)) {
return Optional.empty();
}
Float floatKey = Float.parseFloat(matchKey);
for(Map.Entry<Range<Float>, String> rangeEntry : rangeMappings.entrySet()) {
Float floatKey = Float.parseFloat(sourceKey);
for (Map.Entry<Range<Float>, String> rangeEntry : rangeMappings.entrySet()) {
Range<Float> range = rangeEntry.getKey();
if (range.contains(floatKey)) {
record.getData().put(translateProcessorConfig.getTarget(), rangeEntry.getValue());
return true;
return Optional.of(rangeEntry.getValue());
}
}
return false;
return Optional.empty();
}

public boolean matchesPatternEntry(Record<Event> record, String matchKey){
//todo
if(!Objects.nonNull(patternMappings)){
return false;
private Optional<String> matchesPatternEntry(final String sourceKey) {
if (!compiledPatterns.isEmpty()) {
for (Pattern pattern : compiledPatterns.keySet()) {
if (pattern.matcher(sourceKey).matches()) {
return Optional.of(compiledPatterns.get(pattern));
}
}
if (!translateProcessorConfig.getRegexParameterConfiguration().getExact()) {
for (String pattern : patternMappings.keySet()) {
if (pattern.contains(sourceKey)) {
return Optional.of(patternMappings.get(pattern));
}
}
}
}
for(String pattern : patternMappings.keySet()){
if(Pattern.matches(pattern, matchKey)){
record.getData().put(translateProcessorConfig.getTarget(), patternMappings.get(pattern));
return true;
return Optional.empty();
}

private void addTargetToRecords(Object sourceObject, List<String> targetValues, Object recordObject) {
String targetField = translateProcessorConfig.getTarget();
if (!targetValues.isEmpty()) {
if(recordObject instanceof Map){
Map<String, Object> recordMap = (Map<String, Object>) recordObject;
recordMap.put(targetField, getTargetValue(sourceObject, targetValues));
}
else if(recordObject instanceof Event){
Event event = (Event) recordObject;
event.put(targetField, getTargetValue(sourceObject, targetValues));
}
}
return false;
}

@Override
public void prepareForShutdown() {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
import jakarta.validation.constraints.AssertTrue;
import jakarta.validation.constraints.NotEmpty;
import jakarta.validation.constraints.NotNull;

import java.util.List;
import java.util.Map;
import java.util.stream.Stream;

Expand All @@ -18,8 +20,7 @@ public class TranslateProcessorConfig {

@JsonProperty("source")
@NotNull
@NotEmpty
private String source;
private Object source;

@JsonProperty("target")
@NotNull
Expand All @@ -28,39 +29,60 @@ public class TranslateProcessorConfig {

@JsonProperty("map")
private Map<String, String> map;

@JsonProperty("file_path")
private String filePath;

@JsonProperty("map_when")
private String mapWhen;
@JsonProperty("default")
private String defaultValue;

@JsonProperty("translate_when")
private String translateWhen;

@JsonProperty("iterate_on")
private String iterateOn;

@JsonProperty("regex")
private RegexParameterConfiguration regexParameterConfiguration;


public String getSource() { return source; }
public Object getSource() { return source; }

public String getTarget() { return target; }

public Map<String, String> getMap() { return map; }

public String getDefaultValue() { return defaultValue; }

public String getFilePath() { return filePath; }

public String getMapWhen() { return mapWhen; }
public String getTranslateWhen() { return translateWhen; }

public String getIterateOn() { return iterateOn; }

public RegexParameterConfiguration getRegexParameterConfiguration(){ return regexParameterConfiguration; }


@AssertTrue(message = "Either of map / patterns / file_path options need to be configured. (pattern option is mandatory while configuring regex option)")
@AssertTrue(message = "source field must be a string or list of strings")
public boolean isSourceFieldValid(){
if(source instanceof String){
return true;
}
if(source instanceof List<?>){
List<?> sourceList = (List<?>) source;
return sourceList.stream().allMatch(sourceItem -> sourceItem instanceof String);
}
return false;
}

@AssertTrue(message = "Either of map or patterns or file_path options need to be configured.")
public boolean hasMappings() {
return (Stream.of(map, filePath, regexParameterConfiguration).filter(n -> n!=null).count() != 0) && checkPatternUnderRegex();
return Stream.of(map, filePath, regexParameterConfiguration).filter(n -> n!=null).count() != 0;
}

public boolean checkPatternUnderRegex(){
if(regexParameterConfiguration!=null && regexParameterConfiguration.getPatterns()==null){
return false;
}
return true;
@AssertTrue(message = "pattern option is mandatory while configuring regex option")
public boolean isPatternPresent(){
return regexParameterConfiguration == null || regexParameterConfiguration.getPatterns() != null;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ void setup(){
}

@Test
public void test_get_patterns() throws NoSuchFieldException, IllegalAccessException{
void test_get_patterns() throws NoSuchFieldException, IllegalAccessException{
final Map<String, String> patternMap = Collections.singletonMap("key1", "val1");
setField(RegexParameterConfiguration.class, regexParameterConfiguration, "patterns", patternMap);
assertThat(regexParameterConfiguration.getPatterns(), is(patternMap));
Expand Down
Loading

0 comments on commit 32f85f1

Please sign in to comment.