Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update the parse JSON/XML/ION processors to use EventKey. #4842

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.EventKey;
import org.opensearch.dataprepper.model.event.EventKeyFactory;
import org.opensearch.dataprepper.model.event.JacksonEvent;
import org.opensearch.dataprepper.model.processor.AbstractProcessor;
import org.opensearch.dataprepper.model.record.Record;
Expand All @@ -30,29 +32,32 @@
public abstract class AbstractParseProcessor extends AbstractProcessor<Record<Event>, Record<Event>> {
private static final Logger LOG = LoggerFactory.getLogger(AbstractParseProcessor.class);

private final String source;
private final String destination;
private final EventKey source;
private final EventKey destination;
private final String pointer;
private final String parseWhen;
private final List<String> tagsOnFailure;
private final boolean overwriteIfDestinationExists;
private final boolean deleteSourceRequested;

private final ExpressionEvaluator expressionEvaluator;
private final EventKeyFactory eventKeyFactory;

protected AbstractParseProcessor(PluginMetrics pluginMetrics,
CommonParseConfig commonParseConfig,
ExpressionEvaluator expressionEvaluator) {
protected AbstractParseProcessor(final PluginMetrics pluginMetrics,
final CommonParseConfig commonParseConfig,
final ExpressionEvaluator expressionEvaluator,
final EventKeyFactory eventKeyFactory) {
super(pluginMetrics);

source = commonParseConfig.getSource();
destination = commonParseConfig.getDestination();
source = eventKeyFactory.createEventKey(commonParseConfig.getSource(), EventKeyFactory.EventAction.GET, EventKeyFactory.EventAction.DELETE);
destination = commonParseConfig.getDestination() != null ? eventKeyFactory.createEventKey(commonParseConfig.getDestination(), EventKeyFactory.EventAction.PUT, EventKeyFactory.EventAction.GET) : null;
pointer = commonParseConfig.getPointer();
parseWhen = commonParseConfig.getParseWhen();
tagsOnFailure = commonParseConfig.getTagsOnFailure();
overwriteIfDestinationExists = commonParseConfig.getOverwriteIfDestinationExists();
deleteSourceRequested = commonParseConfig.isDeleteSourceRequested();
this.expressionEvaluator = expressionEvaluator;
this.eventKeyFactory = eventKeyFactory;
}

/**
Expand Down Expand Up @@ -99,7 +104,7 @@ public Collection<Record<Event>> doExecute(final Collection<Record<Event>> recor
if(deleteSourceRequested) {
event.delete(this.source);
}
} catch (Exception e) {
} catch (final Exception e) {
LOG.error(EVENT, "An exception occurred while using the {} processor on Event [{}]", getProcessorName(), record.getData(), e);
}
}
Expand Down Expand Up @@ -128,7 +133,8 @@ private String getProcessorName() {
private Map<String, Object> parseUsingPointer(final Event event, final Map<String, Object> parsedJson, final String pointer,
final boolean doWriteToRoot) {
final Event temporaryEvent = JacksonEvent.builder().withEventType("event").build();
temporaryEvent.put(source, parsedJson);
final EventKey temporaryPutKey = eventKeyFactory.createEventKey(source.getKey(), EventKeyFactory.EventAction.PUT);
temporaryEvent.put(temporaryPutKey, parsedJson);

final String trimmedPointer = trimPointer(pointer);
final String actualPointer = source + "/" + trimmedPointer;
Expand Down Expand Up @@ -170,15 +176,15 @@ private String normalizePointerStructure(final String pointer) {
return pointer.replace('/','.');
}

private String trimPointer(String pointer) {
private String trimPointer(final String pointer) {
final String trimmedLeadingSlash = pointer.startsWith("/") ? pointer.substring(1) : pointer;
return trimmedLeadingSlash.endsWith("/") ? trimmedLeadingSlash.substring(0, trimmedLeadingSlash.length() - 1) : trimmedLeadingSlash;
}

private void writeToRoot(final Event event, final Map<String, Object> parsedJson) {
for (Map.Entry<String, Object> entry : parsedJson.entrySet()) {
for (final Map.Entry<String, Object> entry : parsedJson.entrySet()) {
if (overwriteIfDestinationExists || !event.containsKey(entry.getKey())) {
event.put(entry.getKey(), entry.getValue());
event.put(eventKeyFactory.createEventKey(entry.getKey(), EventKeyFactory.EventAction.PUT), entry.getValue());
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin;
import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.EventKeyFactory;
import org.opensearch.dataprepper.model.processor.Processor;
import org.opensearch.dataprepper.plugins.processor.parse.AbstractParseProcessor;
import org.slf4j.Logger;
Expand All @@ -32,8 +33,9 @@ public class ParseIonProcessor extends AbstractParseProcessor {
@DataPrepperPluginConstructor
public ParseIonProcessor(final PluginMetrics pluginMetrics,
final ParseIonProcessorConfig parseIonProcessorConfig,
final ExpressionEvaluator expressionEvaluator) {
super(pluginMetrics, parseIonProcessorConfig, expressionEvaluator);
final ExpressionEvaluator expressionEvaluator,
final EventKeyFactory eventKeyFactory) {
super(pluginMetrics, parseIonProcessorConfig, expressionEvaluator, eventKeyFactory);

// Convert Timestamps to ISO-8601 Z strings
objectMapper.registerModule(new IonTimestampConverterModule());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin;
import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.EventKeyFactory;
import org.opensearch.dataprepper.model.processor.Processor;
import org.opensearch.dataprepper.plugins.processor.parse.AbstractParseProcessor;
import org.slf4j.Logger;
Expand All @@ -32,8 +33,9 @@ public class ParseJsonProcessor extends AbstractParseProcessor {
@DataPrepperPluginConstructor
public ParseJsonProcessor(final PluginMetrics pluginMetrics,
final ParseJsonProcessorConfig parseJsonProcessorConfig,
final ExpressionEvaluator expressionEvaluator) {
super(pluginMetrics, parseJsonProcessorConfig, expressionEvaluator);
final ExpressionEvaluator expressionEvaluator,
final EventKeyFactory eventKeyFactory) {
super(pluginMetrics, parseJsonProcessorConfig, expressionEvaluator, eventKeyFactory);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin;
import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.EventKeyFactory;
import org.opensearch.dataprepper.model.processor.Processor;
import org.opensearch.dataprepper.plugins.processor.parse.AbstractParseProcessor;
import org.slf4j.Logger;
Expand All @@ -27,8 +28,9 @@ public class ParseXmlProcessor extends AbstractParseProcessor {
@DataPrepperPluginConstructor
public ParseXmlProcessor(final PluginMetrics pluginMetrics,
final ParseXmlProcessorConfig parseXmlProcessorConfig,
final ExpressionEvaluator expressionEvaluator) {
super(pluginMetrics, parseXmlProcessorConfig, expressionEvaluator);
final ExpressionEvaluator expressionEvaluator,
final EventKeyFactory eventKeyFactory) {
super(pluginMetrics, parseXmlProcessorConfig, expressionEvaluator, eventKeyFactory);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public void setup() {

@Override
protected AbstractParseProcessor createObjectUnderTest() {
return new ParseIonProcessor(pluginMetrics, ionProcessorConfig, expressionEvaluator);
return new ParseIonProcessor(pluginMetrics, ionProcessorConfig, expressionEvaluator, testEventKeyFactory);
}

@Test
Expand All @@ -58,7 +58,7 @@ void test_when_using_ion_features_then_processorParsesCorrectly() {
@Test
void test_when_deleteSourceFlagEnabled() {
when(processorConfig.isDeleteSourceRequested()).thenReturn(true);
parseJsonProcessor = new ParseIonProcessor(pluginMetrics, ionProcessorConfig, expressionEvaluator);
parseJsonProcessor = createObjectUnderTest();

final String serializedMessage = "{bareKey: 1, symbol: SYMBOL, timestamp: 2023-11-30T21:05:23.383Z, attribute: dollars::100.0 }";
final Event parsedEvent = createAndParseMessageEvent(serializedMessage);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,20 @@

package org.opensearch.dataprepper.plugins.processor.parse.json;

import org.opensearch.dataprepper.expression.ExpressionEvaluator;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.JacksonEvent;
import org.opensearch.dataprepper.model.record.Record;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.opensearch.dataprepper.event.TestEventFactory;
import org.opensearch.dataprepper.event.TestEventKeyFactory;
import org.opensearch.dataprepper.expression.ExpressionEvaluator;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.EventBuilder;
import org.opensearch.dataprepper.model.event.EventFactory;
import org.opensearch.dataprepper.model.event.EventKeyFactory;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.plugins.processor.parse.AbstractParseProcessor;
import org.opensearch.dataprepper.plugins.processor.parse.CommonParseConfig;

Expand Down Expand Up @@ -48,6 +52,8 @@ public class ParseJsonProcessorTest {
protected ExpressionEvaluator expressionEvaluator;

protected AbstractParseProcessor parseJsonProcessor;
private final EventFactory testEventFactory = TestEventFactory.getTestEventFactory();
protected final EventKeyFactory testEventKeyFactory = TestEventKeyFactory.getTestEventFactory();

@BeforeEach
public void setup() {
Expand All @@ -61,7 +67,7 @@ public void setup() {
}

protected AbstractParseProcessor createObjectUnderTest() {
return new ParseJsonProcessor(pluginMetrics, jsonProcessorConfig, expressionEvaluator);
return new ParseJsonProcessor(pluginMetrics, jsonProcessorConfig, expressionEvaluator, testEventKeyFactory);
}

@Test
Expand Down Expand Up @@ -197,7 +203,7 @@ void test_when_nestedJSONArray_then_parsedIntoArrayAndIndicesAccessible() {
@Test
void test_when_deleteSourceFlagEnabled() {
when(processorConfig.isDeleteSourceRequested()).thenReturn(true);
parseJsonProcessor = new ParseJsonProcessor(pluginMetrics, jsonProcessorConfig, expressionEvaluator);
parseJsonProcessor = createObjectUnderTest();

final String key = "key";
final ArrayList<String> value = new ArrayList<>(List.of("Element0","Element1","Element2"));
Expand Down Expand Up @@ -434,10 +440,7 @@ private Record<Event> createMessageEvent(final String message) {
}

private Record<Event> buildRecordWithEvent(final Map<String, Object> data) {
return new Record<>(JacksonEvent.builder()
.withData(data)
.withEventType("event")
.build());
return new Record<>(testEventFactory.eventBuilder(EventBuilder.class).withData(data).build());
}

private void assertThatKeyEquals(final Event parsedEvent, final String key, final Object value) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,14 @@
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.opensearch.dataprepper.event.TestEventFactory;
import org.opensearch.dataprepper.event.TestEventKeyFactory;
import org.opensearch.dataprepper.expression.ExpressionEvaluator;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.JacksonEvent;
import org.opensearch.dataprepper.model.event.EventBuilder;
import org.opensearch.dataprepper.model.event.EventFactory;
import org.opensearch.dataprepper.model.event.EventKeyFactory;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.plugins.processor.parse.AbstractParseProcessor;

Expand Down Expand Up @@ -37,6 +41,8 @@ public class ParseXmlProcessorTest {
private ExpressionEvaluator expressionEvaluator;

private AbstractParseProcessor parseXmlProcessor;
private final EventFactory testEventFactory = TestEventFactory.getTestEventFactory();
private final EventKeyFactory testEventKeyFactory = TestEventKeyFactory.getTestEventFactory();

@BeforeEach
public void setup() {
Expand All @@ -46,7 +52,7 @@ public void setup() {
}

protected AbstractParseProcessor createObjectUnderTest() {
return new ParseXmlProcessor(pluginMetrics, processorConfig, expressionEvaluator);
return new ParseXmlProcessor(pluginMetrics, processorConfig, expressionEvaluator, testEventKeyFactory);
}

@Test
Expand Down Expand Up @@ -104,9 +110,6 @@ private Record<Event> createMessageEvent(final String message) {
}

private Record<Event> buildRecordWithEvent(final Map<String, Object> data) {
return new Record<>(JacksonEvent.builder()
.withData(data)
.withEventType("event")
.build());
return new Record<>(testEventFactory.eventBuilder(EventBuilder.class).withData(data).build());
}
}
Loading