Skip to content

Commit

Permalink
Update the parse JSON/XML/ION processors to use EventKey.
Browse files Browse the repository at this point in the history
Signed-off-by: David Venable <[email protected]>
  • Loading branch information
dlvenable committed Aug 16, 2024
1 parent 79db359 commit eccab03
Show file tree
Hide file tree
Showing 7 changed files with 55 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.EventKey;
import org.opensearch.dataprepper.model.event.EventKeyFactory;
import org.opensearch.dataprepper.model.event.JacksonEvent;
import org.opensearch.dataprepper.model.processor.AbstractProcessor;
import org.opensearch.dataprepper.model.record.Record;
Expand All @@ -30,29 +32,32 @@
public abstract class AbstractParseProcessor extends AbstractProcessor<Record<Event>, Record<Event>> {
private static final Logger LOG = LoggerFactory.getLogger(AbstractParseProcessor.class);

private final String source;
private final String destination;
private final EventKey source;
private final EventKey destination;
private final String pointer;
private final String parseWhen;
private final List<String> tagsOnFailure;
private final boolean overwriteIfDestinationExists;
private final boolean deleteSourceRequested;

private final ExpressionEvaluator expressionEvaluator;
private final EventKeyFactory eventKeyFactory;

protected AbstractParseProcessor(PluginMetrics pluginMetrics,
CommonParseConfig commonParseConfig,
ExpressionEvaluator expressionEvaluator) {
protected AbstractParseProcessor(final PluginMetrics pluginMetrics,
final CommonParseConfig commonParseConfig,
final ExpressionEvaluator expressionEvaluator,
final EventKeyFactory eventKeyFactory) {
super(pluginMetrics);

source = commonParseConfig.getSource();
destination = commonParseConfig.getDestination();
source = eventKeyFactory.createEventKey(commonParseConfig.getSource(), EventKeyFactory.EventAction.GET, EventKeyFactory.EventAction.DELETE);
destination = commonParseConfig.getDestination() != null ? eventKeyFactory.createEventKey(commonParseConfig.getDestination(), EventKeyFactory.EventAction.PUT, EventKeyFactory.EventAction.GET) : null;
pointer = commonParseConfig.getPointer();
parseWhen = commonParseConfig.getParseWhen();
tagsOnFailure = commonParseConfig.getTagsOnFailure();
overwriteIfDestinationExists = commonParseConfig.getOverwriteIfDestinationExists();
deleteSourceRequested = commonParseConfig.isDeleteSourceRequested();
this.expressionEvaluator = expressionEvaluator;
this.eventKeyFactory = eventKeyFactory;
}

/**
Expand Down Expand Up @@ -99,7 +104,7 @@ public Collection<Record<Event>> doExecute(final Collection<Record<Event>> recor
if(deleteSourceRequested) {
event.delete(this.source);
}
} catch (Exception e) {
} catch (final Exception e) {
LOG.error(EVENT, "An exception occurred while using the {} processor on Event [{}]", getProcessorName(), record.getData(), e);
}
}
Expand Down Expand Up @@ -128,7 +133,8 @@ private String getProcessorName() {
private Map<String, Object> parseUsingPointer(final Event event, final Map<String, Object> parsedJson, final String pointer,
final boolean doWriteToRoot) {
final Event temporaryEvent = JacksonEvent.builder().withEventType("event").build();
temporaryEvent.put(source, parsedJson);
final EventKey temporaryPutKey = eventKeyFactory.createEventKey(source.getKey(), EventKeyFactory.EventAction.PUT);
temporaryEvent.put(temporaryPutKey, parsedJson);

final String trimmedPointer = trimPointer(pointer);
final String actualPointer = source + "/" + trimmedPointer;
Expand Down Expand Up @@ -170,15 +176,15 @@ private String normalizePointerStructure(final String pointer) {
return pointer.replace('/','.');
}

private String trimPointer(String pointer) {
private String trimPointer(final String pointer) {
final String trimmedLeadingSlash = pointer.startsWith("/") ? pointer.substring(1) : pointer;
return trimmedLeadingSlash.endsWith("/") ? trimmedLeadingSlash.substring(0, trimmedLeadingSlash.length() - 1) : trimmedLeadingSlash;
}

private void writeToRoot(final Event event, final Map<String, Object> parsedJson) {
for (Map.Entry<String, Object> entry : parsedJson.entrySet()) {
for (final Map.Entry<String, Object> entry : parsedJson.entrySet()) {
if (overwriteIfDestinationExists || !event.containsKey(entry.getKey())) {
event.put(entry.getKey(), entry.getValue());
event.put(eventKeyFactory.createEventKey(entry.getKey(), EventKeyFactory.EventAction.PUT), entry.getValue());
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin;
import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.EventKeyFactory;
import org.opensearch.dataprepper.model.processor.Processor;
import org.opensearch.dataprepper.plugins.processor.parse.AbstractParseProcessor;
import org.slf4j.Logger;
Expand All @@ -32,8 +33,9 @@ public class ParseIonProcessor extends AbstractParseProcessor {
@DataPrepperPluginConstructor
public ParseIonProcessor(final PluginMetrics pluginMetrics,
final ParseIonProcessorConfig parseIonProcessorConfig,
final ExpressionEvaluator expressionEvaluator) {
super(pluginMetrics, parseIonProcessorConfig, expressionEvaluator);
final ExpressionEvaluator expressionEvaluator,
final EventKeyFactory eventKeyFactory) {
super(pluginMetrics, parseIonProcessorConfig, expressionEvaluator, eventKeyFactory);

// Convert Timestamps to ISO-8601 Z strings
objectMapper.registerModule(new IonTimestampConverterModule());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin;
import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.EventKeyFactory;
import org.opensearch.dataprepper.model.processor.Processor;
import org.opensearch.dataprepper.plugins.processor.parse.AbstractParseProcessor;
import org.slf4j.Logger;
Expand All @@ -32,8 +33,9 @@ public class ParseJsonProcessor extends AbstractParseProcessor {
@DataPrepperPluginConstructor
public ParseJsonProcessor(final PluginMetrics pluginMetrics,
final ParseJsonProcessorConfig parseJsonProcessorConfig,
final ExpressionEvaluator expressionEvaluator) {
super(pluginMetrics, parseJsonProcessorConfig, expressionEvaluator);
final ExpressionEvaluator expressionEvaluator,
final EventKeyFactory eventKeyFactory) {
super(pluginMetrics, parseJsonProcessorConfig, expressionEvaluator, eventKeyFactory);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin;
import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.EventKeyFactory;
import org.opensearch.dataprepper.model.processor.Processor;
import org.opensearch.dataprepper.plugins.processor.parse.AbstractParseProcessor;
import org.slf4j.Logger;
Expand All @@ -27,8 +28,9 @@ public class ParseXmlProcessor extends AbstractParseProcessor {
@DataPrepperPluginConstructor
public ParseXmlProcessor(final PluginMetrics pluginMetrics,
final ParseXmlProcessorConfig parseXmlProcessorConfig,
final ExpressionEvaluator expressionEvaluator) {
super(pluginMetrics, parseXmlProcessorConfig, expressionEvaluator);
final ExpressionEvaluator expressionEvaluator,
final EventKeyFactory eventKeyFactory) {
super(pluginMetrics, parseXmlProcessorConfig, expressionEvaluator, eventKeyFactory);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public void setup() {

@Override
protected AbstractParseProcessor createObjectUnderTest() {
return new ParseIonProcessor(pluginMetrics, ionProcessorConfig, expressionEvaluator);
return new ParseIonProcessor(pluginMetrics, ionProcessorConfig, expressionEvaluator, testEventKeyFactory);
}

@Test
Expand All @@ -58,7 +58,7 @@ void test_when_using_ion_features_then_processorParsesCorrectly() {
@Test
void test_when_deleteSourceFlagEnabled() {
when(processorConfig.isDeleteSourceRequested()).thenReturn(true);
parseJsonProcessor = new ParseIonProcessor(pluginMetrics, ionProcessorConfig, expressionEvaluator);
parseJsonProcessor = createObjectUnderTest();

final String serializedMessage = "{bareKey: 1, symbol: SYMBOL, timestamp: 2023-11-30T21:05:23.383Z, attribute: dollars::100.0 }";
final Event parsedEvent = createAndParseMessageEvent(serializedMessage);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,20 @@

package org.opensearch.dataprepper.plugins.processor.parse.json;

import org.opensearch.dataprepper.expression.ExpressionEvaluator;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.JacksonEvent;
import org.opensearch.dataprepper.model.record.Record;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.opensearch.dataprepper.event.TestEventFactory;
import org.opensearch.dataprepper.event.TestEventKeyFactory;
import org.opensearch.dataprepper.expression.ExpressionEvaluator;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.EventBuilder;
import org.opensearch.dataprepper.model.event.EventFactory;
import org.opensearch.dataprepper.model.event.EventKeyFactory;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.plugins.processor.parse.AbstractParseProcessor;
import org.opensearch.dataprepper.plugins.processor.parse.CommonParseConfig;

Expand Down Expand Up @@ -48,6 +52,8 @@ public class ParseJsonProcessorTest {
protected ExpressionEvaluator expressionEvaluator;

protected AbstractParseProcessor parseJsonProcessor;
private final EventFactory testEventFactory = TestEventFactory.getTestEventFactory();
protected final EventKeyFactory testEventKeyFactory = TestEventKeyFactory.getTestEventFactory();

@BeforeEach
public void setup() {
Expand All @@ -61,7 +67,7 @@ public void setup() {
}

protected AbstractParseProcessor createObjectUnderTest() {
return new ParseJsonProcessor(pluginMetrics, jsonProcessorConfig, expressionEvaluator);
return new ParseJsonProcessor(pluginMetrics, jsonProcessorConfig, expressionEvaluator, testEventKeyFactory);
}

@Test
Expand Down Expand Up @@ -197,7 +203,7 @@ void test_when_nestedJSONArray_then_parsedIntoArrayAndIndicesAccessible() {
@Test
void test_when_deleteSourceFlagEnabled() {
when(processorConfig.isDeleteSourceRequested()).thenReturn(true);
parseJsonProcessor = new ParseJsonProcessor(pluginMetrics, jsonProcessorConfig, expressionEvaluator);
parseJsonProcessor = createObjectUnderTest();

final String key = "key";
final ArrayList<String> value = new ArrayList<>(List.of("Element0","Element1","Element2"));
Expand Down Expand Up @@ -434,10 +440,7 @@ private Record<Event> createMessageEvent(final String message) {
}

private Record<Event> buildRecordWithEvent(final Map<String, Object> data) {
return new Record<>(JacksonEvent.builder()
.withData(data)
.withEventType("event")
.build());
return new Record<>(testEventFactory.eventBuilder(EventBuilder.class).withData(data).build());
}

private void assertThatKeyEquals(final Event parsedEvent, final String key, final Object value) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,14 @@
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.opensearch.dataprepper.event.TestEventFactory;
import org.opensearch.dataprepper.event.TestEventKeyFactory;
import org.opensearch.dataprepper.expression.ExpressionEvaluator;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.JacksonEvent;
import org.opensearch.dataprepper.model.event.EventBuilder;
import org.opensearch.dataprepper.model.event.EventFactory;
import org.opensearch.dataprepper.model.event.EventKeyFactory;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.plugins.processor.parse.AbstractParseProcessor;

Expand Down Expand Up @@ -37,6 +41,8 @@ public class ParseXmlProcessorTest {
private ExpressionEvaluator expressionEvaluator;

private AbstractParseProcessor parseXmlProcessor;
private final EventFactory testEventFactory = TestEventFactory.getTestEventFactory();
private final EventKeyFactory testEventKeyFactory = TestEventKeyFactory.getTestEventFactory();

@BeforeEach
public void setup() {
Expand All @@ -46,7 +52,7 @@ public void setup() {
}

protected AbstractParseProcessor createObjectUnderTest() {
return new ParseXmlProcessor(pluginMetrics, processorConfig, expressionEvaluator);
return new ParseXmlProcessor(pluginMetrics, processorConfig, expressionEvaluator, testEventKeyFactory);
}

@Test
Expand Down Expand Up @@ -104,9 +110,6 @@ private Record<Event> createMessageEvent(final String message) {
}

private Record<Event> buildRecordWithEvent(final Map<String, Object> data) {
return new Record<>(JacksonEvent.builder()
.withData(data)
.withEventType("event")
.build());
return new Record<>(testEventFactory.eventBuilder(EventBuilder.class).withData(data).build());
}
}

0 comments on commit eccab03

Please sign in to comment.