Skip to content

Commit

Permalink
Define multiple keys for type conversion (opensearch-project#2934)
Browse files Browse the repository at this point in the history
* feat: add include_key options to KeyValueProcessor

Signed-off-by: Haidong <[email protected]>

---------

Signed-off-by: Haidong <[email protected]>
Co-authored-by: Haidong <[email protected]>
Signed-off-by: Marcos Gonzalez Mayedo <[email protected]>
  • Loading branch information
2 people authored and Marcos Gonzalez Mayedo committed Jul 25, 2023
1 parent 17ee7f3 commit 6f934b1
Show file tree
Hide file tree
Showing 5 changed files with 104 additions and 15 deletions.
5 changes: 4 additions & 1 deletion data-prepper-plugins/mutate-event-processors/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -261,8 +261,11 @@ and the type conversion processor will change it to the following output, where
{"message": "10.10.10.10 [19/Feb/2015:15:50:36 -0500] 200", "clientip":"10.10.10.10", "timestamp": "19/Feb/2015:15:50:36 -0500", "response_status": 200}
```
### Configuration
* `key` - (required) - keys whose value needs to be converted to a different type
* `key` - keys whose value needs to be converted to a different type. Required if `keys` option is not defined.
* `keys` - list of keys whose value needs to be converted to a different type. Required if `key` option is not defined.
* `type` - target type for the value of the key. Possible values are `integer`, `double`, `string`, and `boolean`. Default is `integer`.
* `null_values` - treat any value in the null_values list as null.
* Example: `null_values` is `["-"]` and `key` is `key1`. `{"key1": "-", "key2": "value2"}` will parse into `{"key2": "value2"}`

## List-to-map Processor
A processor that converts a list of objects from an event, where each object has a key field, to a map of keys to objects.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,13 @@
import org.opensearch.dataprepper.typeconverter.TypeConverter;

import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Objects;

@DataPrepperPlugin(name = "convert_entry_type", pluginType = Processor.class, pluginConfigurationType = ConvertEntryTypeProcessorConfig.class)
public class ConvertEntryTypeProcessor extends AbstractProcessor<Record<Event>, Record<Event>> {
private final String key;
private final List<String> convertEntryKeys;
private final TypeConverter converter;
private final String convertWhen;
private final List<String> nullValues;
Expand All @@ -33,7 +34,7 @@ public ConvertEntryTypeProcessor(final PluginMetrics pluginMetrics,
final ConvertEntryTypeProcessorConfig convertEntryTypeProcessorConfig,
final ExpressionEvaluator expressionEvaluator) {
super(pluginMetrics);
this.key = convertEntryTypeProcessorConfig.getKey();
this.convertEntryKeys = getKeysToConvert(convertEntryTypeProcessorConfig);
this.converter = convertEntryTypeProcessorConfig.getType().getTargetConverter();
this.convertWhen = convertEntryTypeProcessorConfig.getConvertWhen();
this.nullValues = convertEntryTypeProcessorConfig.getNullValues()
Expand All @@ -50,11 +51,13 @@ public Collection<Record<Event>> doExecute(final Collection<Record<Event>> recor
continue;
}

Object keyVal = recordEvent.get(key, Object.class);
if (keyVal != null) {
recordEvent.delete(key);
if (!nullValues.contains(keyVal.toString())){
recordEvent.put(key, this.converter.convert(keyVal));
for(final String key : convertEntryKeys) {
Object keyVal = recordEvent.get(key, Object.class);
if (keyVal != null) {
recordEvent.delete(key);
if (!nullValues.contains(keyVal.toString())) {
recordEvent.put(key, this.converter.convert(keyVal));
}
}
}
}
Expand All @@ -73,6 +76,25 @@ public boolean isReadyForShutdown() {
@Override
public void shutdown() {
}

private List<String> getKeysToConvert(final ConvertEntryTypeProcessorConfig convertEntryTypeProcessorConfig) {
final String key = convertEntryTypeProcessorConfig.getKey();
final List<String> keys = convertEntryTypeProcessorConfig.getKeys();
if (key == null && keys == null) {
throw new IllegalArgumentException("key and keys cannot both be null. One must be provided.");
}
if (key != null && keys != null) {
throw new IllegalArgumentException("key and keys cannot both be defined.");
}
if (key != null) {
if (key.isEmpty()) {
throw new IllegalArgumentException("key cannot be empty.");
} else {
return Collections.singletonList(key);
}
}
return keys;
}
}


Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,17 @@
package org.opensearch.dataprepper.plugins.processor.mutateevent;

import com.fasterxml.jackson.annotation.JsonProperty;
import jakarta.validation.constraints.NotEmpty;

import java.util.List;
import java.util.Optional;

public class ConvertEntryTypeProcessorConfig {
@JsonProperty("key")
@NotEmpty
private String key;

@JsonProperty("keys")
private List<String> keys;

@JsonProperty("type")
private TargetType type = TargetType.INTEGER;

Expand All @@ -29,6 +30,8 @@ public String getKey() {
return key;
}

public List<String> getKeys() { return keys; }

public TargetType getType() {
return type;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import static org.hamcrest.CoreMatchers.notNullValue;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.Mockito.lenient;
import static org.mockito.Mockito.when;

@ExtendWith(MockitoExtension.class)
Expand All @@ -51,8 +52,9 @@ static Record<Event> buildRecordWithEvent(final Map<String, Object> data) {

@BeforeEach
private void setup() {
when(mockConfig.getKey()).thenReturn(TEST_KEY);
when(mockConfig.getConvertWhen()).thenReturn(null);
lenient().when(mockConfig.getKey()).thenReturn(TEST_KEY);
lenient().when(mockConfig.getKeys()).thenReturn(null);
lenient().when(mockConfig.getConvertWhen()).thenReturn(null);
}

private Record<Event> getMessage(String message, String key, Object value) {
Expand Down Expand Up @@ -196,4 +198,42 @@ void testNoConversionWhenConvertWhenIsFalse() {
Event event = executeAndGetProcessedEvent(record);
assertThat(event.get(TEST_KEY, Integer.class), equalTo(testValue));
}

@Test
void testMultipleKeysConvertEntryTypeProcessor() {
Integer testValue = 123;
String expectedValue = testValue.toString();
String testKey1 = UUID.randomUUID().toString();
String testKey2 = UUID.randomUUID().toString();
when(mockConfig.getKey()).thenReturn(null);
when(mockConfig.getKeys()).thenReturn(List.of(testKey1, testKey2));
when(mockConfig.getType()).thenReturn(TargetType.fromOptionValue("string"));
final Map<String, Object> testData = new HashMap();
testData.put("message", "testMessage");
testData.put(testKey1, testValue);
testData.put(testKey2, testValue);
Record record = buildRecordWithEvent(testData);
typeConversionProcessor = new ConvertEntryTypeProcessor(pluginMetrics, mockConfig, expressionEvaluator);
Event event = executeAndGetProcessedEvent(record);
assertThat(event.get(testKey1, String.class), equalTo(expectedValue));
assertThat(event.get(testKey2, String.class), equalTo(expectedValue));
}

@Test
void testKeyAndKeysBothNullConvertEntryTypeProcessor() {
when(mockConfig.getKey()).thenReturn(null);
assertThrows(IllegalArgumentException.class, () -> new ConvertEntryTypeProcessor(pluginMetrics, mockConfig, expressionEvaluator));
}

@Test
void testKeyAndKeysBothDefinedConvertEntryTypeProcessor() {
when(mockConfig.getKeys()).thenReturn(Collections.singletonList(TEST_KEY));
assertThrows(IllegalArgumentException.class, () -> new ConvertEntryTypeProcessor(pluginMetrics, mockConfig, expressionEvaluator));
}

@Test
void testEmptyKeyConvertEntryTypeProcessor() {
when(mockConfig.getKey()).thenReturn("");
assertThrows(IllegalArgumentException.class, () -> new ConvertEntryTypeProcessor(pluginMetrics, mockConfig, expressionEvaluator));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import static org.hamcrest.CoreMatchers.notNullValue;
import static org.hamcrest.CoreMatchers.nullValue;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.mockito.Mockito.lenient;
import static org.mockito.Mockito.when;

@ExtendWith(MockitoExtension.class)
Expand All @@ -42,9 +43,10 @@ public class ConvertEntryTypeProcessor_NullValueTests {

@BeforeEach
private void setup() {
when(mockConfig.getKey()).thenReturn(TEST_KEY);
when(mockConfig.getType()).thenReturn(TargetType.fromOptionValue("integer"));
when(mockConfig.getConvertWhen()).thenReturn(null);
lenient().when(mockConfig.getKey()).thenReturn(TEST_KEY);
lenient().when(mockConfig.getKeys()).thenReturn(null);
lenient().when(mockConfig.getType()).thenReturn(TargetType.fromOptionValue("integer"));
lenient().when(mockConfig.getConvertWhen()).thenReturn(null);
}

private Event executeAndGetProcessedEvent(final Object testValue) {
Expand Down Expand Up @@ -117,4 +119,23 @@ void testMultipleElementNullValues() {
assertThat(event.get(TEST_KEY, Integer.class), equalTo(testNumber));
}

@Test
void testMultipleKeysNullValues() {
String testValue = "-";
String testKey1 = UUID.randomUUID().toString();
String testKey2 = UUID.randomUUID().toString();
when(mockConfig.getKey()).thenReturn(null);
when(mockConfig.getKeys()).thenReturn(List.of(testKey1, testKey2));
when(mockConfig.getNullValues()).thenReturn(Optional.of(List.of("-")));
final Map<String, Object> testData = new HashMap();
testData.put("message", "testMessage");
testData.put(testKey1, testValue);
testData.put(testKey2, testValue);
Record record = buildRecordWithEvent(testData);
nullValuesProcessor = new ConvertEntryTypeProcessor(pluginMetrics, mockConfig, expressionEvaluator);
Event event = executeAndGetProcessedEvent(record);
assertThat(event.get(testKey1, String.class), nullValue());
assertThat(event.get(testKey2, String.class), nullValue());
}

}

0 comments on commit 6f934b1

Please sign in to comment.