Skip to content

Commit

Permalink
Add support for writing tags along with events to Sink (#2850)
Browse files Browse the repository at this point in the history
* Updated to pass SinkContext to Sink constructors as suggested in the previous comments

Signed-off-by: Krishna Kondaka <[email protected]>

* Fixed check style errors and renamed RoutedPluginSetting to SinkContextPluginSetting

Signed-off-by: Krishna Kondaka <[email protected]>

* Fixed s3-sink integration test

Signed-off-by: Krishna Kondaka <[email protected]>

* Added javadoc for SinkContext

Signed-off-by: Krishna Kondaka <[email protected]>

---------

Signed-off-by: Krishna Kondaka <[email protected]>
Co-authored-by: Krishna Kondaka <[email protected]>
  • Loading branch information
kkondaka and Krishna Kondaka committed Jun 27, 2023
1 parent 51722ba commit 37297e7
Show file tree
Hide file tree
Showing 33 changed files with 451 additions and 131 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@
@JsonDeserialize(using = SinkModel.SinkModelDeserializer.class)
public class SinkModel extends PluginModel {

SinkModel(final String pluginName, final List<String> routes, final Map<String, Object> pluginSettings) {
this(pluginName, new SinkInternalJsonModel(routes, pluginSettings));
SinkModel(final String pluginName, final List<String> routes, final String tagsTargetKey, final Map<String, Object> pluginSettings) {
this(pluginName, new SinkInternalJsonModel(routes, tagsTargetKey, pluginSettings));
}

private SinkModel(final String pluginName, final SinkInternalJsonModel sinkInnerModel) {
Expand All @@ -46,18 +46,30 @@ public Collection<String> getRoutes() {
return this.<SinkInternalJsonModel>getInternalJsonModel().routes;
}

/**
* Gets the tags target key associated with this Sink.
*
* @return The tags target key
* @since 2.4
*/
public String getTagsTargetKey() {
return this.<SinkInternalJsonModel>getInternalJsonModel().tagsTargetKey;
}

public static class SinkModelBuilder {

private final PluginModel pluginModel;
private final List<String> routes;
private final String tagsTargetKey;

private SinkModelBuilder(final PluginModel pluginModel) {
this.pluginModel = pluginModel;
this.routes = Collections.emptyList();
this.tagsTargetKey = null;
}

public SinkModel build() {
return new SinkModel(pluginModel.getPluginName(), routes, pluginModel.getPluginSettings());
return new SinkModel(pluginModel.getPluginName(), routes, tagsTargetKey, pluginModel.getPluginSettings());
}
}

Expand All @@ -70,21 +82,27 @@ private static class SinkInternalJsonModel extends InternalJsonModel {
@JsonProperty("routes")
private final List<String> routes;

@JsonInclude(JsonInclude.Include.NON_EMPTY)
@JsonProperty("tags_target_key")
private final String tagsTargetKey;

@JsonCreator
private SinkInternalJsonModel(@JsonProperty("routes") final List<String> routes) {
private SinkInternalJsonModel(@JsonProperty("routes") final List<String> routes, @JsonProperty("tags_target_key") final String tagsTargetKey) {
super();
this.routes = routes != null ? routes : new ArrayList<>();
this.tagsTargetKey = tagsTargetKey;
}

private SinkInternalJsonModel(final List<String> routes, final Map<String, Object> pluginSettings) {
private SinkInternalJsonModel(final List<String> routes, final String tagsTargetKey, final Map<String, Object> pluginSettings) {
super(pluginSettings);
this.routes = routes != null ? routes : new ArrayList<>();
this.tagsTargetKey = tagsTargetKey;
}
}

static class SinkModelDeserializer extends AbstractPluginModelDeserializer<SinkModel, SinkInternalJsonModel> {
SinkModelDeserializer() {
super(SinkModel.class, SinkInternalJsonModel.class, SinkModel::new, () -> new SinkInternalJsonModel(null));
super(SinkModel.class, SinkInternalJsonModel.class, SinkModel::new, () -> new SinkInternalJsonModel(null, null));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

package org.opensearch.dataprepper.model.plugin;

import org.opensearch.dataprepper.model.sink.SinkContext;
import org.opensearch.dataprepper.model.configuration.PluginSetting;

import java.util.List;
Expand All @@ -27,6 +28,18 @@ public interface PluginFactory {
*/
<T> T loadPlugin(final Class<T> baseClass, final PluginSetting pluginSetting);

/**
* Loads a new instance of a plugin with SinkContext.
*
* @param baseClass The class type that the plugin is supporting.
* @param pluginSetting The {@link PluginSetting} to configure this plugin
* @param sinkContext The {@link SinkContext} to configure this plugin
* @param <T> The type
* @return A new instance of your plugin, configured
* @since 1.2
*/
<T> T loadPlugin(final Class<T> baseClass, final PluginSetting pluginSetting, final SinkContext sinkContext);

/**
* Loads a specified number of plugin instances. The total number of instances is provided
* by the numberOfInstancesFunction.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.model.sink;

import java.util.Collection;

/**
* Data Prepper Sink Context class. This the class for keeping global
* sink configuration as context so that individual sinks may use them.
*/
public class SinkContext {
private final String tagsTargetKey;
private final Collection<String> routes;

public SinkContext(final String tagsTargetKey, final Collection<String> routes) {
this.tagsTargetKey = tagsTargetKey;
this.routes = routes;
}

/**
* returns the target key name for tags if configured for a given sink
* @return tags target key
*/
public String getTagsTargetKey() {
return tagsTargetKey;
}

/**
* returns routes if configured for a given sink
* @return routes
*/
public Collection<String> getRoutes() {
return routes;
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ void testSerializing_PipelinesDataFlowModel_empty_Plugins_with_nonEmpty_delay_an

final PluginModel source = new PluginModel("testSource", (Map<String, Object>) null);
final List<PluginModel> processors = Collections.singletonList(new PluginModel("testProcessor", (Map<String, Object>) null));
final List<SinkModel> sinks = Collections.singletonList(new SinkModel("testSink", Collections.emptyList(), null));
final List<SinkModel> sinks = Collections.singletonList(new SinkModel("testSink", Collections.emptyList(), null, null));
final PipelineModel pipelineModel = new PipelineModel(source, null, processors, null, sinks, 8, 50);

final PipelinesDataFlowModel pipelinesDataFlowModel = new PipelinesDataFlowModel(Collections.singletonMap(pipelineName, pipelineModel));
Expand All @@ -72,7 +72,7 @@ void testSerializing_PipelinesDataFlowModel_with_Version() throws JsonProcessing
final DataPrepperVersion version = DataPrepperVersion.parse("2.0");
final PluginModel source = new PluginModel("testSource", (Map<String, Object>) null);
final List<PluginModel> processors = Collections.singletonList(new PluginModel("testProcessor", (Map<String, Object>) null));
final List<SinkModel> sinks = Collections.singletonList(new SinkModel("testSink", Collections.emptyList(), null));
final List<SinkModel> sinks = Collections.singletonList(new SinkModel("testSink", Collections.emptyList(), null, null));
final PipelineModel pipelineModel = new PipelineModel(source, null, processors, null, sinks, 8, 50);

final PipelinesDataFlowModel pipelinesDataFlowModel = new PipelinesDataFlowModel(version, Collections.singletonMap(pipelineName, pipelineModel));
Expand All @@ -93,7 +93,7 @@ void testSerializing_PipelinesDataFlowModel_empty_Plugins_with_nonEmpty_delay_an

final PluginModel source = new PluginModel("testSource", (Map<String, Object>) null);
final List<PluginModel> preppers = Collections.singletonList(new PluginModel("testPrepper", (Map<String, Object>) null));
final List<SinkModel> sinks = Collections.singletonList(new SinkModel("testSink", Collections.singletonList("my-route"), null));
final List<SinkModel> sinks = Collections.singletonList(new SinkModel("testSink", Collections.singletonList("my-route"), null, null));
final PipelineModel pipelineModel = new PipelineModel(source, null, preppers, Collections.singletonList(new ConditionalRoute("my-route", "/a==b")), sinks, 8, 50);

final PipelinesDataFlowModel pipelinesDataFlowModel = new PipelinesDataFlowModel(Collections.singletonMap(pipelineName, pipelineModel));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.hasItem;
import static org.hamcrest.CoreMatchers.notNullValue;
import static org.hamcrest.CoreMatchers.nullValue;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.hasKey;
Expand Down Expand Up @@ -74,13 +75,15 @@ void serialize_into_known_SinkModel() throws IOException {
final Map<String, Object> pluginSettings = new LinkedHashMap<>();
pluginSettings.put("key1", "value1");
pluginSettings.put("key2", "value2");
final SinkModel sinkModel = new SinkModel("customSinkPlugin", Arrays.asList("routeA", "routeB"), pluginSettings);
final String tagsTargetKey = "tags";
final SinkModel sinkModel = new SinkModel("customSinkPlugin", Arrays.asList("routeA", "routeB"), tagsTargetKey, pluginSettings);

final String actualJson = objectMapper.writeValueAsString(sinkModel);

final String expectedJson = createStringFromInputStream(this.getClass().getResourceAsStream("sink_plugin.yaml"));

assertThat("---\n" + actualJson, equalTo(expectedJson));
assertThat(sinkModel.getTagsTargetKey(), equalTo(tagsTargetKey));
}

@Test
Expand All @@ -93,7 +96,8 @@ void deserialize_with_any_pluginModel() throws IOException {
assertAll(
() -> assertThat(sinkModel.getPluginName(), equalTo("customPlugin")),
() -> assertThat(sinkModel.getPluginSettings(), notNullValue()),
() -> assertThat(sinkModel.getRoutes(), notNullValue())
() -> assertThat(sinkModel.getRoutes(), notNullValue()),
() -> assertThat(sinkModel.getTagsTargetKey(), nullValue())
);
assertAll(
() -> assertThat(sinkModel.getPluginSettings().size(), equalTo(3)),
Expand Down Expand Up @@ -123,7 +127,7 @@ void serialize_with_just_pluginModel() throws IOException {
pluginSettings.put("key1", "value1");
pluginSettings.put("key2", "value2");
pluginSettings.put("key3", "value3");
final SinkModel sinkModel = new SinkModel("customPlugin", null, pluginSettings);
final SinkModel sinkModel = new SinkModel("customPlugin", null, null, pluginSettings);

final String actualJson = objectMapper.writeValueAsString(sinkModel);

Expand Down Expand Up @@ -156,10 +160,11 @@ void build_with_only_PluginModel_should_return_expected_SinkModel() {
assertThat(actualSinkModel.getPluginSettings(), equalTo(pluginSettings));
assertThat(actualSinkModel.getRoutes(), notNullValue());
assertThat(actualSinkModel.getRoutes(), empty());
assertThat(actualSinkModel.getTagsTargetKey(), nullValue());
}
}

private static String createStringFromInputStream(final InputStream inputStream) throws IOException {
return new String(inputStream.readAllBytes(), StandardCharsets.UTF_8);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.model.sink;

import org.junit.jupiter.api.Test;

import java.util.Collections;
import java.util.List;

import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;
import org.apache.commons.lang3.RandomStringUtils;



public class SinkContextTest {
private SinkContext sinkContext;

@Test
public void testSinkContextBasic() {
final String testTagsTargetKey = RandomStringUtils.randomAlphabetic(6);
final List<String> testRoutes = Collections.emptyList();
sinkContext = new SinkContext(testTagsTargetKey, testRoutes);
assertThat(sinkContext.getTagsTargetKey(), equalTo(testTagsTargetKey));
assertThat(sinkContext.getRoutes(), equalTo(testRoutes));

}

}

Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,6 @@ customSinkPlugin:
routes:
- "routeA"
- "routeB"
tags_target_key: "tags"
key1: "value1"
key2: "value2"
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import org.apache.commons.collections.CollectionUtils;
import org.opensearch.dataprepper.model.configuration.PluginSetting;
import org.opensearch.dataprepper.parser.model.PipelineConfiguration;
import org.opensearch.dataprepper.parser.model.RoutedPluginSetting;
import org.opensearch.dataprepper.parser.model.SinkContextPluginSetting;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -82,7 +82,7 @@ private static void visitAndValidate(
final PipelineConfiguration pipelineConfiguration = pipelineConfigurationMap.get(pipeline);
touchedPipelineSet.add(pipeline);
//if validation is successful, then there is definitely sink
final List<RoutedPluginSetting> connectedPipelinesSettings = pipelineConfiguration.getSinkPluginSettings();
final List<SinkContextPluginSetting> connectedPipelinesSettings = pipelineConfiguration.getSinkPluginSettings();
//Recursively check connected pipelines
for (PluginSetting pluginSetting : connectedPipelinesSettings) {
//Further process only if the sink is of pipeline type
Expand Down Expand Up @@ -159,7 +159,7 @@ private static void validateForOrphans(
throw new RuntimeException("Invalid configuration, cannot proceed with ambiguous configuration");
}
final PipelineConfiguration pipelineConfiguration = pipelineConfigurationMap.get(currentPipelineName);
final List<RoutedPluginSetting> pluginSettings = pipelineConfiguration.getSinkPluginSettings();
final List<SinkContextPluginSetting> pluginSettings = pipelineConfiguration.getSinkPluginSettings();
for (PluginSetting pluginSetting : pluginSettings) {
if (PIPELINE_TYPE.equals(pluginSetting.getName()) &&
pluginSetting.getAttributeFromSettings(PIPELINE_ATTRIBUTE_NAME) != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,10 @@
import org.opensearch.dataprepper.model.processor.Processor;
import org.opensearch.dataprepper.model.sink.Sink;
import org.opensearch.dataprepper.model.source.Source;
import org.opensearch.dataprepper.model.sink.SinkContext;
import org.opensearch.dataprepper.parser.model.DataPrepperConfiguration;
import org.opensearch.dataprepper.parser.model.PipelineConfiguration;
import org.opensearch.dataprepper.parser.model.RoutedPluginSetting;
import org.opensearch.dataprepper.parser.model.SinkContextPluginSetting;
import org.opensearch.dataprepper.peerforwarder.PeerForwarderConfiguration;
import org.opensearch.dataprepper.peerforwarder.PeerForwarderProvider;
import org.opensearch.dataprepper.peerforwarder.PeerForwardingProcessorDecorator;
Expand Down Expand Up @@ -292,13 +293,13 @@ private Optional<Source> getSourceIfPipelineType(
return Optional.empty();
}

private DataFlowComponent<Sink> buildRoutedSinkOrConnector(final RoutedPluginSetting pluginSetting) {
final Sink sink = buildSinkOrConnector(pluginSetting);
private DataFlowComponent<Sink> buildRoutedSinkOrConnector(final SinkContextPluginSetting pluginSetting) {
final Sink sink = buildSinkOrConnector(pluginSetting, pluginSetting.getSinkContext());

return new DataFlowComponent<>(sink, pluginSetting.getRoutes());
return new DataFlowComponent<>(sink, pluginSetting.getSinkContext().getRoutes());
}

private Sink buildSinkOrConnector(final PluginSetting pluginSetting) {
private Sink buildSinkOrConnector(final PluginSetting pluginSetting, final SinkContext sinkContext) {
LOG.info("Building [{}] as sink component", pluginSetting.getName());
final Optional<String> pipelineNameOptional = getPipelineNameIfPipelineType(pluginSetting);
if (pipelineNameOptional.isPresent()) { //update to ifPresentOrElse when using JDK9
Expand All @@ -307,7 +308,7 @@ private Sink buildSinkOrConnector(final PluginSetting pluginSetting) {
sourceConnectorMap.put(pipelineName, pipelineConnector); //TODO retrieve from parent Pipeline using name
return pipelineConnector;
} else {
return pluginFactory.loadPlugin(Sink.class, pluginSetting);
return pluginFactory.loadPlugin(Sink.class, pluginSetting, sinkContext);
}
}

Expand Down Expand Up @@ -337,7 +338,7 @@ private void removeConnectedPipelines(
sourcePipeline, pipelineConfigurationMap, pipelineMap));

//remove sink connected pipelines
final List<RoutedPluginSetting> sinkPluginSettings = failedPipelineConfiguration.getSinkPluginSettings();
final List<SinkContextPluginSetting> sinkPluginSettings = failedPipelineConfiguration.getSinkPluginSettings();
sinkPluginSettings.forEach(sinkPluginSetting -> {
getPipelineNameIfPipelineType(sinkPluginSetting).ifPresent(sinkPipeline -> processRemoveIfRequired(
sinkPipeline, pipelineConfigurationMap, pipelineMap));
Expand Down
Loading

0 comments on commit 37297e7

Please sign in to comment.