diff --git a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/codec/OutputCodec.java b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/codec/OutputCodec.java index 39c7a1490f..f089c161c7 100644 --- a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/codec/OutputCodec.java +++ b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/codec/OutputCodec.java @@ -6,15 +6,14 @@ package org.opensearch.dataprepper.model.codec; import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; import org.opensearch.dataprepper.model.event.Event; -import org.opensearch.dataprepper.model.log.JacksonLog; import org.opensearch.dataprepper.model.sink.Sink; +import org.opensearch.dataprepper.model.sink.SinkContext; import java.io.IOException; import java.io.OutputStream; -import java.util.Map; +import java.util.List; public interface OutputCodec { @@ -25,9 +24,10 @@ public interface OutputCodec { * Implementors should do initial wrapping according to the implementation * * @param outputStream outputStream param for wrapping + * @param sinkContext {@link SinkContext} object * @throws IOException throws IOException when invalid input is received or not able to create wrapping */ - void start(OutputStream outputStream) throws IOException; + void start(OutputStream outputStream, SinkContext sinkContext) throws IOException; /** * this method get called from {@link Sink} to write event in {@link OutputStream} @@ -35,10 +35,9 @@ public interface OutputCodec { * * @param event event Record event * @param outputStream outputStream param to hold the event data - * @param tagsTargetKey to add tags to the record * @throws IOException throws IOException when not able to write data to {@link OutputStream} */ - void writeEvent(Event event, OutputStream outputStream, String tagsTargetKey) throws IOException; + void writeEvent(Event event, OutputStream outputStream) throws IOException; /** * this method get called from {@link Sink} to do final wrapping in {@link OutputStream} @@ -56,10 +55,11 @@ public interface OutputCodec { */ String getExtension(); - default Event addTagsToEvent(Event event, String tagsTargetKey) throws JsonProcessingException { - String eventJsonString = event.jsonBuilder().includeTags(tagsTargetKey).toJsonString(); - Map eventData = objectMapper.readValue(eventJsonString, new TypeReference<>() { - }); - return JacksonLog.builder().withData(eventData).build(); + default String buildJsonString(Event event, String tagsTargetKey, List includeKeys, List excludeKeys) throws JsonProcessingException { + return event.jsonBuilder() + .includeTags(tagsTargetKey) + .includeKeys(includeKeys) + .excludeKeys(excludeKeys) + .toJsonString(); } } diff --git a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/JacksonEvent.java b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/JacksonEvent.java index 9b5ae72b77..df2389292e 100644 --- a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/JacksonEvent.java +++ b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/JacksonEvent.java @@ -541,9 +541,11 @@ public String toJsonString() { jsonString = searchAndFilter(getBaseNode(), "", getIncludeKeys(), RETAIN_ALL); } else if (getExcludeKeys() != null && !getExcludeKeys().isEmpty()) { jsonString = searchAndFilter(getBaseNode(), "", getExcludeKeys(), EXCLUDE_ALL); + } else if (getBaseNode() !=event.getJsonNode()) { + jsonString = event.getAsJsonString(getRootKey()); } else { // Some successors have its own implementation of toJsonString, such as JacksonSpan. - // In such case, the root key will be ignored. + // In such case, it's only used when the root key is not provided. // TODO: Need to check if such behaviour is expected. jsonString = event.toJsonString(); } diff --git a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/sink/SinkContext.java b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/sink/SinkContext.java index d9b96167dd..85a8f49ea0 100644 --- a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/sink/SinkContext.java +++ b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/sink/SinkContext.java @@ -27,6 +27,10 @@ public SinkContext(String tagsTargetKey, Collection routes, List this.excludeKeys = excludeKeys; } + public SinkContext(String tagsTargetKey) { + this(tagsTargetKey, null, null, null); + } + /** * returns the target key name for tags if configured for a given sink * diff --git a/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/codec/OutputCodecTest.java b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/codec/OutputCodecTest.java index 461907d9a4..17ca111e61 100644 --- a/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/codec/OutputCodecTest.java +++ b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/codec/OutputCodecTest.java @@ -9,6 +9,7 @@ import org.opensearch.dataprepper.model.event.EventType; import org.opensearch.dataprepper.model.event.JacksonEvent; import org.opensearch.dataprepper.model.log.JacksonLog; +import org.opensearch.dataprepper.model.sink.SinkContext; import java.io.IOException; import java.io.OutputStream; @@ -18,7 +19,7 @@ import java.util.Set; import java.util.UUID; -import static org.junit.Assert.assertNotEquals; +import static org.junit.jupiter.api.Assertions.assertEquals; public class OutputCodecTest { @@ -30,11 +31,11 @@ public void setUp() { public void testWriteMetrics() throws JsonProcessingException { OutputCodec outputCodec = new OutputCodec() { @Override - public void start(OutputStream outputStream) throws IOException { + public void start(OutputStream outputStream, SinkContext sinkContext) throws IOException { } @Override - public void writeEvent(Event event, OutputStream outputStream, String tagsTargetKey) throws IOException { + public void writeEvent(Event event, OutputStream outputStream) throws IOException { } @Override @@ -53,8 +54,8 @@ public String getExtension() { withTags(testTags).build(); Map json = generateJson(); final JacksonEvent event = JacksonLog.builder().withData(json).withEventMetadata(defaultEventMetadata).build(); - Event tagsToEvent = outputCodec.addTagsToEvent(event, "Tag"); - assertNotEquals(event.toJsonString(), tagsToEvent.toJsonString()); + String jsonString = outputCodec.buildJsonString(event, "tag1", null, null); + assertEquals(event.jsonBuilder().includeTags("tag1").toJsonString(), jsonString); } private static Map generateJson() { diff --git a/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/event/JacksonEventTest.java b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/event/JacksonEventTest.java index d49b371ffc..df50799dff 100644 --- a/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/event/JacksonEventTest.java +++ b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/event/JacksonEventTest.java @@ -651,6 +651,7 @@ void testJsonStringBuilder() { eventMetadata.addTags(List.of("tag1", "tag2")); final String expectedJsonString = "{\"foo\":\"bar\",\"tags\":[\"tag1\",\"tag2\"]}"; assertThat(event.jsonBuilder().includeTags("tags").toJsonString(), equalTo(expectedJsonString)); + assertThat(event.jsonBuilder().rootKey("foo").toJsonString(), equalTo("\"bar\"")); assertThat(event.jsonBuilder().toJsonString(), equalTo(jsonString)); } diff --git a/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/sink/SinkContextTest.java b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/sink/SinkContextTest.java index 1c3976c338..cb6fe54e02 100644 --- a/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/sink/SinkContextTest.java +++ b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/sink/SinkContextTest.java @@ -32,5 +32,16 @@ public void testSinkContextBasic() { } + @Test + public void testSinkContextWithTagsOnly() { + final String testTagsTargetKey = RandomStringUtils.randomAlphabetic(6); + sinkContext = new SinkContext(testTagsTargetKey); + assertThat(sinkContext.getTagsTargetKey(), equalTo(testTagsTargetKey)); + assertThat(sinkContext.getRoutes(), equalTo(null)); + assertThat(sinkContext.getIncludeKeys(), equalTo(null)); + assertThat(sinkContext.getExcludeKeys(), equalTo(null)); + + } + } diff --git a/data-prepper-plugins/avro-codecs/src/main/java/org/opensearch/dataprepper/plugins/codec/avro/AvroOutputCodec.java b/data-prepper-plugins/avro-codecs/src/main/java/org/opensearch/dataprepper/plugins/codec/avro/AvroOutputCodec.java index 8129594162..0df04a3359 100644 --- a/data-prepper-plugins/avro-codecs/src/main/java/org/opensearch/dataprepper/plugins/codec/avro/AvroOutputCodec.java +++ b/data-prepper-plugins/avro-codecs/src/main/java/org/opensearch/dataprepper/plugins/codec/avro/AvroOutputCodec.java @@ -9,6 +9,7 @@ import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor; import org.opensearch.dataprepper.model.codec.OutputCodec; import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.sink.SinkContext; import java.io.IOException; import java.io.OutputStream; @@ -26,12 +27,12 @@ public AvroOutputCodec(final AvroOutputCodecConfig config) { } @Override - public void start(final OutputStream outputStream) throws IOException { + public void start(OutputStream outputStream, SinkContext sinkContext) throws IOException { // TODO: do the initial wrapping } @Override - public void writeEvent(final Event event, final OutputStream outputStream,final String tagsTargetKey) throws IOException { + public void writeEvent(Event event, OutputStream outputStream) throws IOException { // TODO: write event data to the outputstream } diff --git a/data-prepper-plugins/csv-processor/src/main/java/org/opensearch/dataprepper/plugins/codec/csv/CsvOutputCodec.java b/data-prepper-plugins/csv-processor/src/main/java/org/opensearch/dataprepper/plugins/codec/csv/CsvOutputCodec.java index ef3cc98225..95ed9ccb99 100644 --- a/data-prepper-plugins/csv-processor/src/main/java/org/opensearch/dataprepper/plugins/codec/csv/CsvOutputCodec.java +++ b/data-prepper-plugins/csv-processor/src/main/java/org/opensearch/dataprepper/plugins/codec/csv/CsvOutputCodec.java @@ -8,6 +8,7 @@ import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor; import org.opensearch.dataprepper.model.codec.OutputCodec; import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.sink.SinkContext; import java.io.IOException; import java.io.OutputStream; @@ -25,12 +26,12 @@ public CsvOutputCodec(final CsvOutputCodecConfig config) { } @Override - public void start(final OutputStream outputStream) throws IOException { + public void start(final OutputStream outputStream, SinkContext sinkContext) throws IOException { // TODO: do the initial wrapping like get header and delimiter and write to Outputstream } @Override - public void writeEvent(final Event event, final OutputStream outputStream, String tagsTargetKey) throws IOException { + public void writeEvent(final Event event, final OutputStream outputStream) throws IOException { // TODO: validate data according to header and write event data to the outputstream } diff --git a/data-prepper-plugins/newline-codecs/src/main/java/org/opensearch/dataprepper/plugins/codec/newline/NewlineDelimitedOutputCodec.java b/data-prepper-plugins/newline-codecs/src/main/java/org/opensearch/dataprepper/plugins/codec/newline/NewlineDelimitedOutputCodec.java index 9e07f60e9b..ddeba1ccad 100644 --- a/data-prepper-plugins/newline-codecs/src/main/java/org/opensearch/dataprepper/plugins/codec/newline/NewlineDelimitedOutputCodec.java +++ b/data-prepper-plugins/newline-codecs/src/main/java/org/opensearch/dataprepper/plugins/codec/newline/NewlineDelimitedOutputCodec.java @@ -9,6 +9,7 @@ import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor; import org.opensearch.dataprepper.model.codec.OutputCodec; import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.sink.SinkContext; import java.io.IOException; import java.io.OutputStream; @@ -25,6 +26,8 @@ public class NewlineDelimitedOutputCodec implements OutputCodec { private static final ObjectMapper objectMapper = new ObjectMapper(); private final NewlineDelimitedOutputConfig config; + private SinkContext sinkContext; + @DataPrepperPluginConstructor public NewlineDelimitedOutputCodec(final NewlineDelimitedOutputConfig config) { Objects.requireNonNull(config); @@ -32,20 +35,21 @@ public NewlineDelimitedOutputCodec(final NewlineDelimitedOutputConfig config) { } @Override - public void start(final OutputStream outputStream) throws IOException { + public void start(final OutputStream outputStream, SinkContext sinkContext) throws IOException { Objects.requireNonNull(outputStream); + Objects.requireNonNull(sinkContext); + this.sinkContext = sinkContext; } @Override - public void writeEvent(final Event event, final OutputStream outputStream, String tagsTargetKey) throws IOException { + public void writeEvent(final Event event, final OutputStream outputStream) throws IOException { Objects.requireNonNull(event); Map eventMap; - if (tagsTargetKey != null) { - eventMap = addTagsToEvent(event, tagsTargetKey).toMap(); - } else { - eventMap = event.toMap(); - } - writeToOutputStream(outputStream, eventMap); + + String jsonString = buildJsonString(event, sinkContext.getTagsTargetKey(), sinkContext.getIncludeKeys(), sinkContext.getExcludeKeys()); + byte[] byteArr = jsonString.getBytes(); + outputStream.write(byteArr); + outputStream.write(System.lineSeparator().getBytes()); } @Override @@ -53,23 +57,6 @@ public void complete(final OutputStream outputStream) throws IOException { outputStream.close(); } - private void writeToOutputStream(final OutputStream outputStream, final Object object) throws IOException { - byte[] byteArr = null; - if (object instanceof Map) { - Map map = objectMapper.convertValue(object, Map.class); - for (String key : config.getExcludeKeys()) { - if (map.containsKey(key)) { - map.remove(key); - } - } - String json = objectMapper.writeValueAsString(map); - byteArr = json.getBytes(); - } else { - byteArr = object.toString().getBytes(); - } - outputStream.write(byteArr); - outputStream.write(System.lineSeparator().getBytes()); - } @Override public String getExtension() { diff --git a/data-prepper-plugins/newline-codecs/src/main/java/org/opensearch/dataprepper/plugins/codec/newline/NewlineDelimitedOutputConfig.java b/data-prepper-plugins/newline-codecs/src/main/java/org/opensearch/dataprepper/plugins/codec/newline/NewlineDelimitedOutputConfig.java index d3e91267e6..d38cbc6553 100644 --- a/data-prepper-plugins/newline-codecs/src/main/java/org/opensearch/dataprepper/plugins/codec/newline/NewlineDelimitedOutputConfig.java +++ b/data-prepper-plugins/newline-codecs/src/main/java/org/opensearch/dataprepper/plugins/codec/newline/NewlineDelimitedOutputConfig.java @@ -5,21 +5,16 @@ package org.opensearch.dataprepper.plugins.codec.newline; -import com.fasterxml.jackson.annotation.JsonProperty; - -import java.util.ArrayList; -import java.util.List; - /** * Configuration class for the newline delimited codec. */ public class NewlineDelimitedOutputConfig { - private static final List DEFAULT_EXCLUDE_KEYS = new ArrayList<>(); - - @JsonProperty("exclude_keys") - private List excludeKeys = DEFAULT_EXCLUDE_KEYS; - - public List getExcludeKeys() { - return excludeKeys; - } +// private static final List DEFAULT_EXCLUDE_KEYS = new ArrayList<>(); +// +// @JsonProperty("exclude_keys") +// private List excludeKeys = DEFAULT_EXCLUDE_KEYS; +// +// public List getExcludeKeys() { +// return excludeKeys; +// } } \ No newline at end of file diff --git a/data-prepper-plugins/newline-codecs/src/test/java/org/opensearch/dataprepper/plugins/codec/newline/NewlineDelimitedOutputCodecTest.java b/data-prepper-plugins/newline-codecs/src/test/java/org/opensearch/dataprepper/plugins/codec/newline/NewlineDelimitedOutputCodecTest.java index b31ebaf6f0..a8d5991dd7 100644 --- a/data-prepper-plugins/newline-codecs/src/test/java/org/opensearch/dataprepper/plugins/codec/newline/NewlineDelimitedOutputCodecTest.java +++ b/data-prepper-plugins/newline-codecs/src/test/java/org/opensearch/dataprepper/plugins/codec/newline/NewlineDelimitedOutputCodecTest.java @@ -11,11 +11,13 @@ import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.model.log.JacksonLog; import org.opensearch.dataprepper.model.record.Record; +import org.opensearch.dataprepper.model.sink.SinkContext; import java.io.ByteArrayOutputStream; import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -40,13 +42,14 @@ private NewlineDelimitedOutputCodec createObjectUnderTest() { @ParameterizedTest @ValueSource(ints = {1, 3, 10, 100}) void test_happy_case(final int numberOfRecords) throws IOException { + SinkContext sinkContext = new SinkContext(null, null, Collections.emptyList(), Collections.emptyList()); this.numberOfRecords = numberOfRecords; NewlineDelimitedOutputCodec newlineDelimitedOutputCodec = createObjectUnderTest(); outputStream = new ByteArrayOutputStream(); - newlineDelimitedOutputCodec.start(outputStream); + newlineDelimitedOutputCodec.start(outputStream, sinkContext); for (int index = 0; index < numberOfRecords; index++) { final Event event = (Event) getRecord(index).getData(); - newlineDelimitedOutputCodec.writeEvent(event, outputStream, null); + newlineDelimitedOutputCodec.writeEvent(event, outputStream); } newlineDelimitedOutputCodec.complete(outputStream); byte[] byteArray = outputStream.toByteArray(); diff --git a/data-prepper-plugins/parquet-codecs/src/main/java/org/opensearch/dataprepper/plugins/codec/parquet/ParquetOutputCodec.java b/data-prepper-plugins/parquet-codecs/src/main/java/org/opensearch/dataprepper/plugins/codec/parquet/ParquetOutputCodec.java index c8be600499..0e3dd75cb7 100644 --- a/data-prepper-plugins/parquet-codecs/src/main/java/org/opensearch/dataprepper/plugins/codec/parquet/ParquetOutputCodec.java +++ b/data-prepper-plugins/parquet-codecs/src/main/java/org/opensearch/dataprepper/plugins/codec/parquet/ParquetOutputCodec.java @@ -10,6 +10,7 @@ import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor; import org.opensearch.dataprepper.model.codec.OutputCodec; import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.sink.SinkContext; import java.io.IOException; import java.io.OutputStream; @@ -25,7 +26,7 @@ public ParquetOutputCodec(final ParquetOutputCodecConfig config) { @Override - public void start(final OutputStream outputStream) throws IOException { + public void start(final OutputStream outputStream, SinkContext sinkContext) throws IOException { // TODO: do the initial wrapping } @@ -35,7 +36,7 @@ public void complete(final OutputStream outputStream) throws IOException { } @Override - public void writeEvent(final Event event, final OutputStream outputStream, String tagsTargetKey) throws IOException { + public void writeEvent(final Event event, final OutputStream outputStream) throws IOException { // TODO: get the event data and write in output stream } diff --git a/data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/codec/json/JsonOutputCodec.java b/data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/codec/json/JsonOutputCodec.java index bc68761f07..d9bb1a8543 100644 --- a/data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/codec/json/JsonOutputCodec.java +++ b/data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/codec/json/JsonOutputCodec.java @@ -7,6 +7,7 @@ import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin; import org.opensearch.dataprepper.model.codec.OutputCodec; import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.sink.SinkContext; import java.io.IOException; import java.io.OutputStream; @@ -19,12 +20,12 @@ public class JsonOutputCodec implements OutputCodec { @Override - public void start(final OutputStream outputStream) throws IOException { + public void start(final OutputStream outputStream, SinkContext sinkContext) throws IOException { // TODO: do the initial wrapping like start the array } @Override - public void writeEvent(final Event event, final OutputStream outputStream, String tagsTargetKey) throws IOException { + public void writeEvent(final Event event, final OutputStream outputStream) throws IOException { // TODO: get the event data and write event data to the outputstream } diff --git a/data-prepper-plugins/s3-sink/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/S3SinkServiceIT.java b/data-prepper-plugins/s3-sink/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/S3SinkServiceIT.java index 7468e6bf44..8c6e390ef7 100644 --- a/data-prepper-plugins/s3-sink/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/S3SinkServiceIT.java +++ b/data-prepper-plugins/s3-sink/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/S3SinkServiceIT.java @@ -23,6 +23,7 @@ import org.opensearch.dataprepper.model.event.JacksonEvent; import org.opensearch.dataprepper.model.log.JacksonLog; import org.opensearch.dataprepper.model.record.Record; +import org.opensearch.dataprepper.model.sink.SinkContext; import org.opensearch.dataprepper.model.types.ByteCount; import org.opensearch.dataprepper.plugins.codec.newline.NewlineDelimitedOutputCodec; import org.opensearch.dataprepper.plugins.codec.newline.NewlineDelimitedOutputConfig; @@ -42,7 +43,6 @@ import software.amazon.awssdk.services.s3.model.S3Object; import java.time.Duration; -import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.LinkedHashMap; @@ -132,7 +132,6 @@ void verify_flushed_object_count_into_s3_bucket() { void configureNewLineCodec() { codec = new NewlineDelimitedOutputCodec(newlineDelimitedOutputConfig); - when(newlineDelimitedOutputConfig.getExcludeKeys()).thenReturn(new ArrayList<>()); } @Test @@ -154,7 +153,8 @@ void verify_flushed_records_into_s3_bucketNewLine() { } private S3SinkService createObjectUnderTest() { - return new S3SinkService(s3SinkConfig, bufferFactory, codec, s3Client, "Tag", pluginMetrics); + SinkContext sinkContext = new SinkContext("Tag"); + return new S3SinkService(s3SinkConfig, bufferFactory, codec, s3Client, sinkContext, pluginMetrics); } private int gets3ObjectCount() { diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/S3Sink.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/S3Sink.java index 828fd2e414..f491154e27 100644 --- a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/S3Sink.java +++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/S3Sink.java @@ -16,8 +16,8 @@ import org.opensearch.dataprepper.model.plugin.PluginFactory; import org.opensearch.dataprepper.model.record.Record; import org.opensearch.dataprepper.model.sink.AbstractSink; -import org.opensearch.dataprepper.model.sink.SinkContext; import org.opensearch.dataprepper.model.sink.Sink; +import org.opensearch.dataprepper.model.sink.SinkContext; import org.opensearch.dataprepper.plugins.sink.accumulator.BufferFactory; import org.opensearch.dataprepper.plugins.sink.accumulator.BufferTypeOptions; import org.opensearch.dataprepper.plugins.sink.accumulator.InMemoryBufferFactory; @@ -27,7 +27,7 @@ import software.amazon.awssdk.services.s3.S3Client; import java.util.Collection; -import java.util.Objects; +import java.util.Collections; /** * Implementation class of s3-sink plugin. It is responsible for receive the collection of @@ -57,7 +57,7 @@ public S3Sink(final PluginSetting pluginSetting, final AwsCredentialsSupplier awsCredentialsSupplier) { super(pluginSetting); this.s3SinkConfig = s3SinkConfig; - this.sinkContext = sinkContext; + this.sinkContext = sinkContext != null ? sinkContext : new SinkContext(null, Collections.emptyList(), Collections.emptyList(), Collections.emptyList()); final PluginModel codecConfiguration = s3SinkConfig.getCodec(); final PluginSetting codecPluginSettings = new PluginSetting(codecConfiguration.getPluginName(), codecConfiguration.getPluginSettings()); @@ -70,7 +70,7 @@ public S3Sink(final PluginSetting pluginSetting, bufferFactory = new InMemoryBufferFactory(); } final S3Client s3Client = ClientFactory.createS3Client(s3SinkConfig, awsCredentialsSupplier); - s3SinkService = new S3SinkService(s3SinkConfig, bufferFactory, codec, s3Client, Objects.nonNull(sinkContext) ? sinkContext.getTagsTargetKey() : null, pluginMetrics); + s3SinkService = new S3SinkService(s3SinkConfig, bufferFactory, codec, s3Client, sinkContext, pluginMetrics); } @Override diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/S3SinkService.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/S3SinkService.java index 0ed2e8bb79..03b38630ed 100644 --- a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/S3SinkService.java +++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/S3SinkService.java @@ -12,6 +12,7 @@ import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.model.event.EventHandle; import org.opensearch.dataprepper.model.record.Record; +import org.opensearch.dataprepper.model.sink.SinkContext; import org.opensearch.dataprepper.model.types.ByteCount; import org.opensearch.dataprepper.plugins.sink.accumulator.Buffer; import org.opensearch.dataprepper.plugins.sink.accumulator.BufferFactory; @@ -58,7 +59,7 @@ public class S3SinkService { private final Counter numberOfRecordsSuccessCounter; private final Counter numberOfRecordsFailedCounter; private final DistributionSummary s3ObjectSizeSummary; - private final String tagsTargetKey; + private final SinkContext sinkContext; /** * @param s3SinkConfig s3 sink related configuration. @@ -68,12 +69,12 @@ public class S3SinkService { * @param pluginMetrics metrics. */ public S3SinkService(final S3SinkConfig s3SinkConfig, final BufferFactory bufferFactory, - final OutputCodec codec, final S3Client s3Client, final String tagsTargetKey, final PluginMetrics pluginMetrics) { + final OutputCodec codec, final S3Client s3Client, final SinkContext sinkContext, final PluginMetrics pluginMetrics) { this.s3SinkConfig = s3SinkConfig; this.bufferFactory = bufferFactory; this.codec = codec; this.s3Client = s3Client; - this.tagsTargetKey = tagsTargetKey; + this.sinkContext = sinkContext; reentrantLock = new ReentrantLock(); bufferedEventHandles = new LinkedList<>(); @@ -106,11 +107,11 @@ void output(Collection> records) { for (Record record : records) { if(currentBuffer.getEventCount() == 0) { - codec.start(outputStream); + codec.start(outputStream, sinkContext); } final Event event = record.getData(); - codec.writeEvent(event, outputStream, tagsTargetKey); + codec.writeEvent(event, outputStream); int count = currentBuffer.getEventCount() +1; currentBuffer.setEventCount(count); diff --git a/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/S3SinkServiceTest.java b/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/S3SinkServiceTest.java index 9c8a4e30ab..e2d6d0c54a 100644 --- a/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/S3SinkServiceTest.java +++ b/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/S3SinkServiceTest.java @@ -18,6 +18,7 @@ import org.opensearch.dataprepper.model.event.JacksonEvent; import org.opensearch.dataprepper.model.plugin.PluginFactory; import org.opensearch.dataprepper.model.record.Record; +import org.opensearch.dataprepper.model.sink.SinkContext; import org.opensearch.dataprepper.model.types.ByteCount; import org.opensearch.dataprepper.plugins.sink.accumulator.Buffer; import org.opensearch.dataprepper.plugins.sink.accumulator.BufferFactory; @@ -135,7 +136,8 @@ void setUp() { } private S3SinkService createObjectUnderTest() { - return new S3SinkService(s3SinkConfig, bufferFactory, codec, s3Client, tagsTargetKey, pluginMetrics); + SinkContext sinkContext = new SinkContext(tagsTargetKey, null, null, null); + return new S3SinkService(s3SinkConfig, bufferFactory, codec, s3Client, sinkContext, pluginMetrics); } @Test @@ -186,7 +188,7 @@ void test_output_with_threshold_set_as_more_then_zero_event_count() throws IOExc when(s3SinkConfig.getThresholdOptions().getEventCount()).thenReturn(5); final OutputStream outputStream = mock(OutputStream.class); final Event event = JacksonEvent.fromMessage(UUID.randomUUID().toString()); - doNothing().when(codec).writeEvent(event, outputStream, null); + doNothing().when(codec).writeEvent(event, outputStream); S3SinkService s3SinkService = createObjectUnderTest(); assertNotNull(s3SinkService); s3SinkService.output(generateRandomStringEventRecord()); @@ -209,7 +211,7 @@ void test_output_with_threshold_set_as_zero_event_count() throws IOException { when(s3SinkConfig.getThresholdOptions().getMaximumSize()).thenReturn(ByteCount.parse("2kb")); final OutputStream outputStream = mock(OutputStream.class); final Event event = JacksonEvent.fromMessage(UUID.randomUUID().toString()); - doNothing().when(codec).writeEvent(event, outputStream, null); + doNothing().when(codec).writeEvent(event, outputStream); S3SinkService s3SinkService = createObjectUnderTest(); assertNotNull(s3SinkService); s3SinkService.output(generateRandomStringEventRecord()); @@ -227,7 +229,7 @@ void test_output_with_uploadedToS3_success() throws IOException { final OutputStream outputStream = mock(OutputStream.class); final Event event = JacksonEvent.fromMessage(UUID.randomUUID().toString()); - doNothing().when(codec).writeEvent(event, outputStream, null); + doNothing().when(codec).writeEvent(event, outputStream); S3SinkService s3SinkService = createObjectUnderTest(); assertNotNull(s3SinkService); assertThat(s3SinkService, instanceOf(S3SinkService.class)); @@ -247,7 +249,7 @@ void test_output_with_uploadedToS3_success_records_byte_count() throws IOExcepti final OutputStream outputStream = mock(OutputStream.class); final Event event = JacksonEvent.fromMessage(UUID.randomUUID().toString()); - doNothing().when(codec).writeEvent(event, outputStream, null); + doNothing().when(codec).writeEvent(event, outputStream); final S3SinkService s3SinkService = createObjectUnderTest(); s3SinkService.output(generateRandomStringEventRecord()); @@ -260,7 +262,7 @@ void test_output_with_uploadedToS3_failed() throws IOException { when(s3SinkConfig.getMaxUploadRetries()).thenReturn(3); final OutputStream outputStream = mock(OutputStream.class); final Event event = JacksonEvent.fromMessage(UUID.randomUUID().toString()); - doNothing().when(codec).writeEvent(event, outputStream, null); + doNothing().when(codec).writeEvent(event, outputStream); S3SinkService s3SinkService = createObjectUnderTest(); assertNotNull(s3SinkService); assertThat(s3SinkService, instanceOf(S3SinkService.class)); @@ -283,7 +285,7 @@ void test_output_with_uploadedToS3_failure_does_not_record_byte_count() throws I final S3SinkService s3SinkService = createObjectUnderTest(); final Event event = JacksonEvent.fromMessage(UUID.randomUUID().toString()); final OutputStream outputStream = mock(OutputStream.class); - doNothing().when(codec).writeEvent(event, outputStream, null); + doNothing().when(codec).writeEvent(event, outputStream); s3SinkService.output(Collections.singletonList(new Record<>(event))); verify(s3ObjectSizeSummary, never()).record(anyLong()); @@ -303,7 +305,7 @@ void test_retryFlushToS3_positive() throws InterruptedException, IOException { assertNotNull(buffer); OutputStream outputStream = buffer.getOutputStream(); final Event event = JacksonEvent.fromMessage(UUID.randomUUID().toString()); - codec.writeEvent(event, outputStream, null); + codec.writeEvent(event, outputStream); final String s3Key = UUID.randomUUID().toString(); boolean isUploadedToS3 = s3SinkService.retryFlushToS3(buffer, s3Key); assertTrue(isUploadedToS3); @@ -319,7 +321,7 @@ void test_retryFlushToS3_negative() throws InterruptedException, IOException { assertNotNull(s3SinkService); OutputStream outputStream = buffer.getOutputStream(); final Event event = JacksonEvent.fromMessage(UUID.randomUUID().toString()); - codec.writeEvent(event, outputStream, null); + codec.writeEvent(event, outputStream); final String s3Key = UUID.randomUUID().toString(); doThrow(AwsServiceException.class).when(buffer).flushToS3(eq(s3Client), anyString(), anyString()); boolean isUploadedToS3 = s3SinkService.retryFlushToS3(buffer, s3Key); @@ -338,7 +340,7 @@ void output_will_release_all_handles_since_a_flush() throws IOException { final OutputStream outputStream = mock(OutputStream.class); final Event event = JacksonEvent.fromMessage(UUID.randomUUID().toString()); - doNothing().when(codec).writeEvent(event, outputStream, null); + doNothing().when(codec).writeEvent(event, outputStream); final S3SinkService s3SinkService = createObjectUnderTest(); final Collection> records = generateRandomStringEventRecord(); s3SinkService.output(records); @@ -361,7 +363,7 @@ void output_will_skip_releasing_events_without_EventHandle_objects() throws IOEx final OutputStream outputStream = mock(OutputStream.class); final Event event1 = JacksonEvent.fromMessage(UUID.randomUUID().toString()); - doNothing().when(codec).writeEvent(event1, outputStream, null); + doNothing().when(codec).writeEvent(event1, outputStream); final S3SinkService s3SinkService = createObjectUnderTest(); final Collection> records = generateRandomStringEventRecord(); records.stream() @@ -394,7 +396,7 @@ void output_will_release_all_handles_since_a_flush_when_S3_fails() throws IOExce final OutputStream outputStream = mock(OutputStream.class); final Event event = JacksonEvent.fromMessage(UUID.randomUUID().toString()); - doNothing().when(codec).writeEvent(event, outputStream, null); + doNothing().when(codec).writeEvent(event, outputStream); final S3SinkService s3SinkService = createObjectUnderTest(); final List> records = generateEventRecords(1); s3SinkService.output(records); @@ -417,7 +419,7 @@ void output_will_release_only_new_handles_since_a_flush() throws IOException { final OutputStream outputStream = mock(OutputStream.class); final Event event = JacksonEvent.fromMessage(UUID.randomUUID().toString()); - doNothing().when(codec).writeEvent(event, outputStream, null); + doNothing().when(codec).writeEvent(event, outputStream); final S3SinkService s3SinkService = createObjectUnderTest(); final Collection> records = generateRandomStringEventRecord(); s3SinkService.output(records); @@ -450,7 +452,7 @@ void output_will_release_only_new_handles_since_a_flush_when_S3_fails() throws I final OutputStream outputStream = mock(OutputStream.class); final Event event = JacksonEvent.fromMessage(UUID.randomUUID().toString()); - doNothing().when(codec).writeEvent(event, outputStream, null); + doNothing().when(codec).writeEvent(event, outputStream); final S3SinkService s3SinkService = createObjectUnderTest(); final List> records = generateEventRecords(1); s3SinkService.output(records);