diff --git a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/configuration/SinkModel.java b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/configuration/SinkModel.java index fe01b4730c..d772ff21c6 100644 --- a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/configuration/SinkModel.java +++ b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/configuration/SinkModel.java @@ -28,8 +28,8 @@ @JsonDeserialize(using = SinkModel.SinkModelDeserializer.class) public class SinkModel extends PluginModel { - SinkModel(final String pluginName, final List routes, final Map pluginSettings) { - this(pluginName, new SinkInternalJsonModel(routes, pluginSettings)); + SinkModel(final String pluginName, final List routes, final String tagsTargetKey, final Map pluginSettings) { + this(pluginName, new SinkInternalJsonModel(routes, tagsTargetKey, pluginSettings)); } private SinkModel(final String pluginName, final SinkInternalJsonModel sinkInnerModel) { @@ -46,18 +46,30 @@ public Collection getRoutes() { return this.getInternalJsonModel().routes; } + /** + * Gets the tags target key associated with this Sink. + * + * @return The tags target key + * @since 2.4 + */ + public String getTagsTargetKey() { + return this.getInternalJsonModel().tagsTargetKey; + } + public static class SinkModelBuilder { private final PluginModel pluginModel; private final List routes; + private final String tagsTargetKey; private SinkModelBuilder(final PluginModel pluginModel) { this.pluginModel = pluginModel; this.routes = Collections.emptyList(); + this.tagsTargetKey = null; } public SinkModel build() { - return new SinkModel(pluginModel.getPluginName(), routes, pluginModel.getPluginSettings()); + return new SinkModel(pluginModel.getPluginName(), routes, tagsTargetKey, pluginModel.getPluginSettings()); } } @@ -70,21 +82,27 @@ private static class SinkInternalJsonModel extends InternalJsonModel { @JsonProperty("routes") private final List routes; + @JsonInclude(JsonInclude.Include.NON_EMPTY) + @JsonProperty("tags_target_key") + private final String tagsTargetKey; + @JsonCreator - private SinkInternalJsonModel(@JsonProperty("routes") final List routes) { + private SinkInternalJsonModel(@JsonProperty("routes") final List routes, @JsonProperty("tags_target_key") final String tagsTargetKey) { super(); this.routes = routes != null ? routes : new ArrayList<>(); + this.tagsTargetKey = tagsTargetKey; } - private SinkInternalJsonModel(final List routes, final Map pluginSettings) { + private SinkInternalJsonModel(final List routes, final String tagsTargetKey, final Map pluginSettings) { super(pluginSettings); this.routes = routes != null ? routes : new ArrayList<>(); + this.tagsTargetKey = tagsTargetKey; } } static class SinkModelDeserializer extends AbstractPluginModelDeserializer { SinkModelDeserializer() { - super(SinkModel.class, SinkInternalJsonModel.class, SinkModel::new, () -> new SinkInternalJsonModel(null)); + super(SinkModel.class, SinkInternalJsonModel.class, SinkModel::new, () -> new SinkInternalJsonModel(null, null)); } } } diff --git a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/plugin/PluginFactory.java b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/plugin/PluginFactory.java index 87f0abfc30..aa6c435920 100644 --- a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/plugin/PluginFactory.java +++ b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/plugin/PluginFactory.java @@ -5,6 +5,7 @@ package org.opensearch.dataprepper.model.plugin; +import org.opensearch.dataprepper.model.sink.SinkContext; import org.opensearch.dataprepper.model.configuration.PluginSetting; import java.util.List; @@ -27,6 +28,18 @@ public interface PluginFactory { */ T loadPlugin(final Class baseClass, final PluginSetting pluginSetting); + /** + * Loads a new instance of a plugin with SinkContext. + * + * @param baseClass The class type that the plugin is supporting. + * @param pluginSetting The {@link PluginSetting} to configure this plugin + * @param sinkContext The {@link SinkContext} to configure this plugin + * @param The type + * @return A new instance of your plugin, configured + * @since 1.2 + */ + T loadPlugin(final Class baseClass, final PluginSetting pluginSetting, final SinkContext sinkContext); + /** * Loads a specified number of plugin instances. The total number of instances is provided * by the numberOfInstancesFunction. diff --git a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/sink/SinkContext.java b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/sink/SinkContext.java new file mode 100644 index 0000000000..9650411bd8 --- /dev/null +++ b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/sink/SinkContext.java @@ -0,0 +1,39 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.model.sink; + +import java.util.Collection; + +/** + * Data Prepper Sink Context class. This the class for keeping global + * sink configuration as context so that individual sinks may use them. + */ +public class SinkContext { + private final String tagsTargetKey; + private final Collection routes; + + public SinkContext(final String tagsTargetKey, final Collection routes) { + this.tagsTargetKey = tagsTargetKey; + this.routes = routes; + } + + /** + * returns the target key name for tags if configured for a given sink + * @return tags target key + */ + public String getTagsTargetKey() { + return tagsTargetKey; + } + + /** + * returns routes if configured for a given sink + * @return routes + */ + public Collection getRoutes() { + return routes; + } +} + diff --git a/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/configuration/PipelinesDataFlowModelTest.java b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/configuration/PipelinesDataFlowModelTest.java index 23dd1aa301..162b7c8de3 100644 --- a/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/configuration/PipelinesDataFlowModelTest.java +++ b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/configuration/PipelinesDataFlowModelTest.java @@ -50,7 +50,7 @@ void testSerializing_PipelinesDataFlowModel_empty_Plugins_with_nonEmpty_delay_an final PluginModel source = new PluginModel("testSource", (Map) null); final List processors = Collections.singletonList(new PluginModel("testProcessor", (Map) null)); - final List sinks = Collections.singletonList(new SinkModel("testSink", Collections.emptyList(), null)); + final List sinks = Collections.singletonList(new SinkModel("testSink", Collections.emptyList(), null, null)); final PipelineModel pipelineModel = new PipelineModel(source, null, processors, null, sinks, 8, 50); final PipelinesDataFlowModel pipelinesDataFlowModel = new PipelinesDataFlowModel(Collections.singletonMap(pipelineName, pipelineModel)); @@ -72,7 +72,7 @@ void testSerializing_PipelinesDataFlowModel_with_Version() throws JsonProcessing final DataPrepperVersion version = DataPrepperVersion.parse("2.0"); final PluginModel source = new PluginModel("testSource", (Map) null); final List processors = Collections.singletonList(new PluginModel("testProcessor", (Map) null)); - final List sinks = Collections.singletonList(new SinkModel("testSink", Collections.emptyList(), null)); + final List sinks = Collections.singletonList(new SinkModel("testSink", Collections.emptyList(), null, null)); final PipelineModel pipelineModel = new PipelineModel(source, null, processors, null, sinks, 8, 50); final PipelinesDataFlowModel pipelinesDataFlowModel = new PipelinesDataFlowModel(version, Collections.singletonMap(pipelineName, pipelineModel)); @@ -93,7 +93,7 @@ void testSerializing_PipelinesDataFlowModel_empty_Plugins_with_nonEmpty_delay_an final PluginModel source = new PluginModel("testSource", (Map) null); final List preppers = Collections.singletonList(new PluginModel("testPrepper", (Map) null)); - final List sinks = Collections.singletonList(new SinkModel("testSink", Collections.singletonList("my-route"), null)); + final List sinks = Collections.singletonList(new SinkModel("testSink", Collections.singletonList("my-route"), null, null)); final PipelineModel pipelineModel = new PipelineModel(source, null, preppers, Collections.singletonList(new ConditionalRoute("my-route", "/a==b")), sinks, 8, 50); final PipelinesDataFlowModel pipelinesDataFlowModel = new PipelinesDataFlowModel(Collections.singletonMap(pipelineName, pipelineModel)); diff --git a/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/configuration/SinkModelTest.java b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/configuration/SinkModelTest.java index 24f6ac6f76..bfe5ad3e73 100644 --- a/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/configuration/SinkModelTest.java +++ b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/configuration/SinkModelTest.java @@ -25,6 +25,7 @@ import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.hasItem; import static org.hamcrest.CoreMatchers.notNullValue; +import static org.hamcrest.CoreMatchers.nullValue; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.hasKey; @@ -74,13 +75,15 @@ void serialize_into_known_SinkModel() throws IOException { final Map pluginSettings = new LinkedHashMap<>(); pluginSettings.put("key1", "value1"); pluginSettings.put("key2", "value2"); - final SinkModel sinkModel = new SinkModel("customSinkPlugin", Arrays.asList("routeA", "routeB"), pluginSettings); + final String tagsTargetKey = "tags"; + final SinkModel sinkModel = new SinkModel("customSinkPlugin", Arrays.asList("routeA", "routeB"), tagsTargetKey, pluginSettings); final String actualJson = objectMapper.writeValueAsString(sinkModel); final String expectedJson = createStringFromInputStream(this.getClass().getResourceAsStream("sink_plugin.yaml")); assertThat("---\n" + actualJson, equalTo(expectedJson)); + assertThat(sinkModel.getTagsTargetKey(), equalTo(tagsTargetKey)); } @Test @@ -93,7 +96,8 @@ void deserialize_with_any_pluginModel() throws IOException { assertAll( () -> assertThat(sinkModel.getPluginName(), equalTo("customPlugin")), () -> assertThat(sinkModel.getPluginSettings(), notNullValue()), - () -> assertThat(sinkModel.getRoutes(), notNullValue()) + () -> assertThat(sinkModel.getRoutes(), notNullValue()), + () -> assertThat(sinkModel.getTagsTargetKey(), nullValue()) ); assertAll( () -> assertThat(sinkModel.getPluginSettings().size(), equalTo(3)), @@ -123,7 +127,7 @@ void serialize_with_just_pluginModel() throws IOException { pluginSettings.put("key1", "value1"); pluginSettings.put("key2", "value2"); pluginSettings.put("key3", "value3"); - final SinkModel sinkModel = new SinkModel("customPlugin", null, pluginSettings); + final SinkModel sinkModel = new SinkModel("customPlugin", null, null, pluginSettings); final String actualJson = objectMapper.writeValueAsString(sinkModel); @@ -156,10 +160,11 @@ void build_with_only_PluginModel_should_return_expected_SinkModel() { assertThat(actualSinkModel.getPluginSettings(), equalTo(pluginSettings)); assertThat(actualSinkModel.getRoutes(), notNullValue()); assertThat(actualSinkModel.getRoutes(), empty()); + assertThat(actualSinkModel.getTagsTargetKey(), nullValue()); } } private static String createStringFromInputStream(final InputStream inputStream) throws IOException { return new String(inputStream.readAllBytes(), StandardCharsets.UTF_8); } -} \ No newline at end of file +} diff --git a/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/sink/SinkContextTest.java b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/sink/SinkContextTest.java new file mode 100644 index 0000000000..404c3bbbf5 --- /dev/null +++ b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/sink/SinkContextTest.java @@ -0,0 +1,33 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.model.sink; + +import org.junit.jupiter.api.Test; + +import java.util.Collections; +import java.util.List; + +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.MatcherAssert.assertThat; +import org.apache.commons.lang3.RandomStringUtils; + + + +public class SinkContextTest { + private SinkContext sinkContext; + + @Test + public void testSinkContextBasic() { + final String testTagsTargetKey = RandomStringUtils.randomAlphabetic(6); + final List testRoutes = Collections.emptyList(); + sinkContext = new SinkContext(testTagsTargetKey, testRoutes); + assertThat(sinkContext.getTagsTargetKey(), equalTo(testTagsTargetKey)); + assertThat(sinkContext.getRoutes(), equalTo(testRoutes)); + + } + +} + diff --git a/data-prepper-api/src/test/resources/org/opensearch/dataprepper/model/configuration/sink_plugin.yaml b/data-prepper-api/src/test/resources/org/opensearch/dataprepper/model/configuration/sink_plugin.yaml index af072de0fa..cccdee7224 100644 --- a/data-prepper-api/src/test/resources/org/opensearch/dataprepper/model/configuration/sink_plugin.yaml +++ b/data-prepper-api/src/test/resources/org/opensearch/dataprepper/model/configuration/sink_plugin.yaml @@ -3,5 +3,6 @@ customSinkPlugin: routes: - "routeA" - "routeB" + tags_target_key: "tags" key1: "value1" key2: "value2" diff --git a/data-prepper-core/src/main/java/org/opensearch/dataprepper/parser/PipelineConfigurationValidator.java b/data-prepper-core/src/main/java/org/opensearch/dataprepper/parser/PipelineConfigurationValidator.java index d008101797..bea3c68706 100644 --- a/data-prepper-core/src/main/java/org/opensearch/dataprepper/parser/PipelineConfigurationValidator.java +++ b/data-prepper-core/src/main/java/org/opensearch/dataprepper/parser/PipelineConfigurationValidator.java @@ -8,7 +8,7 @@ import org.apache.commons.collections.CollectionUtils; import org.opensearch.dataprepper.model.configuration.PluginSetting; import org.opensearch.dataprepper.parser.model.PipelineConfiguration; -import org.opensearch.dataprepper.parser.model.RoutedPluginSetting; +import org.opensearch.dataprepper.parser.model.SinkContextPluginSetting; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -82,7 +82,7 @@ private static void visitAndValidate( final PipelineConfiguration pipelineConfiguration = pipelineConfigurationMap.get(pipeline); touchedPipelineSet.add(pipeline); //if validation is successful, then there is definitely sink - final List connectedPipelinesSettings = pipelineConfiguration.getSinkPluginSettings(); + final List connectedPipelinesSettings = pipelineConfiguration.getSinkPluginSettings(); //Recursively check connected pipelines for (PluginSetting pluginSetting : connectedPipelinesSettings) { //Further process only if the sink is of pipeline type @@ -159,7 +159,7 @@ private static void validateForOrphans( throw new RuntimeException("Invalid configuration, cannot proceed with ambiguous configuration"); } final PipelineConfiguration pipelineConfiguration = pipelineConfigurationMap.get(currentPipelineName); - final List pluginSettings = pipelineConfiguration.getSinkPluginSettings(); + final List pluginSettings = pipelineConfiguration.getSinkPluginSettings(); for (PluginSetting pluginSetting : pluginSettings) { if (PIPELINE_TYPE.equals(pluginSetting.getName()) && pluginSetting.getAttributeFromSettings(PIPELINE_ATTRIBUTE_NAME) != null) { diff --git a/data-prepper-core/src/main/java/org/opensearch/dataprepper/parser/PipelineParser.java b/data-prepper-core/src/main/java/org/opensearch/dataprepper/parser/PipelineParser.java index f47e844ea1..6a0a67d0f0 100644 --- a/data-prepper-core/src/main/java/org/opensearch/dataprepper/parser/PipelineParser.java +++ b/data-prepper-core/src/main/java/org/opensearch/dataprepper/parser/PipelineParser.java @@ -19,9 +19,10 @@ import org.opensearch.dataprepper.model.processor.Processor; import org.opensearch.dataprepper.model.sink.Sink; import org.opensearch.dataprepper.model.source.Source; +import org.opensearch.dataprepper.model.sink.SinkContext; import org.opensearch.dataprepper.parser.model.DataPrepperConfiguration; import org.opensearch.dataprepper.parser.model.PipelineConfiguration; -import org.opensearch.dataprepper.parser.model.RoutedPluginSetting; +import org.opensearch.dataprepper.parser.model.SinkContextPluginSetting; import org.opensearch.dataprepper.peerforwarder.PeerForwarderConfiguration; import org.opensearch.dataprepper.peerforwarder.PeerForwarderProvider; import org.opensearch.dataprepper.peerforwarder.PeerForwardingProcessorDecorator; @@ -292,13 +293,13 @@ private Optional getSourceIfPipelineType( return Optional.empty(); } - private DataFlowComponent buildRoutedSinkOrConnector(final RoutedPluginSetting pluginSetting) { - final Sink sink = buildSinkOrConnector(pluginSetting); + private DataFlowComponent buildRoutedSinkOrConnector(final SinkContextPluginSetting pluginSetting) { + final Sink sink = buildSinkOrConnector(pluginSetting, pluginSetting.getSinkContext()); - return new DataFlowComponent<>(sink, pluginSetting.getRoutes()); + return new DataFlowComponent<>(sink, pluginSetting.getSinkContext().getRoutes()); } - private Sink buildSinkOrConnector(final PluginSetting pluginSetting) { + private Sink buildSinkOrConnector(final PluginSetting pluginSetting, final SinkContext sinkContext) { LOG.info("Building [{}] as sink component", pluginSetting.getName()); final Optional pipelineNameOptional = getPipelineNameIfPipelineType(pluginSetting); if (pipelineNameOptional.isPresent()) { //update to ifPresentOrElse when using JDK9 @@ -307,7 +308,7 @@ private Sink buildSinkOrConnector(final PluginSetting pluginSetting) { sourceConnectorMap.put(pipelineName, pipelineConnector); //TODO retrieve from parent Pipeline using name return pipelineConnector; } else { - return pluginFactory.loadPlugin(Sink.class, pluginSetting); + return pluginFactory.loadPlugin(Sink.class, pluginSetting, sinkContext); } } @@ -337,7 +338,7 @@ private void removeConnectedPipelines( sourcePipeline, pipelineConfigurationMap, pipelineMap)); //remove sink connected pipelines - final List sinkPluginSettings = failedPipelineConfiguration.getSinkPluginSettings(); + final List sinkPluginSettings = failedPipelineConfiguration.getSinkPluginSettings(); sinkPluginSettings.forEach(sinkPluginSetting -> { getPipelineNameIfPipelineType(sinkPluginSetting).ifPresent(sinkPipeline -> processRemoveIfRequired( sinkPipeline, pipelineConfigurationMap, pipelineMap)); diff --git a/data-prepper-core/src/main/java/org/opensearch/dataprepper/parser/model/PipelineConfiguration.java b/data-prepper-core/src/main/java/org/opensearch/dataprepper/parser/model/PipelineConfiguration.java index bde956ceeb..b35b05bdb5 100644 --- a/data-prepper-core/src/main/java/org/opensearch/dataprepper/parser/model/PipelineConfiguration.java +++ b/data-prepper-core/src/main/java/org/opensearch/dataprepper/parser/model/PipelineConfiguration.java @@ -10,6 +10,7 @@ import org.opensearch.dataprepper.model.configuration.PluginModel; import org.opensearch.dataprepper.model.configuration.PluginSetting; import org.opensearch.dataprepper.model.configuration.SinkModel; +import org.opensearch.dataprepper.model.sink.SinkContext; import org.opensearch.dataprepper.plugins.buffer.blockingbuffer.BlockingBuffer; import java.util.Collections; @@ -30,7 +31,7 @@ public class PipelineConfiguration { private final PluginSetting sourcePluginSetting; private final PluginSetting bufferPluginSetting; private final List processorPluginSettings; - private final List sinkPluginSettings; + private final List sinkPluginSettings; private final Integer workers; private final Integer readBatchDelay; @@ -58,7 +59,7 @@ public List getProcessorPluginSettings() { return processorPluginSettings; } - public List getSinkPluginSettings() { + public List getSinkPluginSettings() { return sinkPluginSettings; } @@ -104,12 +105,12 @@ private PluginSetting getBufferFromPluginModelOrDefault( return getPluginSettingFromPluginModel(pluginModel); } - private List getSinksFromPluginModel( + private List getSinksFromPluginModel( final List sinkConfigurations) { if (sinkConfigurations == null || sinkConfigurations.isEmpty()) { throw new IllegalArgumentException("Invalid configuration, at least one sink is required"); } - return sinkConfigurations.stream().map(PipelineConfiguration::getRoutedPluginSettingFromSinkModel) + return sinkConfigurations.stream().map(PipelineConfiguration::getSinkContextPluginSettingFromSinkModel) .collect(Collectors.toList()); } @@ -130,11 +131,11 @@ private static PluginSetting getPluginSettingFromPluginModel(final PluginModel p return new PluginSetting(pluginModel.getPluginName(), settingsMap); } - private static RoutedPluginSetting getRoutedPluginSettingFromSinkModel(final SinkModel sinkModel) { + private static SinkContextPluginSetting getSinkContextPluginSettingFromSinkModel(final SinkModel sinkModel) { final Map settingsMap = Optional .ofNullable(sinkModel.getPluginSettings()) .orElseGet(HashMap::new); - return new RoutedPluginSetting(sinkModel.getPluginName(), settingsMap, sinkModel.getRoutes()); + return new SinkContextPluginSetting(sinkModel.getPluginName(), settingsMap, new SinkContext(sinkModel.getTagsTargetKey(), sinkModel.getRoutes())); } private Integer getWorkersFromPipelineModel(final PipelineModel pipelineModel) { @@ -159,4 +160,4 @@ private void validateConfiguration(final Integer configuration, final String com component, configuration)); } } -} \ No newline at end of file +} diff --git a/data-prepper-core/src/main/java/org/opensearch/dataprepper/parser/model/RoutedPluginSetting.java b/data-prepper-core/src/main/java/org/opensearch/dataprepper/parser/model/RoutedPluginSetting.java deleted file mode 100644 index 85240bdc4e..0000000000 --- a/data-prepper-core/src/main/java/org/opensearch/dataprepper/parser/model/RoutedPluginSetting.java +++ /dev/null @@ -1,24 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -package org.opensearch.dataprepper.parser.model; - -import org.opensearch.dataprepper.model.configuration.PluginSetting; - -import java.util.Collection; -import java.util.Map; - -public class RoutedPluginSetting extends PluginSetting { - private final Collection routes; - - public RoutedPluginSetting(final String name, final Map settings, final Collection routes) { - super(name, settings); - this.routes = routes; - } - - public Collection getRoutes() { - return routes; - } -} diff --git a/data-prepper-core/src/main/java/org/opensearch/dataprepper/parser/model/SinkContextPluginSetting.java b/data-prepper-core/src/main/java/org/opensearch/dataprepper/parser/model/SinkContextPluginSetting.java new file mode 100644 index 0000000000..9cb72bf3e8 --- /dev/null +++ b/data-prepper-core/src/main/java/org/opensearch/dataprepper/parser/model/SinkContextPluginSetting.java @@ -0,0 +1,24 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.parser.model; + +import org.opensearch.dataprepper.model.configuration.PluginSetting; +import org.opensearch.dataprepper.model.sink.SinkContext; + +import java.util.Map; + +public class SinkContextPluginSetting extends PluginSetting { + private final SinkContext sinkContext; + + public SinkContextPluginSetting(final String name, final Map settings, final SinkContext sinkContext) { + super(name, settings); + this.sinkContext = sinkContext; + } + + public SinkContext getSinkContext() { + return sinkContext; + } +} diff --git a/data-prepper-core/src/main/java/org/opensearch/dataprepper/plugin/ComponentPluginArgumentsContext.java b/data-prepper-core/src/main/java/org/opensearch/dataprepper/plugin/ComponentPluginArgumentsContext.java index cb809f75b7..807de63367 100644 --- a/data-prepper-core/src/main/java/org/opensearch/dataprepper/plugin/ComponentPluginArgumentsContext.java +++ b/data-prepper-core/src/main/java/org/opensearch/dataprepper/plugin/ComponentPluginArgumentsContext.java @@ -5,6 +5,7 @@ package org.opensearch.dataprepper.plugin; +import org.opensearch.dataprepper.model.sink.SinkContext; import org.opensearch.dataprepper.metrics.PluginMetrics; import org.opensearch.dataprepper.model.configuration.PipelineDescription; import org.opensearch.dataprepper.model.configuration.PluginSetting; @@ -64,6 +65,9 @@ private ComponentPluginArgumentsContext(final Builder builder) { if (builder.acknowledgementSetManager != null) { typedArgumentsSuppliers.put(AcknowledgementSetManager.class, () -> builder.acknowledgementSetManager); } + if (builder.sinkContext != null) { + typedArgumentsSuppliers.put(SinkContext.class, () -> builder.sinkContext); + } } @Override @@ -114,6 +118,7 @@ static class Builder { private BeanFactory beanFactory; private EventFactory eventFactory; private AcknowledgementSetManager acknowledgementSetManager; + private SinkContext sinkContext; Builder withPluginConfiguration(final Object pluginConfiguration) { this.pluginConfiguration = pluginConfiguration; @@ -140,6 +145,11 @@ Builder withPluginFactory(final PluginFactory pluginFactory) { return this; } + Builder withSinkContext(final SinkContext sinkContext) { + this.sinkContext = sinkContext; + return this; + } + Builder withPipelineDescription(final PipelineDescription pipelineDescription) { this.pipelineDescription = pipelineDescription; return this; diff --git a/data-prepper-core/src/main/java/org/opensearch/dataprepper/plugin/DefaultPluginFactory.java b/data-prepper-core/src/main/java/org/opensearch/dataprepper/plugin/DefaultPluginFactory.java index e21acc8ec9..7560b98e10 100644 --- a/data-prepper-core/src/main/java/org/opensearch/dataprepper/plugin/DefaultPluginFactory.java +++ b/data-prepper-core/src/main/java/org/opensearch/dataprepper/plugin/DefaultPluginFactory.java @@ -5,6 +5,7 @@ package org.opensearch.dataprepper.plugin; +import org.opensearch.dataprepper.model.sink.SinkContext; import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin; import org.opensearch.dataprepper.model.configuration.PluginSetting; import org.opensearch.dataprepper.model.plugin.NoPluginFoundException; @@ -70,7 +71,17 @@ public T loadPlugin(final Class baseClass, final PluginSetting pluginSett final String pluginName = pluginSetting.getName(); final Class pluginClass = getPluginClass(baseClass, pluginName); - final ComponentPluginArgumentsContext constructionContext = getConstructionContext(pluginSetting, pluginClass); + final ComponentPluginArgumentsContext constructionContext = getConstructionContext(pluginSetting, pluginClass, null); + + return pluginCreator.newPluginInstance(pluginClass, constructionContext, pluginName); + } + + @Override + public T loadPlugin(final Class baseClass, final PluginSetting pluginSetting, final SinkContext sinkContext) { + final String pluginName = pluginSetting.getName(); + final Class pluginClass = getPluginClass(baseClass, pluginName); + + final ComponentPluginArgumentsContext constructionContext = getConstructionContext(pluginSetting, pluginClass, sinkContext); return pluginCreator.newPluginInstance(pluginClass, constructionContext, pluginName); } @@ -88,7 +99,7 @@ public List loadPlugins( if(numberOfInstances == null || numberOfInstances < 0) throw new IllegalArgumentException("The numberOfInstances must be provided as a non-negative integer."); - final ComponentPluginArgumentsContext constructionContext = getConstructionContext(pluginSetting, pluginClass); + final ComponentPluginArgumentsContext constructionContext = getConstructionContext(pluginSetting, pluginClass, null); final List plugins = new ArrayList<>(numberOfInstances); for (int i = 0; i < numberOfInstances; i++) { @@ -97,7 +108,7 @@ public List loadPlugins( return plugins; } - private ComponentPluginArgumentsContext getConstructionContext(final PluginSetting pluginSetting, final Class pluginClass) { + private ComponentPluginArgumentsContext getConstructionContext(final PluginSetting pluginSetting, final Class pluginClass, final SinkContext sinkContext) { final DataPrepperPlugin pluginAnnotation = pluginClass.getAnnotation(DataPrepperPlugin.class); final Class pluginConfigurationType = pluginAnnotation.pluginConfigurationType(); @@ -111,6 +122,7 @@ private ComponentPluginArgumentsContext getConstructionContext(final PluginS .withBeanFactory(pluginBeanFactoryProvider.get()) .withEventFactory(eventFactory) .withAcknowledgementSetManager(acknowledgementSetManager) + .withSinkContext(sinkContext) .build(); } diff --git a/data-prepper-core/src/test/java/org/opensearch/dataprepper/parser/model/PipelineConfigurationTests.java b/data-prepper-core/src/test/java/org/opensearch/dataprepper/parser/model/PipelineConfigurationTests.java index 1ad21de14a..5d611e445d 100644 --- a/data-prepper-core/src/test/java/org/opensearch/dataprepper/parser/model/PipelineConfigurationTests.java +++ b/data-prepper-core/src/test/java/org/opensearch/dataprepper/parser/model/PipelineConfigurationTests.java @@ -58,7 +58,7 @@ void testPipelineConfigurationCreation() { final PluginSetting actualSourcePluginSetting = pipelineConfiguration.getSourcePluginSetting(); final PluginSetting actualBufferPluginSetting = pipelineConfiguration.getBufferPluginSetting(); final List actualProcesserPluginSettings = pipelineConfiguration.getProcessorPluginSettings(); - final List actualSinkPluginSettings = pipelineConfiguration.getSinkPluginSettings(); + final List actualSinkPluginSettings = pipelineConfiguration.getSinkPluginSettings(); comparePluginSettings(actualSourcePluginSetting, TestDataProvider.VALID_PLUGIN_SETTING_1); assertThat(pipelineConfiguration.getBufferPluginSetting(), notNullValue()); @@ -99,7 +99,7 @@ void testOnlySourceAndSink() { final PluginSetting actualSourcePluginSetting = pipelineConfiguration.getSourcePluginSetting(); final PluginSetting actualBufferPluginSetting = pipelineConfiguration.getBufferPluginSetting(); final List actualProcessorPluginSettings = pipelineConfiguration.getProcessorPluginSettings(); - final List actualSinkPluginSettings = pipelineConfiguration.getSinkPluginSettings(); + final List actualSinkPluginSettings = pipelineConfiguration.getSinkPluginSettings(); comparePluginSettings(actualSourcePluginSetting, TestDataProvider.VALID_PLUGIN_SETTING_1); assertThat(pipelineConfiguration.getBufferPluginSetting(), notNullValue()); @@ -221,13 +221,40 @@ void testSinksWithRoutes() { final PipelineConfiguration pipelineConfiguration = new PipelineConfiguration(pipelineModel); - final List actualSinkPluginSettings = pipelineConfiguration.getSinkPluginSettings(); + final List actualSinkPluginSettings = pipelineConfiguration.getSinkPluginSettings(); assertThat(actualSinkPluginSettings.size(), equalTo(2)); comparePluginSettings(actualSinkPluginSettings.get(0), TestDataProvider.VALID_PLUGIN_SETTING_1); comparePluginSettings(actualSinkPluginSettings.get(1), TestDataProvider.VALID_PLUGIN_SETTING_2); - assertThat(actualSinkPluginSettings.get(0).getRoutes(), equalTo(orderedSinkRoutes.get(0))); - assertThat(actualSinkPluginSettings.get(1).getRoutes(), equalTo(orderedSinkRoutes.get(1))); + assertThat(actualSinkPluginSettings.get(0).getSinkContext().getRoutes(), equalTo(orderedSinkRoutes.get(0))); + assertThat(actualSinkPluginSettings.get(1).getSinkContext().getRoutes(), equalTo(orderedSinkRoutes.get(1))); + } + + @Test + void testSinksWithTagsTargetKey() { + final List orderedSinkTagTagets = new ArrayList<>(); + for (final SinkModel sink : sinks) { + final String tagsTargetKey = UUID.randomUUID().toString(); + when(sink.getTagsTargetKey()).thenReturn(tagsTargetKey); + orderedSinkTagTagets.add(tagsTargetKey); + } + + final PipelineModel pipelineModel = mock(PipelineModel.class); + when(pipelineModel.getSource()).thenReturn(source); + when(pipelineModel.getSinks()).thenReturn(sinks); + when(pipelineModel.getProcessors()).thenReturn(null); + when(pipelineModel.getWorkers()).thenReturn(null); + when(pipelineModel.getReadBatchDelay()).thenReturn(null); + + final PipelineConfiguration pipelineConfiguration = new PipelineConfiguration(pipelineModel); + + final List actualSinkPluginSettings = pipelineConfiguration.getSinkPluginSettings(); + + assertThat(actualSinkPluginSettings.size(), equalTo(2)); + comparePluginSettings(actualSinkPluginSettings.get(0), TestDataProvider.VALID_PLUGIN_SETTING_1); + comparePluginSettings(actualSinkPluginSettings.get(1), TestDataProvider.VALID_PLUGIN_SETTING_2); + assertThat(actualSinkPluginSettings.get(0).getSinkContext().getTagsTargetKey(), equalTo(orderedSinkTagTagets.get(0))); + assertThat(actualSinkPluginSettings.get(1).getSinkContext().getTagsTargetKey(), equalTo(orderedSinkTagTagets.get(1))); } private void comparePluginSettings(final PluginSetting actual, final PluginSetting expected) { diff --git a/data-prepper-core/src/test/java/org/opensearch/dataprepper/parser/model/RoutedPluginSettingTest.java b/data-prepper-core/src/test/java/org/opensearch/dataprepper/parser/model/SinkContextPluginSettingTest.java similarity index 64% rename from data-prepper-core/src/test/java/org/opensearch/dataprepper/parser/model/RoutedPluginSettingTest.java rename to data-prepper-core/src/test/java/org/opensearch/dataprepper/parser/model/SinkContextPluginSettingTest.java index b961f234f8..9264abe56c 100644 --- a/data-prepper-core/src/test/java/org/opensearch/dataprepper/parser/model/RoutedPluginSettingTest.java +++ b/data-prepper-core/src/test/java/org/opensearch/dataprepper/parser/model/SinkContextPluginSettingTest.java @@ -5,32 +5,32 @@ package org.opensearch.dataprepper.parser.model; +import org.opensearch.dataprepper.model.sink.SinkContext; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; -import java.util.Collection; import java.util.Map; -import java.util.Set; import java.util.UUID; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.MatcherAssert.assertThat; +import static org.mockito.Mockito.mock; -class RoutedPluginSettingTest { +class SinkContextPluginSettingTest { private String name; private Map settings; - private Collection routes; + private SinkContext sinkContext; @BeforeEach void setUp() { name = UUID.randomUUID().toString(); settings = Map.of(UUID.randomUUID().toString(), UUID.randomUUID().toString()); - routes = Set.of(UUID.randomUUID().toString(), UUID.randomUUID().toString()); + sinkContext = mock(SinkContext.class); } - private RoutedPluginSetting createObjectUnderTest() { - return new RoutedPluginSetting(name, settings, routes); + private SinkContextPluginSetting createObjectUnderTest() { + return new SinkContextPluginSetting(name, settings, sinkContext); } @Test @@ -44,7 +44,7 @@ void getSettings_returns_settings_from_constructor() { } @Test - void getRoutes_returns_routes_from_constructor() { - assertThat(createObjectUnderTest().getRoutes(), equalTo(routes)); + void getRoutes_returns_sink_context_from_constructor() { + assertThat(createObjectUnderTest().getSinkContext(), equalTo(sinkContext)); } -} \ No newline at end of file +} diff --git a/data-prepper-core/src/test/java/org/opensearch/dataprepper/plugin/ComponentPluginArgumentsContextTest.java b/data-prepper-core/src/test/java/org/opensearch/dataprepper/plugin/ComponentPluginArgumentsContextTest.java index 51506d4c23..ec3dc17644 100644 --- a/data-prepper-core/src/test/java/org/opensearch/dataprepper/plugin/ComponentPluginArgumentsContextTest.java +++ b/data-prepper-core/src/test/java/org/opensearch/dataprepper/plugin/ComponentPluginArgumentsContextTest.java @@ -10,6 +10,7 @@ import org.opensearch.dataprepper.model.configuration.PluginSetting; import org.opensearch.dataprepper.model.plugin.InvalidPluginDefinitionException; import org.opensearch.dataprepper.model.plugin.PluginFactory; +import org.opensearch.dataprepper.model.sink.SinkContext; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.Mock; @@ -98,6 +99,19 @@ void createArguments_with_single_class_using_bean_factory() { equalTo(new Object[] {mock})); } + @Test + void createArguments_with_single_class_using_sink_context() { + final SinkContext sinkContext = mock(SinkContext.class); + + final ComponentPluginArgumentsContext objectUnderTest = new ComponentPluginArgumentsContext.Builder() + .withPluginSetting(pluginSetting) + .withSinkContext(sinkContext) + .build(); + + assertThat(objectUnderTest.createArguments(new Class[] { SinkContext.class }), + equalTo(new Object[] { sinkContext})); + } + @Test void createArguments_given_bean_not_available_with_single_class_using_bean_factory() { doThrow(mock(BeansException.class)).when(beanFactory).getBean((Class) any()); @@ -192,4 +206,4 @@ void createArguments_with_PluginMetrics() { assertThat(arguments, equalTo(new Object[] { pluginSetting, pluginMetrics })); } -} \ No newline at end of file +} diff --git a/data-prepper-plugins/common/src/main/java/org/opensearch/dataprepper/plugins/sink/FileSink.java b/data-prepper-plugins/common/src/main/java/org/opensearch/dataprepper/plugins/sink/FileSink.java index 5cd407c2ff..fc3df248fa 100644 --- a/data-prepper-plugins/common/src/main/java/org/opensearch/dataprepper/plugins/sink/FileSink.java +++ b/data-prepper-plugins/common/src/main/java/org/opensearch/dataprepper/plugins/sink/FileSink.java @@ -12,6 +12,7 @@ import org.opensearch.dataprepper.model.event.EventHandle; import org.opensearch.dataprepper.model.record.Record; import org.opensearch.dataprepper.model.sink.Sink; +import org.opensearch.dataprepper.model.sink.SinkContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -21,6 +22,7 @@ import java.nio.file.Files; import java.nio.file.Paths; import java.util.Collection; +import java.util.Objects; import java.util.concurrent.locks.ReentrantLock; import static java.lang.String.format; @@ -37,6 +39,7 @@ public class FileSink implements Sink> { private final ReentrantLock lock; private boolean isStopRequested; private boolean initialized; + private final String tagsTargetKey; /** * Mandatory constructor for Data Prepper Component - This constructor is used by Data Prepper @@ -47,11 +50,12 @@ public class FileSink implements Sink> { * @param fileSinkConfig The file sink configuration */ @DataPrepperPluginConstructor - public FileSink(final FileSinkConfig fileSinkConfig) { + public FileSink(final FileSinkConfig fileSinkConfig, final SinkContext sinkContext) { this.outputFilePath = fileSinkConfig.getPath(); isStopRequested = false; initialized = false; lock = new ReentrantLock(true); + tagsTargetKey = Objects.nonNull(sinkContext) ? sinkContext.getTagsTargetKey() : null; } @Override @@ -64,7 +68,6 @@ public void output(final Collection> records) { for (final Record record : records) { try { checkTypeAndWriteObject(record.getData(), writer); - } catch (final IOException ex) { throw new RuntimeException(format("Encountered exception writing to file %s", outputFilePath), ex); } @@ -84,7 +87,8 @@ public void output(final Collection> records) { // TODO: This function should be removed with the completion of: https://github.com/opensearch-project/data-prepper/issues/546 private void checkTypeAndWriteObject(final Object object, final BufferedWriter writer) throws IOException { if (object instanceof Event) { - writer.write(((Event) object).toJsonString()); + String output = ((Event)object).jsonBuilder().includeTags(tagsTargetKey).toJsonString(); + writer.write(output); writer.newLine(); EventHandle eventHandle = ((Event)object).getEventHandle(); if (eventHandle != null) { diff --git a/data-prepper-plugins/common/src/main/java/org/opensearch/dataprepper/plugins/sink/StdOutSink.java b/data-prepper-plugins/common/src/main/java/org/opensearch/dataprepper/plugins/sink/StdOutSink.java index 5e50601062..e43c25f658 100644 --- a/data-prepper-plugins/common/src/main/java/org/opensearch/dataprepper/plugins/sink/StdOutSink.java +++ b/data-prepper-plugins/common/src/main/java/org/opensearch/dataprepper/plugins/sink/StdOutSink.java @@ -11,11 +11,14 @@ import org.opensearch.dataprepper.model.event.EventHandle; import org.opensearch.dataprepper.model.record.Record; import org.opensearch.dataprepper.model.sink.Sink; +import org.opensearch.dataprepper.model.sink.SinkContext; import java.util.Collection; +import java.util.Objects; @DataPrepperPlugin(name = "stdout", pluginType = Sink.class) public class StdOutSink implements Sink> { + private final String tagsTargetKey; /** * Mandatory constructor for Data Prepper Component - This constructor is used by Data Prepper @@ -25,13 +28,17 @@ public class StdOutSink implements Sink> { * * @param pluginSetting instance with metadata information from pipeline pluginSetting file. */ - public StdOutSink(final PluginSetting pluginSetting) { - this(); + public StdOutSink(final PluginSetting pluginSetting, final SinkContext sinkContext) { + this(Objects.nonNull(sinkContext) ? sinkContext.getTagsTargetKey() : null); } - public StdOutSink() { + public StdOutSink(final String tagsTargetKey) { + this.tagsTargetKey = tagsTargetKey; } + public StdOutSink() { + this.tagsTargetKey = null; + } @Override public void output(final Collection> records) { for (final Record record : records) { @@ -43,7 +50,8 @@ public void output(final Collection> records) { // TODO: This function should be removed with the completion of: https://github.com/opensearch-project/data-prepper/issues/546 private void checkTypeAndPrintObject(final Object object) { if (object instanceof Event) { - System.out.println(((Event) object).toJsonString()); + String output = ((Event)object).jsonBuilder().includeTags(tagsTargetKey).toJsonString(); + System.out.println(output); EventHandle eventHandle = ((Event)object).getEventHandle(); if (eventHandle != null) { eventHandle.release(true); diff --git a/data-prepper-plugins/common/src/test/java/org/opensearch/dataprepper/plugins/sink/FileSinkTests.java b/data-prepper-plugins/common/src/test/java/org/opensearch/dataprepper/plugins/sink/FileSinkTests.java index b9b2457032..3ed9b99213 100644 --- a/data-prepper-plugins/common/src/test/java/org/opensearch/dataprepper/plugins/sink/FileSinkTests.java +++ b/data-prepper-plugins/common/src/test/java/org/opensearch/dataprepper/plugins/sink/FileSinkTests.java @@ -13,6 +13,7 @@ import org.junit.jupiter.api.Test; import org.opensearch.dataprepper.model.event.JacksonEvent; import org.opensearch.dataprepper.model.record.Record; +import org.opensearch.dataprepper.model.sink.SinkContext; import java.io.BufferedReader; import java.io.File; @@ -37,33 +38,38 @@ class FileSinkTests { private final String TEST_DATA_1 = "data_prepper"; private final String TEST_DATA_2 = "file_sink"; private final String TEST_KEY = "test_key"; + private final String tagStr1 = "tag1"; + private final String tagStr2 = "tag2"; private final Record TEST_STRING_RECORD_1 = new Record<>(TEST_DATA_1); private final Record TEST_STRING_RECORD_2 = new Record<>(TEST_DATA_2); // TODO: remove with the completion of: https://github.com/opensearch-project/data-prepper/issues/546 private final List> TEST_STRING_RECORDS = Arrays.asList(TEST_STRING_RECORD_1, TEST_STRING_RECORD_2); private List> TEST_RECORDS; private FileSinkConfig fileSinkConfig; + private SinkContext sinkContext; @BeforeEach void setUp() throws IOException { fileSinkConfig = mock(FileSinkConfig.class); + sinkContext = mock(SinkContext.class); TEST_OUTPUT_FILE = Files.createTempFile("", "output.txt").toFile(); TEST_RECORDS = new ArrayList<>(); - TEST_RECORDS.add(new Record<>(JacksonEvent - .builder() + JacksonEvent event = JacksonEvent.builder() .withEventType("event") .withData(Map.of(TEST_KEY, TEST_DATA_1)) - .build())); - TEST_RECORDS.add(new Record<>(JacksonEvent - .builder() + .build(); + event.getMetadata().addTags(List.of(tagStr1, tagStr2)); + TEST_RECORDS.add(new Record<>(event)); + event = JacksonEvent.builder() .withEventType("event") .withData(Map.of(TEST_KEY, TEST_DATA_2)) - .build())); + .build(); + TEST_RECORDS.add(new Record<>(event)); } private FileSink createObjectUnderTest() { - return new FileSink(fileSinkConfig); + return new FileSink(fileSinkConfig, sinkContext); } @AfterEach @@ -74,6 +80,7 @@ void tearDown() { @Test void testInvalidFilePath() { when(fileSinkConfig.getPath()).thenReturn(""); + when(sinkContext.getTagsTargetKey()).thenReturn(null); final FileSink objectUnderTest = createObjectUnderTest(); assertThrows(RuntimeException.class, objectUnderTest::initialize); } @@ -88,6 +95,7 @@ void setUp() { // TODO: remove with the completion of: https://github.com/opensearch-project/data-prepper/issues/546 @Test void testValidFilePathStringRecord() throws IOException { + when(sinkContext.getTagsTargetKey()).thenReturn(null); final FileSink fileSink = createObjectUnderTest(); fileSink.initialize(); @@ -100,9 +108,27 @@ void testValidFilePathStringRecord() throws IOException { Assertions.assertTrue(outputData.contains(TEST_DATA_2)); } + @Test + void testValidFilePathStringRecord_EventsWithTags() throws IOException { + when(sinkContext.getTagsTargetKey()).thenReturn("tags"); + final FileSink fileSink = createObjectUnderTest(); + fileSink.initialize(); + + Assertions.assertTrue(fileSink.isReady()); + fileSink.output(TEST_RECORDS); + fileSink.shutdown(); + + final String outputData = readDocFromFile(TEST_OUTPUT_FILE); + Assertions.assertTrue(outputData.contains(TEST_DATA_1)); + Assertions.assertTrue(outputData.contains(tagStr1)); + Assertions.assertTrue(outputData.contains(tagStr2)); + Assertions.assertTrue(outputData.contains(TEST_DATA_2)); + } + // TODO: remove with the completion of: https://github.com/opensearch-project/data-prepper/issues/546 @Test void testValidFilePathCustomTypeRecord() throws IOException { + when(sinkContext.getTagsTargetKey()).thenReturn(null); final FileSink fileSink = createObjectUnderTest(); fileSink.initialize(); Assertions.assertTrue(fileSink.isReady()); @@ -115,6 +141,7 @@ void testValidFilePathCustomTypeRecord() throws IOException { } @Test void testValidFilePath() throws IOException { + when(sinkContext.getTagsTargetKey()).thenReturn(null); final FileSink fileSink = createObjectUnderTest(); fileSink.initialize(); Assertions.assertTrue(fileSink.isReady()); @@ -128,6 +155,7 @@ void testValidFilePath() throws IOException { @Test void testMultipleCallsToOutput() throws IOException { + when(sinkContext.getTagsTargetKey()).thenReturn(null); final FileSink fileSink = createObjectUnderTest(); fileSink.initialize(); Assertions.assertTrue(fileSink.isReady()); @@ -142,6 +170,7 @@ void testMultipleCallsToOutput() throws IOException { @Test void testCallingOutputAfterShutdownDoesNotWrite() throws IOException { + when(sinkContext.getTagsTargetKey()).thenReturn(null); final FileSink fileSink = createObjectUnderTest(); fileSink.initialize(); Assertions.assertTrue(fileSink.isReady()); @@ -157,6 +186,7 @@ void testCallingOutputAfterShutdownDoesNotWrite() throws IOException { @Test void testWithDefaultFile() { + when(sinkContext.getTagsTargetKey()).thenReturn(null); when(fileSinkConfig.getPath()).thenReturn(null); final FileSink objectUnderTest = createObjectUnderTest(); assertThrows(RuntimeException.class, objectUnderTest::initialize); diff --git a/data-prepper-plugins/common/src/test/java/org/opensearch/dataprepper/plugins/sink/StdOutSinkTests.java b/data-prepper-plugins/common/src/test/java/org/opensearch/dataprepper/plugins/sink/StdOutSinkTests.java index db8bd20b83..a02ed5e30c 100644 --- a/data-prepper-plugins/common/src/test/java/org/opensearch/dataprepper/plugins/sink/StdOutSinkTests.java +++ b/data-prepper-plugins/common/src/test/java/org/opensearch/dataprepper/plugins/sink/StdOutSinkTests.java @@ -51,7 +51,7 @@ public void setup() { @Test public void testSinkWithEvents() { - final StdOutSink stdOutSink = new StdOutSink(new PluginSetting(PLUGIN_NAME, new HashMap<>())); + final StdOutSink stdOutSink = new StdOutSink(new PluginSetting(PLUGIN_NAME, new HashMap<>()), null); stdOutSink.output(testRecords); stdOutSink.shutdown(); } @@ -59,7 +59,7 @@ public void testSinkWithEvents() { // TODO: remove with the completion of: https://github.com/opensearch-project/data-prepper/issues/546 @Test public void testSinkWithCustomType() { - final StdOutSink stdOutSink = new StdOutSink(new PluginSetting(PLUGIN_NAME, new HashMap<>())); + final StdOutSink stdOutSink = new StdOutSink(new PluginSetting(PLUGIN_NAME, new HashMap<>()), null); stdOutSink.output(Collections.singletonList(new Record(new TestObject()))); } diff --git a/data-prepper-plugins/opensearch/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSinkIT.java b/data-prepper-plugins/opensearch/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSinkIT.java index 3905a5e4c0..fd8bf7d82f 100644 --- a/data-prepper-plugins/opensearch/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSinkIT.java +++ b/data-prepper-plugins/opensearch/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSinkIT.java @@ -41,11 +41,14 @@ import org.opensearch.dataprepper.model.event.JacksonEvent; import org.opensearch.dataprepper.model.plugin.PluginFactory; import org.opensearch.dataprepper.model.record.Record; +import org.opensearch.dataprepper.model.sink.SinkContext; import org.opensearch.dataprepper.plugins.sink.opensearch.bulk.BulkAction; import org.opensearch.dataprepper.plugins.sink.opensearch.index.AbstractIndexManager; import org.opensearch.dataprepper.plugins.sink.opensearch.index.IndexConfiguration; import org.opensearch.dataprepper.plugins.sink.opensearch.index.IndexConstants; import org.opensearch.dataprepper.plugins.sink.opensearch.index.IndexType; +import org.apache.commons.lang3.RandomStringUtils; +import static org.mockito.Mockito.when; import javax.ws.rs.HttpMethod; import java.io.BufferedReader; @@ -108,20 +111,33 @@ public class OpenSearchSinkIT { private RestClient client; private EventHandle eventHandle; + private SinkContext sinkContext; + private String testTagsTargetKey; @Mock private PluginFactory pluginFactory; - @Mock - private AwsCredentialsSupplier awsCredentialsSupplier; + @Mock + private AwsCredentialsSupplier awsCredentialsSupplier; - public OpenSearchSink createObjectUnderTest(PluginSetting pluginSetting, boolean doInitialize) { - OpenSearchSink sink = new OpenSearchSink(pluginSetting, pluginFactory, awsCredentialsSupplier); - if (doInitialize) { - sink.doInitialize(); + public OpenSearchSink createObjectUnderTest(PluginSetting pluginSetting, boolean doInitialize) { + OpenSearchSink sink = new OpenSearchSink(pluginSetting, pluginFactory, null, awsCredentialsSupplier); + if (doInitialize) { + sink.doInitialize(); + } + return sink; + } + + public OpenSearchSink createObjectUnderTestWithSinkContext(PluginSetting pluginSetting, boolean doInitialize) { + sinkContext = mock(SinkContext.class); + testTagsTargetKey = RandomStringUtils.randomAlphabetic(5); + when(sinkContext.getTagsTargetKey()).thenReturn(testTagsTargetKey); + OpenSearchSink sink = new OpenSearchSink(pluginSetting, pluginFactory, sinkContext, awsCredentialsSupplier); + if (doInitialize) { + sink.doInitialize(); + } + return sink; } - return sink; - } @BeforeEach public void setup() { @@ -587,6 +603,34 @@ public void testBulkActionCreate() throws IOException, InterruptedException { Assert.assertEquals(1.0, bulkRequestLatencies.get(0).getValue(), 0); } + @Test + public void testEventOutputWithTags() throws IOException, InterruptedException { + final Event testEvent = JacksonEvent.builder() + .withData("{\"log\": \"foobar\"}") + .withEventType("event") + .build(); + ((JacksonEvent)testEvent).setEventHandle(eventHandle); + List tagsList = List.of("tag1", "tag2"); + testEvent.getMetadata().addTags(tagsList); + + final List> testRecords = Collections.singletonList(new Record<>(testEvent)); + + final PluginSetting pluginSetting = generatePluginSetting(IndexType.TRACE_ANALYTICS_RAW.getValue(), null, null); + final OpenSearchSink sink = createObjectUnderTestWithSinkContext(pluginSetting, true); + sink.output(testRecords); + + final String expIndexAlias = IndexConstants.TYPE_TO_DEFAULT_ALIAS.get(IndexType.TRACE_ANALYTICS_RAW); + final List> retSources = getSearchResponseDocSources(expIndexAlias); + final Map expectedContent = new HashMap<>(); + expectedContent.put("log", "foobar"); + expectedContent.put(testTagsTargetKey, tagsList); + + MatcherAssert.assertThat(retSources.size(), equalTo(1)); + MatcherAssert.assertThat(retSources.containsAll(Arrays.asList(expectedContent)), equalTo(true)); + MatcherAssert.assertThat(getDocumentCount(expIndexAlias, "log", "foobar"), equalTo(Integer.valueOf(1))); + sink.shutdown(); + } + @Test public void testEventOutput() throws IOException, InterruptedException { diff --git a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSink.java b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSink.java index b78235966c..a5bee48df9 100644 --- a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSink.java +++ b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSink.java @@ -25,6 +25,7 @@ import org.opensearch.dataprepper.model.configuration.PluginModel; import org.opensearch.dataprepper.model.configuration.PluginSetting; import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.sink.SinkContext; import org.opensearch.dataprepper.model.event.exceptions.EventKeyNotFoundException; import org.opensearch.dataprepper.model.failures.DlqObject; import org.opensearch.dataprepper.model.plugin.InvalidPluginConfigurationException; @@ -58,6 +59,7 @@ import java.nio.file.StandardOpenOption; import java.util.Collection; import java.util.List; +import java.util.Objects; import java.util.Optional; import java.util.StringJoiner; import java.util.concurrent.ConcurrentHashMap; @@ -104,6 +106,7 @@ public class OpenSearchSink extends AbstractSink> { private ObjectMapper objectMapper; private volatile boolean initialized; private PluginSetting pluginSetting; + private final SinkContext sinkContext; private FailedBulkOperationConverter failedBulkOperationConverter; @@ -114,9 +117,11 @@ public class OpenSearchSink extends AbstractSink> { @DataPrepperPluginConstructor public OpenSearchSink(final PluginSetting pluginSetting, final PluginFactory pluginFactory, + final SinkContext sinkContext, final AwsCredentialsSupplier awsCredentialsSupplier) { super(pluginSetting, Integer.MAX_VALUE, INITIALIZE_RETRY_WAIT_TIME_MS); this.awsCredentialsSupplier = awsCredentialsSupplier; + this.sinkContext = sinkContext; bulkRequestTimer = pluginMetrics.timer(BULKREQUEST_LATENCY); bulkRequestErrorsCounter = pluginMetrics.counter(BULKREQUEST_ERRORS); dynamicIndexDroppedEvents = pluginMetrics.counter(DYNAMIC_INDEX_DROPPED_EVENTS); @@ -302,7 +307,7 @@ private SerializedJson getDocument(final Event event) { String docId = (documentIdField != null) ? event.get(documentIdField, String.class) : null; String routing = (routingField != null) ? event.get(routingField, String.class) : null; - final String document = DocumentBuilder.build(event, documentRootKey); + final String document = DocumentBuilder.build(event, documentRootKey, Objects.nonNull(sinkContext)?sinkContext.getTagsTargetKey():null); return SerializedJson.fromStringAndOptionals(document, docId, routing); } diff --git a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/DocumentBuilder.java b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/DocumentBuilder.java index 68b18723fe..81e484904b 100644 --- a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/DocumentBuilder.java +++ b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/DocumentBuilder.java @@ -4,7 +4,7 @@ public final class DocumentBuilder { - public static String build(final Event event, final String documentRootKey) { + public static String build(final Event event, final String documentRootKey, final String tagsTargetKey) { if (documentRootKey != null && event.containsKey(documentRootKey)) { final String document = event.getAsJsonString(documentRootKey); if (document == null || !document.startsWith("{")) { @@ -12,6 +12,6 @@ public static String build(final Event event, final String documentRootKey) { } return document; } - return event.toJsonString(); + return event.jsonBuilder().includeTags(tagsTargetKey).toJsonString(); } } diff --git a/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/DocumentBuilderTest.java b/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/DocumentBuilderTest.java index de46b0ba58..1277303bd0 100644 --- a/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/DocumentBuilderTest.java +++ b/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/DocumentBuilderTest.java @@ -11,6 +11,7 @@ import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.model.event.JacksonEvent; +import java.util.List; import java.util.Map; import java.util.UUID; import java.util.stream.Stream; @@ -24,6 +25,8 @@ public class DocumentBuilderTest { private String random; private Event event; private String expectedOutput; + private String expectedOutputWithTags; + private final String tagsKey = "tags"; private ObjectMapper objectMapper = new ObjectMapper(); @@ -37,7 +40,9 @@ public void setup() throws JsonProcessingException { .withData(data) .withEventType("TestEvent") .build(); + event.getMetadata().addTags(List.of("tag1")); expectedOutput = objectMapper.writeValueAsString(data); + expectedOutputWithTags = event.jsonBuilder().includeTags(tagsKey).toJsonString(); } @ParameterizedTest @@ -45,16 +50,26 @@ public void setup() throws JsonProcessingException { @ValueSource(strings = {"missingObject", "/"}) public void buildWillReturnFullObject(final String documentRootKey) { - final String doc = DocumentBuilder.build(event, documentRootKey); + final String doc = DocumentBuilder.build(event, documentRootKey, null); assertThat(doc, is(equalTo(expectedOutput))); } + @ParameterizedTest + @NullSource + @ValueSource(strings = {"missingObject", "/"}) + public void buildWillReturnObjectWithTags(final String documentRootKey) { + + final String doc = DocumentBuilder.build(event, documentRootKey, tagsKey); + + assertThat(doc, is(equalTo(expectedOutputWithTags))); + } + @ParameterizedTest @MethodSource("provideSingleItemKeys") public void buildWillReturnSingleObject(final String documentRootKey, final Object expectedResult) { - final String doc = DocumentBuilder.build(event, documentRootKey); + final String doc = DocumentBuilder.build(event, documentRootKey, null); assertThat(doc, is(equalTo(String.format("{\"data\": %s}", expectedResult)))); } diff --git a/data-prepper-plugins/s3-sink/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/S3SinkServiceIT.java b/data-prepper-plugins/s3-sink/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/S3SinkServiceIT.java index c635650546..cfa9d3657b 100644 --- a/data-prepper-plugins/s3-sink/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/S3SinkServiceIT.java +++ b/data-prepper-plugins/s3-sink/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/S3SinkServiceIT.java @@ -134,7 +134,7 @@ void verify_flushed_records_into_s3_bucket() { } private S3SinkService createObjectUnderTest() { - return new S3SinkService(s3SinkConfig, bufferFactory, codec, s3Client, pluginMetrics); + return new S3SinkService(s3SinkConfig, bufferFactory, codec, s3Client, null, pluginMetrics); } private int gets3ObjectCount() { @@ -198,4 +198,4 @@ private static Map generateJson() { UUID.randomUUID().toString(), UUID.randomUUID().toString())); return jsonObject; } -} \ No newline at end of file +} diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/S3Sink.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/S3Sink.java index 1dc6963c23..a4baa538ae 100644 --- a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/S3Sink.java +++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/S3Sink.java @@ -15,6 +15,7 @@ import org.opensearch.dataprepper.model.plugin.PluginFactory; import org.opensearch.dataprepper.model.record.Record; import org.opensearch.dataprepper.model.sink.AbstractSink; +import org.opensearch.dataprepper.model.sink.SinkContext; import org.opensearch.dataprepper.model.sink.Sink; import org.opensearch.dataprepper.plugins.sink.accumulator.BufferFactory; import org.opensearch.dataprepper.plugins.sink.accumulator.BufferTypeOptions; @@ -26,6 +27,7 @@ import software.amazon.awssdk.services.s3.S3Client; import java.util.Collection; +import java.util.Objects; /** * Implementation class of s3-sink plugin. It is responsible for receive the collection of @@ -40,6 +42,7 @@ public class S3Sink extends AbstractSink> { private volatile boolean sinkInitialized; private final S3SinkService s3SinkService; private final BufferFactory bufferFactory; + private final SinkContext sinkContext; /** * @param pluginSetting dp plugin settings. @@ -50,9 +53,11 @@ public class S3Sink extends AbstractSink> { public S3Sink(final PluginSetting pluginSetting, final S3SinkConfig s3SinkConfig, final PluginFactory pluginFactory, + final SinkContext sinkContext, final AwsCredentialsSupplier awsCredentialsSupplier) { super(pluginSetting); this.s3SinkConfig = s3SinkConfig; + this.sinkContext = sinkContext; final PluginModel codecConfiguration = s3SinkConfig.getCodec(); final PluginSetting codecPluginSettings = new PluginSetting(codecConfiguration.getPluginName(), codecConfiguration.getPluginSettings()); @@ -65,7 +70,7 @@ public S3Sink(final PluginSetting pluginSetting, bufferFactory = new InMemoryBufferFactory(); } final S3Client s3Client = ClientFactory.createS3Client(s3SinkConfig, awsCredentialsSupplier); - s3SinkService = new S3SinkService(s3SinkConfig, bufferFactory, codec, s3Client, pluginMetrics); + s3SinkService = new S3SinkService(s3SinkConfig, bufferFactory, codec, s3Client, Objects.nonNull(sinkContext) ? sinkContext.getTagsTargetKey() : null, pluginMetrics); } @Override @@ -105,4 +110,4 @@ public void doOutput(final Collection> records) { } s3SinkService.output(records); } -} \ No newline at end of file +} diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/S3SinkService.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/S3SinkService.java index 5b627faa1b..34c49a9b25 100644 --- a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/S3SinkService.java +++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/S3SinkService.java @@ -57,6 +57,7 @@ public class S3SinkService { private final Counter numberOfRecordsSuccessCounter; private final Counter numberOfRecordsFailedCounter; private final DistributionSummary s3ObjectSizeSummary; + private final String tagsTargetKey; /** * @param s3SinkConfig s3 sink related configuration. @@ -66,11 +67,12 @@ public class S3SinkService { * @param pluginMetrics metrics. */ public S3SinkService(final S3SinkConfig s3SinkConfig, final BufferFactory bufferFactory, - final Codec codec, final S3Client s3Client, final PluginMetrics pluginMetrics) { + final Codec codec, final S3Client s3Client, final String tagsTargetKey, final PluginMetrics pluginMetrics) { this.s3SinkConfig = s3SinkConfig; this.bufferFactory = bufferFactory; this.codec = codec; this.s3Client = s3Client; + this.tagsTargetKey = tagsTargetKey; reentrantLock = new ReentrantLock(); bufferedEventHandles = new LinkedList<>(); @@ -102,7 +104,7 @@ void output(Collection> records) { final Event event = record.getData(); final String encodedEvent; - encodedEvent = codec.parse(event); + encodedEvent = codec.parse(event, tagsTargetKey); final byte[] encodedBytes = encodedEvent.getBytes(); currentBuffer.writeEvent(encodedBytes); @@ -181,4 +183,4 @@ protected String generateKey() { final String namePattern = ObjectKey.objectFileName(s3SinkConfig); return (!pathPrefix.isEmpty()) ? pathPrefix + namePattern : namePattern; } -} \ No newline at end of file +} diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/codec/Codec.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/codec/Codec.java index 676526dbb5..06b104287f 100644 --- a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/codec/Codec.java +++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/codec/Codec.java @@ -15,8 +15,9 @@ public interface Codec { /** * @param event input data. + * @param tagsTargetKey key name for including tags if not null * @return parse string. * @throws IOException exception. */ - String parse(Event event) throws IOException; -} \ No newline at end of file + String parse(final Event event, final String tagsTargetKey) throws IOException; +} diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/codec/JsonCodec.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/codec/JsonCodec.java index c3e2886e7c..78847f3145 100644 --- a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/codec/JsonCodec.java +++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/codec/JsonCodec.java @@ -19,8 +19,8 @@ public class JsonCodec implements Codec { * Generates a serialized json string of the Event */ @Override - public String parse(Event event) throws IOException { + public String parse(final Event event, final String tagsTargetKey) throws IOException { Objects.requireNonNull(event); - return event.toJsonString(); + return event.jsonBuilder().includeTags(tagsTargetKey).toJsonString(); } -} \ No newline at end of file +} diff --git a/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/S3SinkServiceTest.java b/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/S3SinkServiceTest.java index f89eb9026f..36302133a6 100644 --- a/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/S3SinkServiceTest.java +++ b/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/S3SinkServiceTest.java @@ -31,6 +31,7 @@ import software.amazon.awssdk.awscore.exception.AwsServiceException; import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.s3.S3Client; +import org.apache.commons.lang3.RandomStringUtils; import java.io.IOException; import java.time.Duration; @@ -83,11 +84,13 @@ class S3SinkServiceTest { private Counter snapshotSuccessCounter; private DistributionSummary s3ObjectSizeSummary; private Random random; + private String tagsTargetKey; @BeforeEach void setUp() { random = new Random(); + tagsTargetKey = RandomStringUtils.randomAlphabetic(5); s3SinkConfig = mock(S3SinkConfig.class); s3Client = mock(S3Client.class); ThresholdOptions thresholdOptions = mock(ThresholdOptions.class); @@ -132,7 +135,7 @@ void setUp() { } private S3SinkService createObjectUnderTest() { - return new S3SinkService(s3SinkConfig, bufferFactory, codec, s3Client, pluginMetrics); + return new S3SinkService(s3SinkConfig, bufferFactory, codec, s3Client, tagsTargetKey, pluginMetrics); } @Test @@ -181,7 +184,7 @@ void test_output_with_threshold_set_as_more_then_zero_event_count() throws IOExc when(bufferFactory.getBuffer()).thenReturn(buffer); when(s3SinkConfig.getThresholdOptions().getEventCount()).thenReturn(5); - when(codec.parse(any())).thenReturn("{\"message\":\"31824252-adba-4c47-a2ac-05d16c5b8140\"}"); + when(codec.parse(any(), anyString())).thenReturn("{\"message\":\"31824252-adba-4c47-a2ac-05d16c5b8140\"}"); S3SinkService s3SinkService = createObjectUnderTest(); assertNotNull(s3SinkService); s3SinkService.output(generateRandomStringEventRecord()); @@ -202,7 +205,7 @@ void test_output_with_threshold_set_as_zero_event_count() throws IOException { when(s3SinkConfig.getThresholdOptions().getEventCount()).thenReturn(0); when(s3SinkConfig.getThresholdOptions().getMaximumSize()).thenReturn(ByteCount.parse("2kb")); - when(codec.parse(any())).thenReturn("{\"message\":\"31824252-adba-4c47-a2ac-05d16c5b8140\"}"); + when(codec.parse(any(), anyString())).thenReturn("{\"message\":\"31824252-adba-4c47-a2ac-05d16c5b8140\"}"); S3SinkService s3SinkService = createObjectUnderTest(); assertNotNull(s3SinkService); s3SinkService.output(generateRandomStringEventRecord()); @@ -218,7 +221,7 @@ void test_output_with_uploadedToS3_success() throws IOException { doNothing().when(buffer).flushToS3(any(S3Client.class), anyString(), any(String.class)); when(bufferFactory.getBuffer()).thenReturn(buffer); - when(codec.parse(any())).thenReturn("{\"message\":\"31824252-adba-4c47-a2ac-05d16c5b8140\"}"); + when(codec.parse(any(), anyString())).thenReturn("{\"message\":\"31824252-adba-4c47-a2ac-05d16c5b8140\"}"); S3SinkService s3SinkService = createObjectUnderTest(); assertNotNull(s3SinkService); assertThat(s3SinkService, instanceOf(S3SinkService.class)); @@ -236,7 +239,7 @@ void test_output_with_uploadedToS3_success_records_byte_count() throws IOExcepti final long objectSize = random.nextInt(1_000_000) + 10_000; when(buffer.getSize()).thenReturn(objectSize); - when(codec.parse(any())).thenReturn(UUID.randomUUID().toString()); + when(codec.parse(any(), anyString())).thenReturn(UUID.randomUUID().toString()); final S3SinkService s3SinkService = createObjectUnderTest(); s3SinkService.output(generateRandomStringEventRecord()); @@ -247,7 +250,7 @@ void test_output_with_uploadedToS3_success_records_byte_count() throws IOExcepti void test_output_with_uploadedToS3_failed() throws IOException { when(s3SinkConfig.getBucketName()).thenReturn(UUID.randomUUID().toString()); when(s3SinkConfig.getMaxUploadRetries()).thenReturn(3); - when(codec.parse(any())).thenReturn("{\"message\":\"31824252-adba-4c47-a2ac-05d16c5b8140\"}"); + when(codec.parse(any(), anyString())).thenReturn("{\"message\":\"31824252-adba-4c47-a2ac-05d16c5b8140\"}"); S3SinkService s3SinkService = createObjectUnderTest(); assertNotNull(s3SinkService); assertThat(s3SinkService, instanceOf(S3SinkService.class)); @@ -267,7 +270,7 @@ void test_output_with_uploadedToS3_failure_does_not_record_byte_count() throws I final long objectSize = random.nextInt(1_000_000) + 10_000; when(buffer.getSize()).thenReturn(objectSize); - when(codec.parse(any())).thenReturn(UUID.randomUUID().toString()); + when(codec.parse(any(), anyString())).thenReturn(UUID.randomUUID().toString()); final S3SinkService s3SinkService = createObjectUnderTest(); final Event event = JacksonEvent.fromMessage(UUID.randomUUID().toString()); s3SinkService.output(Collections.singletonList(new Record<>(event))); @@ -318,7 +321,7 @@ void output_will_release_all_handles_since_a_flush() throws IOException { final long objectSize = random.nextInt(1_000_000) + 10_000; when(buffer.getSize()).thenReturn(objectSize); - when(codec.parse(any())).thenReturn(UUID.randomUUID().toString()); + when(codec.parse(any(), anyString())).thenReturn(UUID.randomUUID().toString()); final S3SinkService s3SinkService = createObjectUnderTest(); final Collection> records = generateRandomStringEventRecord(); s3SinkService.output(records); @@ -339,7 +342,7 @@ void output_will_skip_releasing_events_without_EventHandle_objects() throws IOEx final long objectSize = random.nextInt(1_000_000) + 10_000; when(buffer.getSize()).thenReturn(objectSize); - when(codec.parse(any())).thenReturn(UUID.randomUUID().toString()); + when(codec.parse(any(), anyString())).thenReturn(UUID.randomUUID().toString()); final S3SinkService s3SinkService = createObjectUnderTest(); final Collection> records = generateRandomStringEventRecord(); records.stream() @@ -370,7 +373,7 @@ void output_will_release_all_handles_since_a_flush_when_S3_fails() throws IOExce final long objectSize = random.nextInt(1_000_000) + 10_000; when(buffer.getSize()).thenReturn(objectSize); - when(codec.parse(any())).thenReturn(UUID.randomUUID().toString()); + when(codec.parse(any(), anyString())).thenReturn(UUID.randomUUID().toString()); final S3SinkService s3SinkService = createObjectUnderTest(); final List> records = generateEventRecords(1); s3SinkService.output(records); @@ -391,7 +394,7 @@ void output_will_release_only_new_handles_since_a_flush() throws IOException { final long objectSize = random.nextInt(1_000_000) + 10_000; when(buffer.getSize()).thenReturn(objectSize); - when(codec.parse(any())).thenReturn(UUID.randomUUID().toString()); + when(codec.parse(any(), anyString())).thenReturn(UUID.randomUUID().toString()); final S3SinkService s3SinkService = createObjectUnderTest(); final Collection> records = generateRandomStringEventRecord(); s3SinkService.output(records); @@ -422,7 +425,7 @@ void output_will_release_only_new_handles_since_a_flush_when_S3_fails() throws I final long objectSize = random.nextInt(1_000_000) + 10_000; when(buffer.getSize()).thenReturn(objectSize); - when(codec.parse(any())).thenReturn(UUID.randomUUID().toString()); + when(codec.parse(any(), anyString())).thenReturn(UUID.randomUUID().toString()); final S3SinkService s3SinkService = createObjectUnderTest(); final List> records = generateEventRecords(1); s3SinkService.output(records); @@ -467,4 +470,4 @@ private byte[] generateByteArray() { } return bytes; } -} \ No newline at end of file +} diff --git a/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/S3SinkTest.java b/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/S3SinkTest.java index 25941e718f..b92da61bc6 100644 --- a/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/S3SinkTest.java +++ b/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/S3SinkTest.java @@ -14,6 +14,7 @@ import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.model.plugin.PluginFactory; import org.opensearch.dataprepper.model.record.Record; +import org.opensearch.dataprepper.model.sink.SinkContext; import org.opensearch.dataprepper.model.types.ByteCount; import org.opensearch.dataprepper.plugins.sink.accumulator.BufferTypeOptions; import org.opensearch.dataprepper.plugins.sink.codec.Codec; @@ -49,11 +50,13 @@ class S3SinkTest { private PluginSetting pluginSetting; private PluginFactory pluginFactory; private AwsCredentialsSupplier awsCredentialsSupplier; + private SinkContext sinkContext; @BeforeEach void setUp() { s3SinkConfig = mock(S3SinkConfig.class); + sinkContext = mock(SinkContext.class); ThresholdOptions thresholdOptions = mock(ThresholdOptions.class); AwsAuthenticationOptions awsAuthenticationOptions = mock(AwsAuthenticationOptions.class); Codec codec = mock(JsonCodec.class); @@ -80,7 +83,7 @@ void setUp() { } private S3Sink createObjectUnderTest() { - return new S3Sink(pluginSetting, s3SinkConfig, pluginFactory, awsCredentialsSupplier); + return new S3Sink(pluginSetting, s3SinkConfig, pluginFactory, sinkContext, awsCredentialsSupplier); } @Test @@ -107,4 +110,4 @@ void test_doOutput_with_empty_records() { Collection> records = new ArrayList<>(); s3Sink.doOutput(records); } -} \ No newline at end of file +} diff --git a/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/codec/JsonCodecTest.java b/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/codec/JsonCodecTest.java index 685bbd4e32..d2055cb0c8 100644 --- a/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/codec/JsonCodecTest.java +++ b/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/codec/JsonCodecTest.java @@ -11,6 +11,7 @@ import static org.junit.jupiter.api.Assertions.assertNotNull; import java.io.IOException; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.UUID; import org.junit.jupiter.api.Test; @@ -29,7 +30,7 @@ void parse_with_events_output_stream_json_codec() throws IOException { String value2 = UUID.randomUUID().toString(); eventData.put("key2", value2); final JacksonEvent event = JacksonLog.builder().withData(eventData).withEventType("LOG").build(); - String output = createObjectUnderTest().parse(event); + String output = createObjectUnderTest().parse(event, null); assertNotNull(output); ObjectMapper objectMapper = new ObjectMapper(); @@ -41,7 +42,32 @@ void parse_with_events_output_stream_json_codec() throws IOException { assertThat(deserializedData.get("key2"), equalTo(value2)); } + @Test + void parse_with_events_output_stream_json_codec_with_tags() throws IOException { + + final Map eventData = new HashMap<>(); + String value1 = UUID.randomUUID().toString(); + eventData.put("key1", value1); + String value2 = UUID.randomUUID().toString(); + eventData.put("key2", value2); + final JacksonEvent event = JacksonLog.builder().withData(eventData).withEventType("LOG").build(); + List tagsList = List.of("tag1"); + event.getMetadata().addTags(tagsList); + String output = createObjectUnderTest().parse(event, "tags"); + assertNotNull(output); + + ObjectMapper objectMapper = new ObjectMapper(); + Map deserializedData = objectMapper.readValue(output, Map.class); + assertThat(deserializedData, notNullValue()); + assertThat(deserializedData.get("key1"), notNullValue()); + assertThat(deserializedData.get("key1"), equalTo(value1)); + assertThat(deserializedData.get("key2"), notNullValue()); + assertThat(deserializedData.get("key2"), equalTo(value2)); + assertThat(deserializedData.get("tags"), notNullValue()); + assertThat(deserializedData.get("tags"), equalTo(tagsList)); + } + private JsonCodec createObjectUnderTest() { return new JsonCodec(); } -} \ No newline at end of file +}