-
Notifications
You must be signed in to change notification settings - Fork 190
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Translate Processor - Added file_path functionality for local files #3034
Changes from all commits
2a51a43
f51293f
dfea97d
6df2469
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,60 @@ | ||
package org.opensearch.dataprepper.plugins.processor.translate; | ||
|
||
import com.fasterxml.jackson.annotation.JsonProperty; | ||
import jakarta.validation.Valid; | ||
|
||
import java.util.ArrayList; | ||
import java.util.List; | ||
import java.util.Objects; | ||
import java.util.Optional; | ||
|
||
public class FilePathParser { | ||
@JsonProperty("mappings") | ||
@Valid | ||
private List<MappingsParameterConfig> fileMappingConfigs; | ||
|
||
public Optional<List<MappingsParameterConfig>> getCombinedMappings(List<MappingsParameterConfig> mappingConfigs) { | ||
if(Objects.isNull(mappingConfigs) || mappingConfigs.isEmpty()){ | ||
return Optional.ofNullable(fileMappingConfigs); | ||
} | ||
try{ | ||
for (MappingsParameterConfig fileMappingConfig : fileMappingConfigs) { | ||
boolean isDuplicateSource = false; | ||
for (MappingsParameterConfig mappingConfig : mappingConfigs) { | ||
if (mappingConfig.getSource().equals(fileMappingConfig.getSource())) { | ||
isDuplicateSource = true; | ||
combineTargets(fileMappingConfig, mappingConfig); | ||
} | ||
} | ||
if (!isDuplicateSource) { | ||
mappingConfigs.add(fileMappingConfig); | ||
} | ||
} | ||
return Optional.of(mappingConfigs); | ||
} catch (Exception ex){ | ||
return Optional.empty(); | ||
} | ||
} | ||
|
||
private void combineTargets(MappingsParameterConfig filePathMapping, MappingsParameterConfig mappingConfig) { | ||
if(Objects.isNull(mappingConfig)){ | ||
return; | ||
} | ||
List<TargetsParameterConfig> fileTargetConfigs = filePathMapping.getTargetsParameterConfigs(); | ||
List<TargetsParameterConfig> mappingsTargetConfigs = mappingConfig.getTargetsParameterConfigs(); | ||
List<TargetsParameterConfig> combinedTargetConfigs = new ArrayList<>(mappingsTargetConfigs); | ||
|
||
for (TargetsParameterConfig fileTargetConfig : fileTargetConfigs) { | ||
if (!isTargetPresent(fileTargetConfig, combinedTargetConfigs)) { | ||
combinedTargetConfigs.add(fileTargetConfig); | ||
} | ||
} | ||
mappingConfig.setTargetsParameterConfigs(combinedTargetConfigs); | ||
} | ||
|
||
private boolean isTargetPresent(TargetsParameterConfig fileTargetConfig, List<TargetsParameterConfig> combinedTargetConfigs){ | ||
String fileTarget = fileTargetConfig.getTarget(); | ||
return combinedTargetConfigs.stream().anyMatch(targetConfig -> fileTarget.equals(targetConfig.getTarget())); | ||
} | ||
|
||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -44,18 +44,17 @@ public class TranslateProcessor extends AbstractProcessor<Record<Event>, Record< | |
public TranslateProcessor(PluginMetrics pluginMetrics, final TranslateProcessorConfig translateProcessorConfig, final ExpressionEvaluator expressionEvaluator) { | ||
super(pluginMetrics); | ||
this.expressionEvaluator = expressionEvaluator; | ||
mappingsConfig = translateProcessorConfig.getMappingsParameterConfigs(); | ||
mappingsConfig.forEach(MappingsParameterConfig::parseMappings); | ||
parseFile(translateProcessorConfig.getFilePath()); | ||
} | ||
|
||
private void parseFile(String filePath){ | ||
//todo | ||
mappingsConfig = translateProcessorConfig.getCombinedParameterConfigs(); | ||
Optional.ofNullable(mappingsConfig) | ||
.ifPresent(configs -> configs.forEach(MappingsParameterConfig::parseMappings)); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Are we going to stop the pipeline if only file_path option is present and the file is invalid? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes. If the mappings provided in the |
||
} | ||
|
||
@Override | ||
public Collection<Record<Event>> doExecute(Collection<Record<Event>> records) { | ||
for (final Record<Event> record : records) { | ||
if(Objects.isNull(mappingsConfig)){ | ||
continue; | ||
} | ||
final Event recordEvent = record.getData(); | ||
for (MappingsParameterConfig mappingConfig : mappingsConfig) { | ||
try { | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -6,22 +6,34 @@ | |
package org.opensearch.dataprepper.plugins.processor.translate; | ||
|
||
|
||
import com.fasterxml.jackson.annotation.JsonIgnore; | ||
import com.fasterxml.jackson.annotation.JsonProperty; | ||
import com.fasterxml.jackson.databind.ObjectMapper; | ||
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory; | ||
import jakarta.validation.Valid; | ||
import jakarta.validation.constraints.AssertTrue; | ||
import jakarta.validation.constraints.NotNull; | ||
import org.slf4j.Logger; | ||
import org.slf4j.LoggerFactory; | ||
|
||
import java.io.File; | ||
import java.io.IOException; | ||
import java.util.ArrayList; | ||
import java.util.List; | ||
import java.util.stream.Stream; | ||
import java.util.Objects; | ||
import java.util.Optional; | ||
|
||
|
||
public class TranslateProcessorConfig { | ||
|
||
@JsonProperty("file_path") | ||
private String filePath; | ||
|
||
@NotNull | ||
@JsonProperty("mappings") | ||
private List<MappingsParameterConfig> mappingsParameterConfigs; | ||
@Valid | ||
private List<MappingsParameterConfig> mappingsParameterConfigs = new ArrayList<>(); | ||
|
||
@JsonIgnore | ||
private List<MappingsParameterConfig> combinedParameterConfigs; | ||
|
||
public String getFilePath() { | ||
return filePath; | ||
|
@@ -31,9 +43,40 @@ public List<MappingsParameterConfig> getMappingsParameterConfigs() { | |
return mappingsParameterConfigs; | ||
} | ||
|
||
@AssertTrue(message = "Either mappings or file_path option needs to be configured.") | ||
public List<MappingsParameterConfig> getCombinedParameterConfigs() { | ||
if(Objects.isNull(combinedParameterConfigs)){ | ||
combinedParameterConfigs = mappingsParameterConfigs; | ||
} | ||
return combinedParameterConfigs; | ||
} | ||
|
||
@AssertTrue(message = "Please ensure that at least one of the options, either \"mappings\" or \"file_path\", is properly configured.") | ||
public boolean hasMappings() { | ||
return Stream.of(mappingsParameterConfigs, filePath).filter(n -> n != null).count() != 0; | ||
return (Objects.nonNull(mappingsParameterConfigs) && !mappingsParameterConfigs.isEmpty()) || Objects.nonNull(filePath); | ||
} | ||
|
||
@AssertTrue(message = "\"mappings\" option should not be empty.") | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It works. But in this particular case, the pipeline should succeed even if only
|
||
public boolean isMappingsValid() { | ||
return Objects.nonNull(mappingsParameterConfigs); | ||
} | ||
|
||
@AssertTrue(message = "The file specified in the \"file_path\" option is not properly configured.") | ||
public boolean isFileValid() { | ||
return Objects.isNull(filePath) || readFileMappings(filePath); | ||
} | ||
|
||
private boolean readFileMappings(String filePath) { | ||
ObjectMapper mapper = new ObjectMapper(new YAMLFactory()); | ||
try { | ||
FilePathParser fileParser = mapper.readValue(new File(filePath), FilePathParser.class); | ||
Optional<List<MappingsParameterConfig>> optionalCombinedConfigs = fileParser.getCombinedMappings(mappingsParameterConfigs); | ||
optionalCombinedConfigs.ifPresent(combinedConfigs -> combinedParameterConfigs = combinedConfigs); | ||
return Optional.ofNullable(combinedParameterConfigs).map(configs -> true).orElse(false); | ||
} catch (IOException ex) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can we log this error? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I will address this in the next PR. |
||
Logger LOG = LoggerFactory.getLogger(TranslateProcessor.class); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We should declare these at the top |
||
LOG.error("Unable to parse the mappings from file", ex); | ||
return false; | ||
} | ||
} | ||
|
||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Are these annotations doing anything? This isn't a part of the user configuration right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeahh.
@JsonProperty : The file provided will consist of option
mappings
, under which translations will be provided. Since I am using a different variable namefileMappingConfigs
in theFilePathPraser.Class
,@JsonProperty
is required to indicate thatmappings
filed should be mapped tofileMappingConfigs
@Valid : The type of
fileMappingConfigs
isList<MappingParameterConfig>
.MappingParameterConfig.Class
consists of validation checks for the options provided under mappings likesource
,targets
. To validate those checks we need to provide the@Valid
annotation.