Skip to content

Commit

Permalink
Translate Processor - Added file_path functionality for local files (o…
Browse files Browse the repository at this point in the history
…pensearch-project#3034)

* Changed target_type option name to type

Signed-off-by: Vishal Boinapalli <[email protected]>

* Added file_path functionality for local file

Signed-off-by: Vishal Boinapalli <[email protected]>

* Fixed file parsing issue and changed error msgs

Signed-off-by: Vishal Boinapalli <[email protected]>

* Added IOException to log, made testcase change for mappings validation

Signed-off-by: Vishal Boinapalli <[email protected]>

---------

Signed-off-by: Vishal Boinapalli <[email protected]>
  • Loading branch information
vishalboin committed Jul 19, 2023
1 parent cf47dac commit 958c15e
Show file tree
Hide file tree
Showing 8 changed files with 339 additions and 21 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
package org.opensearch.dataprepper.plugins.processor.translate;

import com.fasterxml.jackson.annotation.JsonProperty;
import jakarta.validation.Valid;

import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.Optional;

public class FilePathParser {
@JsonProperty("mappings")
@Valid
private List<MappingsParameterConfig> fileMappingConfigs;

public Optional<List<MappingsParameterConfig>> getCombinedMappings(List<MappingsParameterConfig> mappingConfigs) {
if(Objects.isNull(mappingConfigs) || mappingConfigs.isEmpty()){
return Optional.ofNullable(fileMappingConfigs);
}
try{
for (MappingsParameterConfig fileMappingConfig : fileMappingConfigs) {
boolean isDuplicateSource = false;
for (MappingsParameterConfig mappingConfig : mappingConfigs) {
if (mappingConfig.getSource().equals(fileMappingConfig.getSource())) {
isDuplicateSource = true;
combineTargets(fileMappingConfig, mappingConfig);
}
}
if (!isDuplicateSource) {
mappingConfigs.add(fileMappingConfig);
}
}
return Optional.of(mappingConfigs);
} catch (Exception ex){
return Optional.empty();
}
}

private void combineTargets(MappingsParameterConfig filePathMapping, MappingsParameterConfig mappingConfig) {
if(Objects.isNull(mappingConfig)){
return;
}
List<TargetsParameterConfig> fileTargetConfigs = filePathMapping.getTargetsParameterConfigs();
List<TargetsParameterConfig> mappingsTargetConfigs = mappingConfig.getTargetsParameterConfigs();
List<TargetsParameterConfig> combinedTargetConfigs = new ArrayList<>(mappingsTargetConfigs);

for (TargetsParameterConfig fileTargetConfig : fileTargetConfigs) {
if (!isTargetPresent(fileTargetConfig, combinedTargetConfigs)) {
combinedTargetConfigs.add(fileTargetConfig);
}
}
mappingConfig.setTargetsParameterConfigs(combinedTargetConfigs);
}

private boolean isTargetPresent(TargetsParameterConfig fileTargetConfig, List<TargetsParameterConfig> combinedTargetConfigs){
String fileTarget = fileTargetConfig.getTarget();
return combinedTargetConfigs.stream().anyMatch(targetConfig -> fileTarget.equals(targetConfig.getTarget()));
}

}
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
package org.opensearch.dataprepper.plugins.processor.translate;

import com.fasterxml.jackson.annotation.JsonProperty;
import jakarta.validation.Valid;
import jakarta.validation.constraints.AssertTrue;
import jakarta.validation.constraints.NotNull;

import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

public class MappingsParameterConfig {

Expand All @@ -16,9 +19,8 @@ public class MappingsParameterConfig {
private String iterateOn;

@JsonProperty("targets")
@NotNull
private List<TargetsParameterConfig> targetsParameterConfigs;

@Valid
private List<TargetsParameterConfig> targetsParameterConfigs = new ArrayList<>();

public Object getSource() {
return source;
Expand All @@ -32,14 +34,21 @@ public List<TargetsParameterConfig> getTargetsParameterConfigs() {
return targetsParameterConfigs;
}

public void parseMappings(){
for(TargetsParameterConfig targetsParameterConfig: targetsParameterConfigs){
targetsParameterConfig.parseMappings();
}
@AssertTrue(message = "source option not configured")
public boolean isSourcePresent(){
return Objects.nonNull(source);
}

@AssertTrue(message = "targets option not configured")
public boolean isTargetsPresent(){
return Objects.nonNull(targetsParameterConfigs) && !targetsParameterConfigs.isEmpty();
}

@AssertTrue(message = "source field must be a string or list of strings")
public boolean isSourceFieldValid() {
if(Objects.isNull(source)){
return true;
}
if (source instanceof String) {
return true;
}
Expand All @@ -50,4 +59,17 @@ public boolean isSourceFieldValid() {
return false;
}

public void parseMappings(){
if(Objects.isNull(targetsParameterConfigs)){
return;
}
for(TargetsParameterConfig targetsParameterConfig: targetsParameterConfigs){
targetsParameterConfig.parseMappings();
}
}

public void setTargetsParameterConfigs(List<TargetsParameterConfig> targetsParameterConfigs){
this.targetsParameterConfigs = targetsParameterConfigs;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,12 @@ public class TargetsParameterConfig {
private RegexParameterConfiguration regexParameterConfig;
@JsonProperty("default")
private String defaultValue;
@JsonProperty("target_type")
@JsonProperty("type")
private TargetType targetType = TargetType.STRING;

public TargetsParameterConfig(){
converter = TargetType.STRING.getTargetConverter();
}
public TargetsParameterConfig(Map<String, Object> map, String target, RegexParameterConfiguration regexParameterConfig, String translateWhen, String defaultValue, TargetType targetType) {
this.targetType = Optional
.ofNullable(targetType)
Expand Down Expand Up @@ -102,6 +105,9 @@ public boolean hasMappings() {

@AssertTrue(message = "The mapped values do not match the target type provided")
public boolean isMapTypeValid() {
if(Objects.isNull(map)){
return true;
}
return map.keySet().stream().allMatch(key -> checkTargetValueType(map.get(key)));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,18 +44,17 @@ public class TranslateProcessor extends AbstractProcessor<Record<Event>, Record<
public TranslateProcessor(PluginMetrics pluginMetrics, final TranslateProcessorConfig translateProcessorConfig, final ExpressionEvaluator expressionEvaluator) {
super(pluginMetrics);
this.expressionEvaluator = expressionEvaluator;
mappingsConfig = translateProcessorConfig.getMappingsParameterConfigs();
mappingsConfig.forEach(MappingsParameterConfig::parseMappings);
parseFile(translateProcessorConfig.getFilePath());
}

private void parseFile(String filePath){
//todo
mappingsConfig = translateProcessorConfig.getCombinedParameterConfigs();
Optional.ofNullable(mappingsConfig)
.ifPresent(configs -> configs.forEach(MappingsParameterConfig::parseMappings));
}

@Override
public Collection<Record<Event>> doExecute(Collection<Record<Event>> records) {
for (final Record<Event> record : records) {
if(Objects.isNull(mappingsConfig)){
continue;
}
final Event recordEvent = record.getData();
for (MappingsParameterConfig mappingConfig : mappingsConfig) {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,22 +6,34 @@
package org.opensearch.dataprepper.plugins.processor.translate;


import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
import jakarta.validation.Valid;
import jakarta.validation.constraints.AssertTrue;
import jakarta.validation.constraints.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Stream;
import java.util.Objects;
import java.util.Optional;


public class TranslateProcessorConfig {

@JsonProperty("file_path")
private String filePath;

@NotNull
@JsonProperty("mappings")
private List<MappingsParameterConfig> mappingsParameterConfigs;
@Valid
private List<MappingsParameterConfig> mappingsParameterConfigs = new ArrayList<>();

@JsonIgnore
private List<MappingsParameterConfig> combinedParameterConfigs;

public String getFilePath() {
return filePath;
Expand All @@ -31,9 +43,40 @@ public List<MappingsParameterConfig> getMappingsParameterConfigs() {
return mappingsParameterConfigs;
}

@AssertTrue(message = "Either mappings or file_path option needs to be configured.")
public List<MappingsParameterConfig> getCombinedParameterConfigs() {
if(Objects.isNull(combinedParameterConfigs)){
combinedParameterConfigs = mappingsParameterConfigs;
}
return combinedParameterConfigs;
}

@AssertTrue(message = "Please ensure that at least one of the options, either \"mappings\" or \"file_path\", is properly configured.")
public boolean hasMappings() {
return Stream.of(mappingsParameterConfigs, filePath).filter(n -> n != null).count() != 0;
return (Objects.nonNull(mappingsParameterConfigs) && !mappingsParameterConfigs.isEmpty()) || Objects.nonNull(filePath);
}

@AssertTrue(message = "\"mappings\" option should not be empty.")
public boolean isMappingsValid() {
return Objects.nonNull(mappingsParameterConfigs);
}

@AssertTrue(message = "The file specified in the \"file_path\" option is not properly configured.")
public boolean isFileValid() {
return Objects.isNull(filePath) || readFileMappings(filePath);
}

private boolean readFileMappings(String filePath) {
ObjectMapper mapper = new ObjectMapper(new YAMLFactory());
try {
FilePathParser fileParser = mapper.readValue(new File(filePath), FilePathParser.class);
Optional<List<MappingsParameterConfig>> optionalCombinedConfigs = fileParser.getCombinedMappings(mappingsParameterConfigs);
optionalCombinedConfigs.ifPresent(combinedConfigs -> combinedParameterConfigs = combinedConfigs);
return Optional.ofNullable(combinedParameterConfigs).map(configs -> true).orElse(false);
} catch (IOException ex) {
Logger LOG = LoggerFactory.getLogger(TranslateProcessor.class);
LOG.error("Unable to parse the mappings from file", ex);
return false;
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,16 @@ void test_get_source() {
assertThat(mappingsParameterConfig.getSource(),is("sourceKey"));
}

@Test
void test_source_present() {
assertTrue(mappingsParameterConfig.isSourcePresent());
}

@Test
void test_targets_present() {
assertFalse(mappingsParameterConfig.isTargetsPresent());
}

@Test
void test_source_field_valid_types() throws NoSuchFieldException, IllegalAccessException{
setField(MappingsParameterConfig.class, mappingsParameterConfig, "source", "key1");
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,13 @@
package org.opensearch.dataprepper.plugins.processor.translate;

import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test;

import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.util.List;

import static org.hamcrest.core.Is.is;
Expand All @@ -28,6 +33,12 @@ void test_no_mappings_filepath_options_present(){
assertFalse(translateProcessorConfig.hasMappings());
}

@Test
void test_no_mappings_present() throws NoSuchFieldException, IllegalAccessException {
setField(TranslateProcessorConfig.class, translateProcessorConfig, "mappingsParameterConfigs", null);
assertFalse(translateProcessorConfig.isMappingsValid());
}

@Test
void test_only_mappings_option_present() throws NoSuchFieldException, IllegalAccessException{
setField(TranslateProcessorConfig.class, translateProcessorConfig, "mappingsParameterConfigs", List.of(new MappingsParameterConfig()));
Expand All @@ -54,4 +65,54 @@ void test_get_mappings() throws NoSuchFieldException, IllegalAccessException{
assertThat(translateProcessorConfig.getMappingsParameterConfigs(), is(mappingsParameterConfigs));
}

@Nested
class FilePathTests{
private File testMappingsFile;

@BeforeEach
void setup() throws IOException {
testMappingsFile = File.createTempFile("test", ".yaml");
}

@AfterEach
void cleanup() {
testMappingsFile.delete();
}

@Test
void test_is_file_valid_with_valid_file() throws NoSuchFieldException, IllegalAccessException, IOException {
String fileContent = "mappings:\n" +
" - source: status\n" +
" targets:\n" +
" - target: test\n" +
" map:\n" +
" 120: success";
Files.write(testMappingsFile.toPath(), fileContent.getBytes());

String filePath = testMappingsFile.getAbsolutePath();
setField(TranslateProcessorConfig.class, translateProcessorConfig, "filePath", filePath);

assertTrue(translateProcessorConfig.isFileValid());
}

@Test
void test_is_file_valid_with_invalid_file() throws NoSuchFieldException, IllegalAccessException, IOException {
String fileContent = "mappings:";
Files.write(testMappingsFile.toPath(), fileContent.getBytes());

String filePath = testMappingsFile.getAbsolutePath();
setField(TranslateProcessorConfig.class, translateProcessorConfig, "filePath", filePath);

assertFalse(translateProcessorConfig.isFileValid());
}

@Test
void test_is_file_invalid_with_invalid_file_path() throws NoSuchFieldException, IllegalAccessException {
String filePath = "/invalid/file/nofile.yaml";
setField(TranslateProcessorConfig.class, translateProcessorConfig, "filePath", filePath);

assertFalse(translateProcessorConfig.isFileValid());
}
}

}
Loading

0 comments on commit 958c15e

Please sign in to comment.