Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Translate Processor - Added file_path functionality for local files #3034

Merged
merged 4 commits into from
Jul 19, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
package org.opensearch.dataprepper.plugins.processor.translate;

import com.fasterxml.jackson.annotation.JsonProperty;
import jakarta.validation.Valid;

import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.Optional;

public class FilePathParser {
@JsonProperty("mappings")
@Valid
Comment on lines +12 to +13
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are these annotations doing anything? This isn't a part of the user configuration right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeahh.
@JsonProperty : The file provided will consist of option mappings, under which translations will be provided. Since I am using a different variable name fileMappingConfigs in the FilePathPraser.Class, @JsonProperty is required to indicate that mappings filed should be mapped to fileMappingConfigs

@Valid : The type of fileMappingConfigs is List<MappingParameterConfig>. MappingParameterConfig.Class consists of validation checks for the options provided under mappings like source, targets. To validate those checks we need to provide the @Valid annotation.

private List<MappingsParameterConfig> fileMappingConfigs;

public Optional<List<MappingsParameterConfig>> getCombinedMappings(List<MappingsParameterConfig> mappingConfigs) {
if(Objects.isNull(mappingConfigs) || mappingConfigs.isEmpty()){
return Optional.ofNullable(fileMappingConfigs);
}
try{
for (MappingsParameterConfig fileMappingConfig : fileMappingConfigs) {
boolean isDuplicateSource = false;
for (MappingsParameterConfig mappingConfig : mappingConfigs) {
if (mappingConfig.getSource().equals(fileMappingConfig.getSource())) {
isDuplicateSource = true;
combineTargets(fileMappingConfig, mappingConfig);
}
}
if (!isDuplicateSource) {
mappingConfigs.add(fileMappingConfig);
}
}
return Optional.of(mappingConfigs);
} catch (Exception ex){
return Optional.empty();
}
}

private void combineTargets(MappingsParameterConfig filePathMapping, MappingsParameterConfig mappingConfig) {
if(Objects.isNull(mappingConfig)){
return;
}
List<TargetsParameterConfig> fileTargetConfigs = filePathMapping.getTargetsParameterConfigs();
List<TargetsParameterConfig> mappingsTargetConfigs = mappingConfig.getTargetsParameterConfigs();
List<TargetsParameterConfig> combinedTargetConfigs = new ArrayList<>(mappingsTargetConfigs);

for (TargetsParameterConfig fileTargetConfig : fileTargetConfigs) {
if (!isTargetPresent(fileTargetConfig, combinedTargetConfigs)) {
combinedTargetConfigs.add(fileTargetConfig);
}
}
mappingConfig.setTargetsParameterConfigs(combinedTargetConfigs);
}

private boolean isTargetPresent(TargetsParameterConfig fileTargetConfig, List<TargetsParameterConfig> combinedTargetConfigs){
String fileTarget = fileTargetConfig.getTarget();
return combinedTargetConfigs.stream().anyMatch(targetConfig -> fileTarget.equals(targetConfig.getTarget()));
}

}
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
package org.opensearch.dataprepper.plugins.processor.translate;

import com.fasterxml.jackson.annotation.JsonProperty;
import jakarta.validation.Valid;
import jakarta.validation.constraints.AssertTrue;
import jakarta.validation.constraints.NotNull;

import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

public class MappingsParameterConfig {

Expand All @@ -16,9 +19,8 @@ public class MappingsParameterConfig {
private String iterateOn;

@JsonProperty("targets")
@NotNull
private List<TargetsParameterConfig> targetsParameterConfigs;

@Valid
private List<TargetsParameterConfig> targetsParameterConfigs = new ArrayList<>();

public Object getSource() {
return source;
Expand All @@ -32,14 +34,21 @@ public List<TargetsParameterConfig> getTargetsParameterConfigs() {
return targetsParameterConfigs;
}

public void parseMappings(){
for(TargetsParameterConfig targetsParameterConfig: targetsParameterConfigs){
targetsParameterConfig.parseMappings();
}
@AssertTrue(message = "source option not configured")
public boolean isSourcePresent(){
return Objects.nonNull(source);
}

@AssertTrue(message = "targets option not configured")
public boolean isTargetsPresent(){
return Objects.nonNull(targetsParameterConfigs) && !targetsParameterConfigs.isEmpty();
}

@AssertTrue(message = "source field must be a string or list of strings")
public boolean isSourceFieldValid() {
if(Objects.isNull(source)){
return true;
}
if (source instanceof String) {
return true;
}
Expand All @@ -50,4 +59,17 @@ public boolean isSourceFieldValid() {
return false;
}

public void parseMappings(){
if(Objects.isNull(targetsParameterConfigs)){
return;
}
for(TargetsParameterConfig targetsParameterConfig: targetsParameterConfigs){
targetsParameterConfig.parseMappings();
}
}

public void setTargetsParameterConfigs(List<TargetsParameterConfig> targetsParameterConfigs){
this.targetsParameterConfigs = targetsParameterConfigs;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,12 @@ public class TargetsParameterConfig {
private RegexParameterConfiguration regexParameterConfig;
@JsonProperty("default")
private String defaultValue;
@JsonProperty("target_type")
@JsonProperty("type")
private TargetType targetType = TargetType.STRING;

public TargetsParameterConfig(){
converter = TargetType.STRING.getTargetConverter();
}
public TargetsParameterConfig(Map<String, Object> map, String target, RegexParameterConfiguration regexParameterConfig, String translateWhen, String defaultValue, TargetType targetType) {
this.targetType = Optional
.ofNullable(targetType)
Expand Down Expand Up @@ -102,6 +105,9 @@ public boolean hasMappings() {

@AssertTrue(message = "The mapped values do not match the target type provided")
public boolean isMapTypeValid() {
if(Objects.isNull(map)){
return true;
}
return map.keySet().stream().allMatch(key -> checkTargetValueType(map.get(key)));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,18 +44,17 @@ public class TranslateProcessor extends AbstractProcessor<Record<Event>, Record<
public TranslateProcessor(PluginMetrics pluginMetrics, final TranslateProcessorConfig translateProcessorConfig, final ExpressionEvaluator expressionEvaluator) {
super(pluginMetrics);
this.expressionEvaluator = expressionEvaluator;
mappingsConfig = translateProcessorConfig.getMappingsParameterConfigs();
mappingsConfig.forEach(MappingsParameterConfig::parseMappings);
parseFile(translateProcessorConfig.getFilePath());
}

private void parseFile(String filePath){
//todo
mappingsConfig = translateProcessorConfig.getCombinedParameterConfigs();
Optional.ofNullable(mappingsConfig)
.ifPresent(configs -> configs.forEach(MappingsParameterConfig::parseMappings));
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are we going to stop the pipeline if only file_path option is present and the file is invalid?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. If the mappings provided in the file_path option are not configured properly, pipeline will fail during the config check. This behavior is irrespective of only file_path option provided or both file_path and mappings options provided.

}

@Override
public Collection<Record<Event>> doExecute(Collection<Record<Event>> records) {
for (final Record<Event> record : records) {
if(Objects.isNull(mappingsConfig)){
continue;
}
final Event recordEvent = record.getData();
for (MappingsParameterConfig mappingConfig : mappingsConfig) {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,22 +6,34 @@
package org.opensearch.dataprepper.plugins.processor.translate;


import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
import jakarta.validation.Valid;
import jakarta.validation.constraints.AssertTrue;
import jakarta.validation.constraints.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Stream;
import java.util.Objects;
import java.util.Optional;


public class TranslateProcessorConfig {

@JsonProperty("file_path")
private String filePath;

@NotNull
@JsonProperty("mappings")
private List<MappingsParameterConfig> mappingsParameterConfigs;
@Valid
private List<MappingsParameterConfig> mappingsParameterConfigs = new ArrayList<>();

@JsonIgnore
private List<MappingsParameterConfig> combinedParameterConfigs;

public String getFilePath() {
return filePath;
Expand All @@ -31,9 +43,40 @@ public List<MappingsParameterConfig> getMappingsParameterConfigs() {
return mappingsParameterConfigs;
}

@AssertTrue(message = "Either mappings or file_path option needs to be configured.")
public List<MappingsParameterConfig> getCombinedParameterConfigs() {
if(Objects.isNull(combinedParameterConfigs)){
combinedParameterConfigs = mappingsParameterConfigs;
}
return combinedParameterConfigs;
}

@AssertTrue(message = "Please ensure that at least one of the options, either \"mappings\" or \"file_path\", is properly configured.")
public boolean hasMappings() {
return Stream.of(mappingsParameterConfigs, filePath).filter(n -> n != null).count() != 0;
return (Objects.nonNull(mappingsParameterConfigs) && !mappingsParameterConfigs.isEmpty()) || Objects.nonNull(filePath);
}

@AssertTrue(message = "\"mappings\" option should not be empty.")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is the @NotEmpty annotation not working?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It works. But in this particular case, the pipeline should succeed even if only file_path option is provided. @NotEmpty annotation checks if the field is either empty or null. So if the mappings option is not provided it fails, even if file_path option is provided. This check I provided is required because if the user configures mappings: option but there are no sub-options configured, the pipeline should fail. Like in the scenario below:

 processor:
   - translate:
       file_path: "/path/to/file.yaml"
       mappings:
       

public boolean isMappingsValid() {
return Objects.nonNull(mappingsParameterConfigs);
}

@AssertTrue(message = "The file specified in the \"file_path\" option is not properly configured.")
public boolean isFileValid() {
return Objects.isNull(filePath) || readFileMappings(filePath);
}

private boolean readFileMappings(String filePath) {
ObjectMapper mapper = new ObjectMapper(new YAMLFactory());
try {
FilePathParser fileParser = mapper.readValue(new File(filePath), FilePathParser.class);
Optional<List<MappingsParameterConfig>> optionalCombinedConfigs = fileParser.getCombinedMappings(mappingsParameterConfigs);
optionalCombinedConfigs.ifPresent(combinedConfigs -> combinedParameterConfigs = combinedConfigs);
return Optional.ofNullable(combinedParameterConfigs).map(configs -> true).orElse(false);
} catch (IOException ex) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we log this error?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will address this in the next PR.

Logger LOG = LoggerFactory.getLogger(TranslateProcessor.class);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should declare these at the top private static final Logger LOG = ...

LOG.error("Unable to parse the mappings from file", ex);
return false;
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,16 @@ void test_get_source() {
assertThat(mappingsParameterConfig.getSource(),is("sourceKey"));
}

@Test
void test_source_present() {
assertTrue(mappingsParameterConfig.isSourcePresent());
}

@Test
void test_targets_present() {
assertFalse(mappingsParameterConfig.isTargetsPresent());
}

@Test
void test_source_field_valid_types() throws NoSuchFieldException, IllegalAccessException{
setField(MappingsParameterConfig.class, mappingsParameterConfig, "source", "key1");
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,13 @@
package org.opensearch.dataprepper.plugins.processor.translate;

import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test;

import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.util.List;

import static org.hamcrest.core.Is.is;
Expand All @@ -28,6 +33,12 @@ void test_no_mappings_filepath_options_present(){
assertFalse(translateProcessorConfig.hasMappings());
}

@Test
void test_no_mappings_present() throws NoSuchFieldException, IllegalAccessException {
setField(TranslateProcessorConfig.class, translateProcessorConfig, "mappingsParameterConfigs", null);
assertFalse(translateProcessorConfig.isMappingsValid());
}

@Test
void test_only_mappings_option_present() throws NoSuchFieldException, IllegalAccessException{
setField(TranslateProcessorConfig.class, translateProcessorConfig, "mappingsParameterConfigs", List.of(new MappingsParameterConfig()));
Expand All @@ -54,4 +65,54 @@ void test_get_mappings() throws NoSuchFieldException, IllegalAccessException{
assertThat(translateProcessorConfig.getMappingsParameterConfigs(), is(mappingsParameterConfigs));
}

@Nested
class FilePathTests{
private File testMappingsFile;

@BeforeEach
void setup() throws IOException {
testMappingsFile = File.createTempFile("test", ".yaml");
}

@AfterEach
void cleanup() {
testMappingsFile.delete();
}

@Test
void test_is_file_valid_with_valid_file() throws NoSuchFieldException, IllegalAccessException, IOException {
String fileContent = "mappings:\n" +
" - source: status\n" +
" targets:\n" +
" - target: test\n" +
" map:\n" +
" 120: success";
Files.write(testMappingsFile.toPath(), fileContent.getBytes());

String filePath = testMappingsFile.getAbsolutePath();
setField(TranslateProcessorConfig.class, translateProcessorConfig, "filePath", filePath);

assertTrue(translateProcessorConfig.isFileValid());
}

@Test
void test_is_file_valid_with_invalid_file() throws NoSuchFieldException, IllegalAccessException, IOException {
String fileContent = "mappings:";
Files.write(testMappingsFile.toPath(), fileContent.getBytes());

String filePath = testMappingsFile.getAbsolutePath();
setField(TranslateProcessorConfig.class, translateProcessorConfig, "filePath", filePath);

assertFalse(translateProcessorConfig.isFileValid());
}

@Test
void test_is_file_invalid_with_invalid_file_path() throws NoSuchFieldException, IllegalAccessException {
String filePath = "/invalid/file/nofile.yaml";
setField(TranslateProcessorConfig.class, translateProcessorConfig, "filePath", filePath);

assertFalse(translateProcessorConfig.isFileValid());
}
}

}
Loading
Loading