Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Define multiple keys for type conversion #2934

Merged
merged 10 commits into from
Jul 5, 2023
5 changes: 4 additions & 1 deletion data-prepper-plugins/mutate-event-processors/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -261,8 +261,11 @@ and the type conversion processor will change it to the following output, where
{"message": "10.10.10.10 [19/Feb/2015:15:50:36 -0500] 200", "clientip":"10.10.10.10", "timestamp": "19/Feb/2015:15:50:36 -0500", "response_status": 200}
```
### Configuration
* `key` - (required) - keys whose value needs to be converted to a different type
* `key` - keys whose value needs to be converted to a different type. Required if `keys` option is not defined.
* `keys` - list of keys whose value needs to be converted to a different type. Required if `key` option is not defined.
* `type` - target type for the value of the key. Possible values are `integer`, `double`, `string`, and `boolean`. Default is `integer`.
* `null_values` - treat any value in the null_values list as null.
* Example: `null_values` is `["-"]` and `key` is `key1`. `{"key1": "-", "key2": "value2"}` will parse into `{"key2": "value2"}`

## List-to-map Processor
A processor that converts a list of objects from an event, where each object has a key field, to a map of keys to objects.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,13 @@
import org.opensearch.dataprepper.typeconverter.TypeConverter;

import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Objects;

@DataPrepperPlugin(name = "convert_entry_type", pluginType = Processor.class, pluginConfigurationType = ConvertEntryTypeProcessorConfig.class)
public class ConvertEntryTypeProcessor extends AbstractProcessor<Record<Event>, Record<Event>> {
private final String key;
private final List<String> convertEntryKeys;
private final TypeConverter converter;
private final String convertWhen;
private final List<String> nullValues;
Expand All @@ -33,7 +34,7 @@ public ConvertEntryTypeProcessor(final PluginMetrics pluginMetrics,
final ConvertEntryTypeProcessorConfig convertEntryTypeProcessorConfig,
final ExpressionEvaluator expressionEvaluator) {
super(pluginMetrics);
this.key = convertEntryTypeProcessorConfig.getKey();
this.convertEntryKeys = getKeysToConvert(convertEntryTypeProcessorConfig);
this.converter = convertEntryTypeProcessorConfig.getType().getTargetConverter();
this.convertWhen = convertEntryTypeProcessorConfig.getConvertWhen();
this.nullValues = convertEntryTypeProcessorConfig.getNullValues()
Expand All @@ -50,11 +51,13 @@ public Collection<Record<Event>> doExecute(final Collection<Record<Event>> recor
continue;
}

Object keyVal = recordEvent.get(key, Object.class);
if (keyVal != null) {
recordEvent.delete(key);
if (!nullValues.contains(keyVal.toString())){
recordEvent.put(key, this.converter.convert(keyVal));
for(final String key : convertEntryKeys) {
Object keyVal = recordEvent.get(key, Object.class);
if (keyVal != null) {
recordEvent.delete(key);
if (!nullValues.contains(keyVal.toString())) {
recordEvent.put(key, this.converter.convert(keyVal));
}
}
}
}
Expand All @@ -73,6 +76,25 @@ public boolean isReadyForShutdown() {
@Override
public void shutdown() {
}

private List<String> getKeysToConvert(final ConvertEntryTypeProcessorConfig convertEntryTypeProcessorConfig) {
final String key = convertEntryTypeProcessorConfig.getKey();
final List<String> keys = convertEntryTypeProcessorConfig.getKeys();
if (key == null && keys == null) {
throw new IllegalArgumentException("key and keys cannot both be null. One must be provided.");
}
if (key != null && keys != null) {
throw new IllegalArgumentException("key and keys cannot both be defined.");
}
if (key != null) {
if (key.isEmpty()) {
throw new IllegalArgumentException("key cannot be empty.");
} else {
return Collections.singletonList(key);
}
}
return keys;
}
}


Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,17 @@
package org.opensearch.dataprepper.plugins.processor.mutateevent;

import com.fasterxml.jackson.annotation.JsonProperty;
import jakarta.validation.constraints.NotEmpty;

import java.util.List;
import java.util.Optional;

public class ConvertEntryTypeProcessorConfig {
@JsonProperty("key")
@NotEmpty
private String key;

@JsonProperty("keys")
private List<String> keys;

@JsonProperty("type")
private TargetType type = TargetType.INTEGER;

Expand All @@ -29,6 +30,8 @@ public String getKey() {
return key;
}

public List<String> getKeys() { return keys; }

public TargetType getType() {
return type;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import static org.hamcrest.CoreMatchers.notNullValue;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.Mockito.lenient;
import static org.mockito.Mockito.when;

@ExtendWith(MockitoExtension.class)
Expand All @@ -51,8 +52,9 @@ static Record<Event> buildRecordWithEvent(final Map<String, Object> data) {

@BeforeEach
private void setup() {
when(mockConfig.getKey()).thenReturn(TEST_KEY);
when(mockConfig.getConvertWhen()).thenReturn(null);
lenient().when(mockConfig.getKey()).thenReturn(TEST_KEY);
lenient().when(mockConfig.getKeys()).thenReturn(null);
lenient().when(mockConfig.getConvertWhen()).thenReturn(null);
}

private Record<Event> getMessage(String message, String key, Object value) {
Expand Down Expand Up @@ -196,4 +198,42 @@ void testNoConversionWhenConvertWhenIsFalse() {
Event event = executeAndGetProcessedEvent(record);
assertThat(event.get(TEST_KEY, Integer.class), equalTo(testValue));
}

@Test
void testMultipleKeysConvertEntryTypeProcessor() {
Integer testValue = 123;
String expectedValue = testValue.toString();
String testKey1 = UUID.randomUUID().toString();
String testKey2 = UUID.randomUUID().toString();
when(mockConfig.getKey()).thenReturn(null);
when(mockConfig.getKeys()).thenReturn(List.of(testKey1, testKey2));
when(mockConfig.getType()).thenReturn(TargetType.fromOptionValue("string"));
final Map<String, Object> testData = new HashMap();
testData.put("message", "testMessage");
testData.put(testKey1, testValue);
testData.put(testKey2, testValue);
Record record = buildRecordWithEvent(testData);
typeConversionProcessor = new ConvertEntryTypeProcessor(pluginMetrics, mockConfig, expressionEvaluator);
Event event = executeAndGetProcessedEvent(record);
assertThat(event.get(testKey1, String.class), equalTo(expectedValue));
assertThat(event.get(testKey2, String.class), equalTo(expectedValue));
}

@Test
void testKeyAndKeysBothNullConvertEntryTypeProcessor() {
when(mockConfig.getKey()).thenReturn(null);
assertThrows(IllegalArgumentException.class, () -> new ConvertEntryTypeProcessor(pluginMetrics, mockConfig, expressionEvaluator));
}

@Test
void testKeyAndKeysBothDefinedConvertEntryTypeProcessor() {
when(mockConfig.getKeys()).thenReturn(Collections.singletonList(TEST_KEY));
assertThrows(IllegalArgumentException.class, () -> new ConvertEntryTypeProcessor(pluginMetrics, mockConfig, expressionEvaluator));
}

@Test
void testEmptyKeyConvertEntryTypeProcessor() {
when(mockConfig.getKey()).thenReturn("");
assertThrows(IllegalArgumentException.class, () -> new ConvertEntryTypeProcessor(pluginMetrics, mockConfig, expressionEvaluator));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import static org.hamcrest.CoreMatchers.notNullValue;
import static org.hamcrest.CoreMatchers.nullValue;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.mockito.Mockito.lenient;
import static org.mockito.Mockito.when;

@ExtendWith(MockitoExtension.class)
Expand All @@ -42,9 +43,10 @@ public class ConvertEntryTypeProcessor_NullValueTests {

@BeforeEach
private void setup() {
when(mockConfig.getKey()).thenReturn(TEST_KEY);
when(mockConfig.getType()).thenReturn(TargetType.fromOptionValue("integer"));
when(mockConfig.getConvertWhen()).thenReturn(null);
lenient().when(mockConfig.getKey()).thenReturn(TEST_KEY);
lenient().when(mockConfig.getKeys()).thenReturn(null);
lenient().when(mockConfig.getType()).thenReturn(TargetType.fromOptionValue("integer"));
lenient().when(mockConfig.getConvertWhen()).thenReturn(null);
}

private Event executeAndGetProcessedEvent(final Object testValue) {
Expand Down Expand Up @@ -117,4 +119,23 @@ void testMultipleElementNullValues() {
assertThat(event.get(TEST_KEY, Integer.class), equalTo(testNumber));
}

@Test
void testMultipleKeysNullValues() {
String testValue = "-";
String testKey1 = UUID.randomUUID().toString();
String testKey2 = UUID.randomUUID().toString();
when(mockConfig.getKey()).thenReturn(null);
when(mockConfig.getKeys()).thenReturn(List.of(testKey1, testKey2));
when(mockConfig.getNullValues()).thenReturn(Optional.of(List.of("-")));
final Map<String, Object> testData = new HashMap();
testData.put("message", "testMessage");
testData.put(testKey1, testValue);
testData.put(testKey2, testValue);
Record record = buildRecordWithEvent(testData);
nullValuesProcessor = new ConvertEntryTypeProcessor(pluginMetrics, mockConfig, expressionEvaluator);
Event event = executeAndGetProcessedEvent(record);
assertThat(event.get(testKey1, String.class), nullValue());
assertThat(event.get(testKey2, String.class), nullValue());
}

}
Loading