diff --git a/data-prepper-plugins/key-value-processor/README.md b/data-prepper-plugins/key-value-processor/README.md index 0d7ff129fe..505d0dbf91 100644 --- a/data-prepper-plugins/key-value-processor/README.md +++ b/data-prepper-plugins/key-value-processor/README.md @@ -70,6 +70,10 @@ When run, the processor will parse the message into the following output: * Default: `lenient` * Example: `whitespace` is `"lenient"`. `{"key1 = value1"}` will parse into `{"key1 ": " value1"}` * Example: `whitespace` is `"strict"`. `{"key1 = value1"}` will parse into `{"key1": "value1"}` +* `skip_duplicate_values` - A boolean option for removing duplicate key/value pairs. When set to true, only one unique key/value pair will be preserved. + * Default: `false` + * Example: `skip_duplicate_values` is `false`. `{"key1=value1&key1=value1"}` will parse into `{"key1": ["value1", "value1"]}` + * Example: `skip_duplicate_values` is `true`. `{"key1=value1&key1=value1"}` will parse into `{"key1": "value1"}` ## Developer Guide This plugin is compatible with Java 14. See diff --git a/data-prepper-plugins/key-value-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/keyvalue/KeyValueProcessor.java b/data-prepper-plugins/key-value-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/keyvalue/KeyValueProcessor.java index d0549dc096..035c56f89c 100644 --- a/data-prepper-plugins/key-value-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/keyvalue/KeyValueProcessor.java +++ b/data-prepper-plugins/key-value-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/keyvalue/KeyValueProcessor.java @@ -25,6 +25,7 @@ import java.util.Set; import java.util.regex.Pattern; import java.util.regex.PatternSyntaxException; +import java.util.regex.Matcher; @DataPrepperPlugin(name = "key_value", pluginType = Processor.class, pluginConfigurationType = KeyValueProcessorConfig.class) public class KeyValueProcessor extends AbstractProcessor, Record> { @@ -109,6 +110,12 @@ public KeyValueProcessor(final PluginMetrics pluginMetrics, final KeyValueProces if (!(validWhitespaceSet.contains(keyValueProcessorConfig.getWhitespace()))) { throw new IllegalArgumentException(String.format("The whitespace value: %s is not a valid option", keyValueProcessorConfig.getWhitespace())); } + + final Pattern duplicateValueBoolCheck = Pattern.compile("true|false", Pattern.CASE_INSENSITIVE); + final Matcher duplicateValueBoolMatch = duplicateValueBoolCheck.matcher(String.valueOf(keyValueProcessorConfig.getSkipDuplicateValues())); + if (!duplicateValueBoolMatch.matches()) { + throw new IllegalArgumentException(String.format("The skip_duplicate_values value: %s is not a valid option", keyValueProcessorConfig.getSkipDuplicateValues())); + } } private String buildRegexFromCharacters(String s) { @@ -220,8 +227,20 @@ private void addKeyValueToMap(final Map parsedMap, final String } if (parsedMap.get(key) instanceof List) { + if (keyValueProcessorConfig.getSkipDuplicateValues()) { + if (((List) parsedMap.get(key)).contains(value)) { + return; + } + } + ((List) parsedMap.get(key)).add(value); } else { + if (keyValueProcessorConfig.getSkipDuplicateValues()) { + if (parsedMap.containsValue(value)) { + return; + } + } + final LinkedList combinedList = new LinkedList<>(); combinedList.add(parsedMap.get(key)); combinedList.add(value); diff --git a/data-prepper-plugins/key-value-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/keyvalue/KeyValueProcessorConfig.java b/data-prepper-plugins/key-value-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/keyvalue/KeyValueProcessorConfig.java index 1b68d27e3a..fa0cb6eac4 100644 --- a/data-prepper-plugins/key-value-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/keyvalue/KeyValueProcessorConfig.java +++ b/data-prepper-plugins/key-value-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/keyvalue/KeyValueProcessorConfig.java @@ -24,6 +24,7 @@ public class KeyValueProcessorConfig { static final String DEFAULT_DELETE_VALUE_REGEX = ""; static final String DEFAULT_TRANSFORM_KEY = ""; static final String DEFAULT_WHITESPACE = "lenient"; + static final boolean DEFAULT_SKIP_DUPLICATE_VALUES = false; @NotEmpty private String source = DEFAULT_SOURCE; @@ -70,6 +71,10 @@ public class KeyValueProcessorConfig { @NotNull private String whitespace = DEFAULT_WHITESPACE; + @JsonProperty("skip_duplicate_values") + @NotNull + private boolean skipDuplicateValues = DEFAULT_SKIP_DUPLICATE_VALUES; + public String getSource() { return source; } @@ -121,4 +126,8 @@ public String getTransformKey() { public String getWhitespace() { return whitespace; } + + public boolean getSkipDuplicateValues() { + return skipDuplicateValues; + } } diff --git a/data-prepper-plugins/key-value-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/keyvalue/KeyValueProcessorTests.java b/data-prepper-plugins/key-value-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/keyvalue/KeyValueProcessorTests.java index 90a0de9e56..743cd29f76 100644 --- a/data-prepper-plugins/key-value-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/keyvalue/KeyValueProcessorTests.java +++ b/data-prepper-plugins/key-value-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/keyvalue/KeyValueProcessorTests.java @@ -65,6 +65,7 @@ void setup() { lenient().when(mockConfig.getDeleteValueRegex()).thenReturn(defaultConfig.getDeleteValueRegex()); lenient().when(mockConfig.getTransformKey()).thenReturn(defaultConfig.getTransformKey()); lenient().when(mockConfig.getWhitespace()).thenReturn(defaultConfig.getWhitespace()); + lenient().when(mockConfig.getSkipDuplicateValues()).thenReturn(defaultConfig.getSkipDuplicateValues()); keyValueProcessor = new KeyValueProcessor(pluginMetrics, mockConfig); } @@ -421,6 +422,50 @@ void testStrictWhitespaceKvProcessor() { assertThatKeyEquals(parsed_message, "key1", "value1"); } + @Test + void testFalseSkipDuplicateValuesKvProcessor() { + when(mockConfig.getSkipDuplicateValues()).thenReturn(false); + + final Record record = getMessage("key1=value1&key1=value1"); + final List> editedRecords = (List>) keyValueProcessor.doExecute(Collections.singletonList(record)); + final LinkedHashMap parsed_message = getLinkedHashMap(editedRecords); + + final ArrayList expectedValue = new ArrayList(); + expectedValue.add("value1"); + expectedValue.add("value1"); + + assertThat(parsed_message.size(), equalTo(1)); + assertThatKeyEquals(parsed_message, "key1", expectedValue); + } + + @Test + void testTrueSkipDuplicateValuesKvProcessor() { + when(mockConfig.getSkipDuplicateValues()).thenReturn(true); + + final Record record = getMessage("key1=value1&key1=value1"); + final List> editedRecords = (List>) keyValueProcessor.doExecute(Collections.singletonList(record)); + final LinkedHashMap parsed_message = getLinkedHashMap(editedRecords); + + assertThat(parsed_message.size(), equalTo(1)); + assertThatKeyEquals(parsed_message, "key1", "value1"); + } + + @Test + void testTrueThreeInputsDuplicateValuesKvProcessor() { + when(mockConfig.getSkipDuplicateValues()).thenReturn(true); + + final Record record = getMessage("key1=value1&key1=value2&key1=value1"); + final List> editedRecords = (List>) keyValueProcessor.doExecute(Collections.singletonList(record)); + final LinkedHashMap parsed_message = getLinkedHashMap(editedRecords); + + final ArrayList expectedValue = new ArrayList(); + expectedValue.add("value1"); + expectedValue.add("value2"); + + assertThat(parsed_message.size(), equalTo(1)); + assertThatKeyEquals(parsed_message, "key1", expectedValue); + } + @Test void testShutdownIsReady() { assertThat(keyValueProcessor.isReadyForShutdown(), is(true));