diff --git a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/codec/OutputCodec.java b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/codec/OutputCodec.java index 239c22aa7f..22280b7464 100644 --- a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/codec/OutputCodec.java +++ b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/codec/OutputCodec.java @@ -10,6 +10,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.model.log.JacksonLog; +import org.opensearch.dataprepper.model.sink.OutputCodecContext; import org.opensearch.dataprepper.model.sink.Sink; import java.io.IOException; @@ -26,10 +27,10 @@ public interface OutputCodec { * * @param outputStream outputStream param for wrapping * @param event Event to auto-generate schema - * @param tagsTargetKey to add tags to the record to create schema + * @param context Extra Context used in Codec. * @throws IOException throws IOException when invalid input is received or not able to create wrapping */ - void start(OutputStream outputStream, Event event, String tagsTargetKey) throws IOException; + void start(OutputStream outputStream, Event event, OutputCodecContext context) throws IOException; /** * this method get called from {@link Sink} to write event in {@link OutputStream} @@ -37,10 +38,9 @@ public interface OutputCodec { * * @param event event Record event * @param outputStream outputStream param to hold the event data - * @param tagsTargetKey to add tags to the record * @throws IOException throws IOException when not able to write data to {@link OutputStream} */ - void writeEvent(Event event, OutputStream outputStream, String tagsTargetKey) throws IOException; + void writeEvent(Event event, OutputStream outputStream) throws IOException; /** * this method get called from {@link Sink} to do final wrapping in {@link OutputStream} diff --git a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/configuration/SinkModel.java b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/configuration/SinkModel.java index 133e0beb0e..328a84e586 100644 --- a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/configuration/SinkModel.java +++ b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/configuration/SinkModel.java @@ -10,6 +10,7 @@ import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.databind.annotation.JsonDeserialize; import com.fasterxml.jackson.databind.annotation.JsonSerialize; +import org.opensearch.dataprepper.model.plugin.InvalidPluginConfigurationException; import java.util.ArrayList; import java.util.Collection; @@ -122,6 +123,13 @@ private SinkInternalJsonModel(final List routes, final String tagsTarget this.includeKeys = includeKeys != null ? preprocessingKeys(includeKeys) : Collections.emptyList(); this.excludeKeys = excludeKeys != null ? 
preprocessingKeys(excludeKeys) : Collections.emptyList(); this.tagsTargetKey = tagsTargetKey; + validateConfiguration(); + } + + void validateConfiguration() { + if (!includeKeys.isEmpty() && !excludeKeys.isEmpty()) { + throw new InvalidPluginConfigurationException("include_keys and exclude_keys cannot both exist in the configuration at the same time."); + } } @@ -151,4 +159,4 @@ static class SinkModelDeserializer extends AbstractPluginModelDeserializer new SinkInternalJsonModel(null, null, null, null)); } } -} +} \ No newline at end of file diff --git a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/sink/OutputCodecContext.java b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/sink/OutputCodecContext.java new file mode 100644 index 0000000000..d9cc83c9cb --- /dev/null +++ b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/sink/OutputCodecContext.java @@ -0,0 +1,52 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.model.sink; + +import java.util.Collections; +import java.util.List; + +/** + * Data Prepper Output Codec Context class. + * The context contains information that is shared and may be used among {@link org.opensearch.dataprepper.model.codec.OutputCodec} implementations. + */ +public class OutputCodecContext { + + private final String tagsTargetKey; + + private final List includeKeys; + private final List excludeKeys; + + public OutputCodecContext() { + this(null, Collections.emptyList(), Collections.emptyList()); + } + + + public OutputCodecContext(String tagsTargetKey, List includeKeys, List excludeKeys) { + this.tagsTargetKey = tagsTargetKey; + this.includeKeys = includeKeys; + this.excludeKeys = excludeKeys; + } + + + public static OutputCodecContext fromSinkContext(SinkContext sinkContext) { + if (sinkContext == null) { + return new OutputCodecContext(); + } + return new OutputCodecContext(sinkContext.getTagsTargetKey(), sinkContext.getIncludeKeys(), sinkContext.getExcludeKeys()); + } + + public String getTagsTargetKey() { + return tagsTargetKey; + } + + public List getIncludeKeys() { + return includeKeys; + } + + public List getExcludeKeys() { + return excludeKeys; + } +} diff --git a/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/codec/OutputCodecTest.java b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/codec/OutputCodecTest.java index fc240401a1..4c70ef1b18 100644 --- a/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/codec/OutputCodecTest.java +++ b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/codec/OutputCodecTest.java @@ -9,6 +9,7 @@ import org.opensearch.dataprepper.model.event.EventType; import org.opensearch.dataprepper.model.event.JacksonEvent; import org.opensearch.dataprepper.model.log.JacksonLog; +import org.opensearch.dataprepper.model.sink.OutputCodecContext; import java.io.IOException; import java.io.OutputStream; @@ -30,11 +31,11 @@ public void setUp() { public void testWriteMetrics() throws JsonProcessingException { OutputCodec outputCodec = new OutputCodec() { @Override - public void start(OutputStream outputStream, Event event, String tagsTargetKey) throws IOException { + public void start(OutputStream outputStream, Event event, final OutputCodecContext codecContext) throws IOException { } @Override - public void writeEvent(Event event, OutputStream outputStream, String tagsTargetKey) throws IOException { + public void writeEvent(Event event, OutputStream outputStream) throws
IOException { } @Override diff --git a/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/configuration/SinkModelTest.java b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/configuration/SinkModelTest.java index 5a09a6c4c1..a5fc6363cb 100644 --- a/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/configuration/SinkModelTest.java +++ b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/configuration/SinkModelTest.java @@ -13,6 +13,7 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; +import org.opensearch.dataprepper.model.plugin.InvalidPluginConfigurationException; import java.io.IOException; import java.io.InputStream; @@ -21,6 +22,7 @@ import java.util.Arrays; import java.util.Collections; import java.util.LinkedHashMap; +import java.util.List; import java.util.Map; import java.util.UUID; @@ -32,6 +34,7 @@ import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.hasKey; import static org.junit.jupiter.api.Assertions.assertAll; +import static org.junit.jupiter.api.Assertions.assertThrows; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -90,7 +93,6 @@ void serialize_into_known_SinkModel() throws IOException { } - @Test void deserialize_with_any_pluginModel() throws IOException { final InputStream inputStream = this.getClass().getResourceAsStream("/serialized_with_plugin_settings.yaml"); @@ -144,13 +146,29 @@ void serialize_with_just_pluginModel() throws IOException { @Test void sinkModel_with_include_keys() throws IOException { final Map pluginSettings = new LinkedHashMap<>(); - final SinkModel sinkModel = new SinkModel("customSinkPlugin", Arrays.asList("routeA", "routeB"), null, Arrays.asList("/"), Arrays.asList("bcd", "/abc", "efg/"), pluginSettings); + final SinkModel sinkModel = new SinkModel("customSinkPlugin", Arrays.asList("routeA", "routeB"), null, Arrays.asList("bcd", "/abc", "efg/"), null, pluginSettings); + + assertThat(sinkModel.getExcludeKeys(), equalTo(new ArrayList())); + assertThat(sinkModel.getIncludeKeys(), equalTo(Arrays.asList("/abc", "/bcd", "/efg"))); + + } + + @Test + void sinkModel_with_exclude_keys() throws IOException { + final Map pluginSettings = new LinkedHashMap<>(); + final SinkModel sinkModel = new SinkModel("customSinkPlugin", Arrays.asList("routeA", "routeB"), null, List.of("/"), Arrays.asList("bcd", "/abc", "efg/"), pluginSettings); assertThat(sinkModel.getIncludeKeys(), equalTo(new ArrayList())); assertThat(sinkModel.getExcludeKeys(), equalTo(Arrays.asList("/abc", "/bcd", "/efg"))); } + @Test + void sinkModel_with_both_include_and_exclude_keys() throws IOException { + final Map pluginSettings = new LinkedHashMap<>(); + assertThrows(InvalidPluginConfigurationException.class, () -> new SinkModel("customSinkPlugin", Arrays.asList("routeA", "routeB"), null, List.of("abc"), List.of("bcd"), pluginSettings)); + } + @Nested class BuilderTest { private PluginModel pluginModel; @@ -181,10 +199,11 @@ void build_with_only_PluginModel_should_return_expected_SinkModel() { assertThat(actualSinkModel.getExcludeKeys(), empty()); assertThat(actualSinkModel.getTagsTargetKey(), nullValue()); assertThat(actualSinkModel.getTagsTargetKey(), nullValue()); + } } private static String createStringFromInputStream(final InputStream inputStream) throws IOException { return new String(inputStream.readAllBytes(), StandardCharsets.UTF_8); } -} +} \ No newline at end of file diff --git 
a/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/sink/OutputCodecContextTest.java b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/sink/OutputCodecContextTest.java new file mode 100644 index 0000000000..db1ec8cf86 --- /dev/null +++ b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/sink/OutputCodecContextTest.java @@ -0,0 +1,59 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.model.sink; + +import org.apache.commons.lang3.RandomStringUtils; +import org.junit.jupiter.api.Test; + +import java.util.Collections; +import java.util.List; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; +import static org.junit.jupiter.api.Assertions.assertNull; + +public class OutputCodecContextTest { + + + @Test + public void testOutputCodecContextBasic() { + final String testTagsTargetKey = RandomStringUtils.randomAlphabetic(6); + final List testIncludeKeys = Collections.emptyList(); + final List testExcludeKeys = Collections.emptyList(); + OutputCodecContext codecContext = new OutputCodecContext(testTagsTargetKey, testIncludeKeys, testExcludeKeys); + assertThat(codecContext.getTagsTargetKey(), equalTo(testTagsTargetKey)); + assertThat(codecContext.getIncludeKeys(), equalTo(testIncludeKeys)); + assertThat(codecContext.getExcludeKeys(), equalTo(testExcludeKeys)); + + OutputCodecContext emptyContext = new OutputCodecContext(); + assertNull(emptyContext.getTagsTargetKey()); + assertThat(emptyContext.getIncludeKeys(), equalTo(testIncludeKeys)); + assertThat(emptyContext.getExcludeKeys(), equalTo(testExcludeKeys)); + + + } + + @Test + public void testOutputCodecContextAdapter() { + final String testTagsTargetKey = RandomStringUtils.randomAlphabetic(6); + final List testIncludeKeys = Collections.emptyList(); + final List testExcludeKeys = Collections.emptyList(); + + SinkContext sinkContext = new SinkContext(testTagsTargetKey, null, testIncludeKeys, testExcludeKeys); + + OutputCodecContext codecContext = OutputCodecContext.fromSinkContext(sinkContext); + assertThat(codecContext.getTagsTargetKey(), equalTo(testTagsTargetKey)); + assertThat(codecContext.getIncludeKeys(), equalTo(testIncludeKeys)); + assertThat(codecContext.getExcludeKeys(), equalTo(testExcludeKeys)); + + OutputCodecContext emptyContext = OutputCodecContext.fromSinkContext(null); + assertNull(emptyContext.getTagsTargetKey()); + assertThat(emptyContext.getIncludeKeys(), equalTo(testIncludeKeys)); + assertThat(emptyContext.getExcludeKeys(), equalTo(testExcludeKeys)); + + + } +} diff --git a/data-prepper-plugins/avro-codecs/src/main/java/org/opensearch/dataprepper/plugins/codec/avro/AvroOutputCodec.java b/data-prepper-plugins/avro-codecs/src/main/java/org/opensearch/dataprepper/plugins/codec/avro/AvroOutputCodec.java index 5d9cb870e4..0bdc919bd8 100644 --- a/data-prepper-plugins/avro-codecs/src/main/java/org/opensearch/dataprepper/plugins/codec/avro/AvroOutputCodec.java +++ b/data-prepper-plugins/avro-codecs/src/main/java/org/opensearch/dataprepper/plugins/codec/avro/AvroOutputCodec.java @@ -15,13 +15,13 @@ import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor; import org.opensearch.dataprepper.model.codec.OutputCodec; import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.sink.OutputCodecContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; import java.io.OutputStream; 
import java.nio.charset.StandardCharsets; -import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.Map; @@ -46,6 +46,8 @@ public class AvroOutputCodec implements OutputCodec { private Schema schema; + private OutputCodecContext codecContext; + @DataPrepperPluginConstructor public AvroOutputCodec(final AvroOutputCodecConfig config) { Objects.requireNonNull(config); @@ -53,8 +55,10 @@ public AvroOutputCodec(final AvroOutputCodecConfig config) { } @Override - public void start(final OutputStream outputStream, final Event event, final String tagsTargetKey) throws IOException { + public void start(final OutputStream outputStream, final Event event, final OutputCodecContext codecContext) throws IOException { Objects.requireNonNull(outputStream); + Objects.requireNonNull(codecContext); + this.codecContext = codecContext; if (config.getSchema() != null) { schema = parseSchema(config.getSchema()); } else if (config.getFileLocation() != null) { @@ -63,53 +67,48 @@ public void start(final OutputStream outputStream, final Event event, final Stri schema = parseSchema(AvroSchemaParserFromSchemaRegistry.getSchemaType(config.getSchemaRegistryUrl())); } else if (checkS3SchemaValidity()) { schema = AvroSchemaParserFromS3.parseSchema(config); - }else { - schema = buildInlineSchemaFromEvent(event, tagsTargetKey); + } else { + schema = buildInlineSchemaFromEvent(event); } final DatumWriter datumWriter = new GenericDatumWriter(schema); dataFileWriter = new DataFileWriter<>(datumWriter); dataFileWriter.create(schema, outputStream); } - public Schema buildInlineSchemaFromEvent(final Event event, final String tagsTargetKey) throws IOException { - if(tagsTargetKey!=null){ - return parseSchema(buildSchemaStringFromEventMap(addTagsToEvent(event, tagsTargetKey).toMap(), false)); - }else{ + public Schema buildInlineSchemaFromEvent(final Event event) throws IOException { + if (codecContext != null && codecContext.getTagsTargetKey() != null) { + return parseSchema(buildSchemaStringFromEventMap(addTagsToEvent(event, codecContext.getTagsTargetKey()).toMap(), false)); + } else { return parseSchema(buildSchemaStringFromEventMap(event.toMap(), false)); } } private String buildSchemaStringFromEventMap(final Map eventData, boolean nestedRecordFlag) { final StringBuilder builder = new StringBuilder(); - int nestedRecordIndex=1; - if(nestedRecordFlag==false){ + int nestedRecordIndex = 1; + if (!nestedRecordFlag) { builder.append(BASE_SCHEMA_STRING); - }else{ - builder.append("{\"type\":\"record\",\"name\":\""+"NestedRecord"+nestedRecordIndex+"\",\"fields\":["); + } else { + builder.append("{\"type\":\"record\",\"name\":\"" + "NestedRecord" + nestedRecordIndex + "\",\"fields\":["); nestedRecordIndex++; } String fields; int index = 0; - for(final String key: eventData.keySet()){ - if(config.getExcludeKeys()==null){ - config.setExcludeKeys(new ArrayList<>()); - } - if(config.getExcludeKeys().contains(key)){ + for (final String key : eventData.keySet()) { + if (codecContext != null && codecContext.getExcludeKeys().contains(key)) { continue; } - if(index == 0){ - if(!(eventData.get(key) instanceof Map)){ - fields = "{\"name\":\""+key+"\",\"type\":\""+typeMapper(eventData.get(key))+"\"}"; + if (index == 0) { + if (!(eventData.get(key) instanceof Map)) { + fields = "{\"name\":\"" + key + "\",\"type\":\"" + typeMapper(eventData.get(key)) + "\"}"; + } else { + fields = "{\"name\":\"" + key + "\",\"type\":" + typeMapper(eventData.get(key)) + "}"; } - else{ - fields = 
"{\"name\":\""+key+"\",\"type\":"+typeMapper(eventData.get(key))+"}"; - } - } - else{ - if(!(eventData.get(key) instanceof Map)){ - fields = ","+"{\"name\":\""+key+"\",\"type\":\""+typeMapper(eventData.get(key))+"\"}"; - }else{ - fields = ","+"{\"name\":\""+key+"\",\"type\":"+typeMapper(eventData.get(key))+"}"; + } else { + if (!(eventData.get(key) instanceof Map)) { + fields = "," + "{\"name\":\"" + key + "\",\"type\":\"" + typeMapper(eventData.get(key)) + "\"}"; + } else { + fields = "," + "{\"name\":\"" + key + "\",\"type\":" + typeMapper(eventData.get(key)) + "}"; } } builder.append(fields); @@ -120,20 +119,19 @@ private String buildSchemaStringFromEventMap(final Map eventData } private String typeMapper(final Object value) { - if(value instanceof Integer || value.getClass().equals(int.class)){ + if (value instanceof Integer || value.getClass().equals(int.class)) { return "int"; - }else if(value instanceof Float || value.getClass().equals(float.class)){ + } else if (value instanceof Float || value.getClass().equals(float.class)) { return "float"; - }else if(value instanceof Double || value.getClass().equals(double.class)){ + } else if (value instanceof Double || value.getClass().equals(double.class)) { return "double"; - }else if(value instanceof Long || value.getClass().equals(long.class)){ + } else if (value instanceof Long || value.getClass().equals(long.class)) { return "long"; - }else if(value instanceof Byte[]){ + } else if (value instanceof Byte[]) { return "bytes"; - }else if(value instanceof Map){ + } else if (value instanceof Map) { return buildSchemaStringFromEventMap((Map) value, true); - } - else{ + } else { return "string"; } } @@ -145,10 +143,10 @@ public void complete(final OutputStream outputStream) throws IOException { } @Override - public void writeEvent(final Event event, final OutputStream outputStream, final String tagsTargetKey) throws IOException { + public void writeEvent(final Event event, final OutputStream outputStream) throws IOException { Objects.requireNonNull(event); - if (tagsTargetKey != null) { - final GenericRecord avroRecord = buildAvroRecord(schema, addTagsToEvent(event, tagsTargetKey).toMap()); + if (codecContext.getTagsTargetKey() != null) { + final GenericRecord avroRecord = buildAvroRecord(schema, addTagsToEvent(event, codecContext.getTagsTargetKey()).toMap()); dataFileWriter.append(avroRecord); } else { final GenericRecord avroRecord = buildAvroRecord(schema, event.toMap()); @@ -173,9 +171,9 @@ Schema parseSchema(final String schemaString) throws IOException { private GenericRecord buildAvroRecord(final Schema schema, final Map eventData) { final GenericRecord avroRecord = new GenericData.Record(schema); - final boolean isExcludeKeyAvailable = !Objects.isNull(config.getExcludeKeys()); + final boolean isExcludeKeyAvailable = !codecContext.getExcludeKeys().isEmpty(); for (final String key : eventData.keySet()) { - if (isExcludeKeyAvailable && config.getExcludeKeys().contains(key)) { + if (isExcludeKeyAvailable && codecContext.getExcludeKeys().contains(key)) { continue; } final Schema.Field field = schema.getField(key); @@ -228,10 +226,6 @@ private Object schemaMapper(final Schema.Field field, final Object rawValue) { } private boolean checkS3SchemaValidity() throws IOException { - if (config.getBucketName() != null && config.getFileKey() != null && config.getRegion() != null) { - return true; - } else { - return false; - } + return config.getBucketName() != null && config.getFileKey() != null && config.getRegion() != null; } } \ No newline at 
end of file diff --git a/data-prepper-plugins/avro-codecs/src/main/java/org/opensearch/dataprepper/plugins/codec/avro/AvroOutputCodecConfig.java b/data-prepper-plugins/avro-codecs/src/main/java/org/opensearch/dataprepper/plugins/codec/avro/AvroOutputCodecConfig.java index 6d28b74190..0ff77aa19f 100644 --- a/data-prepper-plugins/avro-codecs/src/main/java/org/opensearch/dataprepper/plugins/codec/avro/AvroOutputCodecConfig.java +++ b/data-prepper-plugins/avro-codecs/src/main/java/org/opensearch/dataprepper/plugins/codec/avro/AvroOutputCodecConfig.java @@ -8,8 +8,6 @@ import jakarta.validation.Valid; import jakarta.validation.constraints.Size; -import java.util.List; - /** * Configuration class for {@link AvroOutputCodec}. */ @@ -23,8 +21,6 @@ public class AvroOutputCodecConfig { @JsonProperty("schema_file_location") private String fileLocation; - @JsonProperty("exclude_keys") - private List excludeKeys; @Valid @Size(max = 0, message = "Schema from schema registry is not supported.") @JsonProperty("schema_registry_url") @@ -45,10 +41,6 @@ public class AvroOutputCodecConfig { @JsonProperty("file_key") private String fileKey; - public List getExcludeKeys() { - return excludeKeys; - } - public String getSchema() { return schema; } @@ -76,8 +68,6 @@ public String getBucketName() { public String getFileKey() { return fileKey; } - public void setExcludeKeys(List excludeKeys) { - this.excludeKeys = excludeKeys; - } + } diff --git a/data-prepper-plugins/avro-codecs/src/test/java/org/opensearch/dataprepper/plugins/codec/avro/AvroCodecsIT.java b/data-prepper-plugins/avro-codecs/src/test/java/org/opensearch/dataprepper/plugins/codec/avro/AvroCodecsIT.java index c066f810b8..cf22fe7e50 100644 --- a/data-prepper-plugins/avro-codecs/src/test/java/org/opensearch/dataprepper/plugins/codec/avro/AvroCodecsIT.java +++ b/data-prepper-plugins/avro-codecs/src/test/java/org/opensearch/dataprepper/plugins/codec/avro/AvroCodecsIT.java @@ -25,6 +25,7 @@ import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.model.log.JacksonLog; import org.opensearch.dataprepper.model.record.Record; +import org.opensearch.dataprepper.model.sink.OutputCodecContext; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; @@ -179,9 +180,10 @@ public void test_HappyCaseAvroInputStream_then_callsConsumerWithParsedEvents(fin verify(eventConsumer, times(numberOfRecords)).accept(recordArgumentCaptor.capture()); final List> actualRecords = recordArgumentCaptor.getAllValues(); ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); - avroOutputCodec.start(outputStream, null, null); + OutputCodecContext codecContext = new OutputCodecContext(); + avroOutputCodec.start(outputStream, null, codecContext); for (Record record : actualRecords) { - avroOutputCodec.writeEvent(record.getData(), outputStream, null); + avroOutputCodec.writeEvent(record.getData(), outputStream); } avroOutputCodec.complete(outputStream); List actualOutputRecords = createAvroRecordsList(outputStream); diff --git a/data-prepper-plugins/avro-codecs/src/test/java/org/opensearch/dataprepper/plugins/codec/avro/AvroOutputCodecTest.java b/data-prepper-plugins/avro-codecs/src/test/java/org/opensearch/dataprepper/plugins/codec/avro/AvroOutputCodecTest.java index a5b08fa9f2..e9010f44de 100644 --- a/data-prepper-plugins/avro-codecs/src/test/java/org/opensearch/dataprepper/plugins/codec/avro/AvroOutputCodecTest.java +++ 
b/data-prepper-plugins/avro-codecs/src/test/java/org/opensearch/dataprepper/plugins/codec/avro/AvroOutputCodecTest.java @@ -17,11 +17,13 @@ import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.model.log.JacksonLog; import org.opensearch.dataprepper.model.record.Record; +import org.opensearch.dataprepper.model.sink.OutputCodecContext; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.UnsupportedEncodingException; +import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -31,7 +33,7 @@ import static org.hamcrest.MatcherAssert.assertThat; public class AvroOutputCodecTest { - private static String expectedSchemaString = "{\"type\":\"record\",\"name\":\"AvroRecords\",\"fields\":[{\"name\"" + + private static final String expectedSchemaString = "{\"type\":\"record\",\"name\":\"AvroRecords\",\"fields\":[{\"name\"" + ":\"name\",\"type\":\"string\"},{\"name\":\"nestedRecord\",\"type\":{\"type\":\"record\",\"name\":" + "\"NestedRecord1\",\"fields\":[{\"name\":\"secondFieldInNestedRecord\",\"type\":\"int\"},{\"name\":\"" + "firstFieldInNestedRecord\",\"type\":\"string\"}]}},{\"name\":\"age\",\"type\":\"int\"}]}"; @@ -51,13 +53,14 @@ private AvroOutputCodec createObjectUnderTest() { @ParameterizedTest @ValueSource(ints = {1, 2, 10, 100}) void test_happy_case(final int numberOfRecords) throws Exception { - this.numberOfRecords = numberOfRecords; + AvroOutputCodecTest.numberOfRecords = numberOfRecords; AvroOutputCodec avroOutputCodec = createObjectUnderTest(); outputStream = new ByteArrayOutputStream(); - avroOutputCodec.start(outputStream, null, null); + OutputCodecContext codecContext = new OutputCodecContext(); + avroOutputCodec.start(outputStream, null, codecContext); for (int index = 0; index < numberOfRecords; index++) { - final Event event = (Event) getRecord(index).getData(); - avroOutputCodec.writeEvent(event, outputStream, null); + final Event event = getRecord(index).getData(); + avroOutputCodec.writeEvent(event, outputStream); } avroOutputCodec.complete(outputStream); List actualRecords = createAvroRecordsList(outputStream); @@ -70,11 +73,10 @@ void test_happy_case(final int numberOfRecords) throws Exception { Map expectedMap = generateRecords(numberOfRecords).get(index); Map actualMap = new HashMap(); for (Schema.Field field : actualRecord.getSchema().getFields()) { - if(actualRecord.get(field.name()) instanceof GenericRecord){ + if (actualRecord.get(field.name()) instanceof GenericRecord) { GenericRecord nestedRecord = (GenericRecord) actualRecord.get(field.name()); actualMap.put(field.name(), convertRecordToMap(nestedRecord)); - } - else{ + } else { Object decodedActualOutput = decodeOutputIfEncoded(actualRecord.get(field.name())); actualMap.put(field.name(), decodedActualOutput); } @@ -83,13 +85,14 @@ void test_happy_case(final int numberOfRecords) throws Exception { index++; } } + @Test public void testInlineSchemaBuilder() throws IOException { Schema expectedSchema = new Schema.Parser().parse(expectedSchemaString); AvroOutputCodec avroOutputCodec = createObjectUnderTest(); numberOfRecords = 1; Event event = getRecord(0).getData(); - Schema actualSchema = avroOutputCodec.buildInlineSchemaFromEvent(event, null); + Schema actualSchema = avroOutputCodec.buildInlineSchemaFromEvent(event); assertThat(actualSchema, Matchers.equalTo(expectedSchema)); } @@ -111,7 +114,7 @@ private static List generateRecords(int numberOfRecords) { 
eventData.put("name", "Person" + rows); eventData.put("age", rows); HashMap nestedRecord = new HashMap<>(); - nestedRecord.put("firstFieldInNestedRecord", "testString"+rows); + nestedRecord.put("firstFieldInNestedRecord", "testString" + rows); nestedRecord.put("secondFieldInNestedRecord", rows); eventData.put("nestedRecord", nestedRecord); recordList.add((eventData)); @@ -120,8 +123,8 @@ private static List generateRecords(int numberOfRecords) { return recordList; } - private static Schema parseSchema() { - Schema innerSchema=parseInnerSchemaForNestedRecord(); + private static Schema parseSchema() { + Schema innerSchema = parseInnerSchemaForNestedRecord(); return SchemaBuilder.record("Person") .fields() .name("name").type().stringType().noDefault() @@ -131,7 +134,7 @@ private static Schema parseSchema() { } - private static Schema parseInnerSchemaForNestedRecord(){ + private static Schema parseInnerSchemaForNestedRecord() { return SchemaBuilder .record("nestedRecord") .fields() @@ -145,11 +148,10 @@ private static Schema parseInnerSchemaForNestedRecord(){ } private static Object decodeOutputIfEncoded(Object encodedActualOutput) throws UnsupportedEncodingException { - if(encodedActualOutput instanceof Utf8){ - byte[] utf8Bytes = encodedActualOutput.toString().getBytes("UTF-8"); - return new String(utf8Bytes, "UTF-8"); - } - else{ + if (encodedActualOutput instanceof Utf8) { + byte[] utf8Bytes = encodedActualOutput.toString().getBytes(StandardCharsets.UTF_8); + return new String(utf8Bytes, StandardCharsets.UTF_8); + } else { return encodedActualOutput; } } @@ -166,9 +168,10 @@ private static List createAvroRecordsList(ByteArrayOutputStream o } return actualRecords; } + private static Map convertRecordToMap(GenericRecord nestedRecord) throws Exception { final Map eventData = new HashMap<>(); - for(Schema.Field field : nestedRecord.getSchema().getFields()){ + for (Schema.Field field : nestedRecord.getSchema().getFields()) { Object value = decodeOutputIfEncoded(nestedRecord.get(field.name())); eventData.put(field.name(), value); } diff --git a/data-prepper-plugins/csv-processor/src/main/java/org/opensearch/dataprepper/plugins/codec/csv/CsvOutputCodec.java b/data-prepper-plugins/csv-processor/src/main/java/org/opensearch/dataprepper/plugins/codec/csv/CsvOutputCodec.java index 59ff829a5e..5f60446604 100644 --- a/data-prepper-plugins/csv-processor/src/main/java/org/opensearch/dataprepper/plugins/codec/csv/CsvOutputCodec.java +++ b/data-prepper-plugins/csv-processor/src/main/java/org/opensearch/dataprepper/plugins/codec/csv/CsvOutputCodec.java @@ -9,6 +9,7 @@ import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor; import org.opensearch.dataprepper.model.codec.OutputCodec; import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.sink.OutputCodecContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -32,6 +33,7 @@ public class CsvOutputCodec implements OutputCodec { private int headerLength = 0; private static final ObjectMapper objectMapper = new ObjectMapper(); private List headerList; + private OutputCodecContext codecContext; @DataPrepperPluginConstructor public CsvOutputCodec(final CsvOutputCodecConfig config) { @@ -40,20 +42,22 @@ public CsvOutputCodec(final CsvOutputCodecConfig config) { } @Override - public void start(final OutputStream outputStream, Event event, String tagsTargetKey) throws IOException { + public void start(final OutputStream outputStream, Event event, final OutputCodecContext codecContext) throws 
IOException { Objects.requireNonNull(outputStream); + Objects.requireNonNull(codecContext); + this.codecContext = codecContext; if (config.getHeader() != null) { headerList = config.getHeader(); } else if (config.getHeaderFileLocation() != null) { try { headerList = CsvHeaderParser.headerParser(config.getHeaderFileLocation()); } catch (Exception e) { - LOG.error("Unable to parse CSV Header, Error:{} ",e.getMessage()); + LOG.error("Unable to parse CSV Header, Error:{} ", e.getMessage()); throw new IOException("Unable to parse CSV Header."); } - }else if(checkS3HeaderValidity()){ + } else if (checkS3HeaderValidity()) { headerList = CsvHeaderParserFromS3.parseHeader(config); - }else { + } else { LOG.error("No header provided."); throw new IOException("No header found. Can't proceed without header."); } @@ -69,20 +73,18 @@ public void complete(final OutputStream outputStream) throws IOException { } @Override - public void writeEvent(final Event event, final OutputStream outputStream, String tagsTargetKey) throws IOException { + public void writeEvent(final Event event, final OutputStream outputStream) throws IOException { Objects.requireNonNull(event); final Map eventMap; - if(tagsTargetKey!=null){ - eventMap = addTagsToEvent(event,tagsTargetKey).toMap(); - }else{ + if (codecContext.getTagsTargetKey() != null) { + eventMap = addTagsToEvent(event, codecContext.getTagsTargetKey()).toMap(); + } else { eventMap = event.toMap(); } - if (!Objects.isNull(config.getExcludeKeys())) { - for (final String key : config.getExcludeKeys()) { - if (eventMap.containsKey(key)) { - eventMap.remove(key); - } + if (!codecContext.getExcludeKeys().isEmpty()) { + for (final String key : codecContext.getExcludeKeys()) { + eventMap.remove(key); } } @@ -110,10 +112,11 @@ private void writeToOutputStream(final OutputStream outputStream, final byte[] b public String getExtension() { return CSV; } + private boolean checkS3HeaderValidity() throws IOException { - if(config.getBucketName()!=null && config.getFile_key()!=null && config.getRegion()!=null){ + if (config.getBucketName() != null && config.getFile_key() != null && config.getRegion() != null) { return true; - }else{ + } else { LOG.error("Invalid S3 credentials, can't reach the header file."); throw new IOException("Can't proceed without header."); } diff --git a/data-prepper-plugins/csv-processor/src/main/java/org/opensearch/dataprepper/plugins/codec/csv/CsvOutputCodecConfig.java b/data-prepper-plugins/csv-processor/src/main/java/org/opensearch/dataprepper/plugins/codec/csv/CsvOutputCodecConfig.java index b0a739e199..d6862ab2e1 100644 --- a/data-prepper-plugins/csv-processor/src/main/java/org/opensearch/dataprepper/plugins/codec/csv/CsvOutputCodecConfig.java +++ b/data-prepper-plugins/csv-processor/src/main/java/org/opensearch/dataprepper/plugins/codec/csv/CsvOutputCodecConfig.java @@ -19,8 +19,6 @@ public class CsvOutputCodecConfig { @JsonProperty("header") private List header; - @JsonProperty("exclude_keys") - private List excludeKeys; @Valid @Size(max = 0, message = "Header from file is not supported.") @JsonProperty("header_file_location") @@ -43,10 +41,6 @@ public String getHeaderFileLocation() { return headerFileLocation; } - public List getExcludeKeys() { - return excludeKeys; - } - public String getDelimiter() { return delimiter; } @@ -54,9 +48,11 @@ public String getDelimiter() { public List getHeader() { return header; } + public void setHeader(List header) { this.header = header; } + public String getRegion() { return region; } @@ -68,7 +64,5 @@ public 
String getBucketName() { public String getFile_key() { return file_key; } - public void setExcludeKeys(List excludeKeys) { - this.excludeKeys = excludeKeys; - } + } \ No newline at end of file diff --git a/data-prepper-plugins/csv-processor/src/test/java/org/opensearch/dataprepper/plugins/codec/csv/CsvCodecsIT.java b/data-prepper-plugins/csv-processor/src/test/java/org/opensearch/dataprepper/plugins/codec/csv/CsvCodecsIT.java index fd35c90d04..c0d4d421b6 100644 --- a/data-prepper-plugins/csv-processor/src/test/java/org/opensearch/dataprepper/plugins/codec/csv/CsvCodecsIT.java +++ b/data-prepper-plugins/csv-processor/src/test/java/org/opensearch/dataprepper/plugins/codec/csv/CsvCodecsIT.java @@ -17,6 +17,7 @@ import org.mockito.junit.jupiter.MockitoExtension; import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.model.record.Record; +import org.opensearch.dataprepper.model.sink.OutputCodecContext; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; @@ -48,6 +49,7 @@ public class CsvCodecsIT { private CsvInputCodec createObjectUnderTest() { return new CsvInputCodec(config); } + @BeforeEach void setup() { CsvInputCodecConfig defaultCsvCodecConfig = new CsvInputCodecConfig(); @@ -67,9 +69,9 @@ private CsvOutputCodec createOutputCodecObjectUnderTest() { @ParameterizedTest - @ValueSource(ints = { 1, 10, 100, 200}) + @ValueSource(ints = {1, 10, 100, 200}) void test_when_autoDetectHeaderHappyCase_then_callsConsumerWithParsedEvents(final int numberOfRows) throws IOException { - final InputStream inputStream = createCsvInputStream(numberOfRows,header()); + final InputStream inputStream = createCsvInputStream(numberOfRows, header()); CsvInputCodec csvInputCodec = createObjectUnderTest(); csvInputCodec.parse(inputStream, eventConsumer); @@ -78,33 +80,33 @@ void test_when_autoDetectHeaderHappyCase_then_callsConsumerWithParsedEvents(fina final List> actualRecords = recordArgumentCaptor.getAllValues(); CsvOutputCodec csvOutputCodec = createOutputCodecObjectUnderTest(); ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); - csvOutputCodec.start(outputStream, null, null); - for (Record record: actualRecords){ - csvOutputCodec.writeEvent(record.getData(),outputStream, null); + OutputCodecContext codecContext = new OutputCodecContext(); + csvOutputCodec.start(outputStream, null, codecContext); + for (Record record : actualRecords) { + csvOutputCodec.writeEvent(record.getData(), outputStream); } csvOutputCodec.complete(outputStream); //createTestFileFromStream(outputStream); - String csvData = new String(outputStream.toByteArray(), StandardCharsets.UTF_8); + String csvData = outputStream.toString(StandardCharsets.UTF_8); StringReader stringReader = new StringReader(csvData); CSVReader csvReader = new CSVReaderBuilder(stringReader).build(); try { String[] line; - int index=0; + int index = 0; int headerIndex; List headerList = header(); List expectedRecords = generateRecords(numberOfRows); while ((line = csvReader.readNext()) != null) { - if(index==0){ - headerIndex=0; - for(String value: line){ + if (index == 0) { + headerIndex = 0; + for (String value : line) { assertThat(headerList.get(headerIndex), Matchers.equalTo(value)); headerIndex++; } - } - else{ - headerIndex=0; + } else { + headerIndex = 0; for (String value : line) { - assertThat(expectedRecords.get(index-1).get(headerList.get(headerIndex)), Matchers.equalTo(value)); + assertThat(expectedRecords.get(index - 1).get(headerList.get(headerIndex)), Matchers.equalTo(value)); 
headerIndex++; } } @@ -162,18 +164,19 @@ private InputStream createCsvInputStream(int numberOfRows, List header) throw new RuntimeException(e); } } + private String createCsvData(int numberOfRows, List header) throws IOException { ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); OutputStreamWriter writer = new OutputStreamWriter(outputStream); - for(int i=0;i header) throws IOExc writer.flush(); writer.close(); outputStream.close(); - return new String(outputStream.toByteArray()); + return outputStream.toString(); } } diff --git a/data-prepper-plugins/csv-processor/src/test/java/org/opensearch/dataprepper/plugins/codec/csv/CsvOutputCodecTest.java b/data-prepper-plugins/csv-processor/src/test/java/org/opensearch/dataprepper/plugins/codec/csv/CsvOutputCodecTest.java index 76a1798ddb..aa24654aae 100644 --- a/data-prepper-plugins/csv-processor/src/test/java/org/opensearch/dataprepper/plugins/codec/csv/CsvOutputCodecTest.java +++ b/data-prepper-plugins/csv-processor/src/test/java/org/opensearch/dataprepper/plugins/codec/csv/CsvOutputCodecTest.java @@ -13,6 +13,7 @@ import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.model.log.JacksonLog; import org.opensearch.dataprepper.model.record.Record; +import org.opensearch.dataprepper.model.sink.OutputCodecContext; import java.io.ByteArrayOutputStream; import java.io.IOException; @@ -71,10 +72,11 @@ void test_happy_case(final int numberOfRecords) throws IOException { CsvOutputCodecTest.numberOfRecords = numberOfRecords; CsvOutputCodec csvOutputCodec = createObjectUnderTest(); outputStream = new ByteArrayOutputStream(); - csvOutputCodec.start(outputStream, null, null); + OutputCodecContext codecContext = new OutputCodecContext(); + csvOutputCodec.start(outputStream, null, codecContext); for (int index = 0; index < numberOfRecords; index++) { final Event event = (Event) getRecord(index).getData(); - csvOutputCodec.writeEvent(event, outputStream, null); + csvOutputCodec.writeEvent(event, outputStream); } csvOutputCodec.complete(outputStream); String csvData = outputStream.toString(StandardCharsets.UTF_8); diff --git a/data-prepper-plugins/opensearch/README.md b/data-prepper-plugins/opensearch/README.md index f40f1ce0b0..2f08f8d3c5 100644 --- a/data-prepper-plugins/opensearch/README.md +++ b/data-prepper-plugins/opensearch/README.md @@ -209,6 +209,64 @@ With the `document_root_key` set to `status`. The document structure would be `{ duration: "15 ms" } ``` +- `include_keys`: A list of keys to be included (retained). Each key in the list can be a valid JSON path, such as 'request/status'. This option can work together with `document_root_key`. + +For example, if we have the following sample event: +``` +{ + status: 200, + message: null, + metadata: { + sourceIp: "123.212.49.58", + destinationIp: "79.54.67.231", + bytes: 3545, + duration: "15 ms" + } +} +``` +if `include_keys` is set to ["status", "metadata/sourceIp"], the document written to OpenSearch would be: +``` +{ + status: 200, + metadata: { + sourceIp: "123.212.49.58" + } +} +``` +if you have also set `document_root_key` as "metadata", and the `include_keys` as ["sourceIp", "bytes"], the document written to OpenSearch would be: +``` +{ + sourceIp: "123.212.49.58", + bytes: 3545 +} +``` + +- `exclude_keys`: Similar to `include_keys`, except that any keys in the list will be excluded. Note that you should not have both `include_keys` and `exclude_keys` in the configuration at the same time.
+ +For example, if we have the following sample event: +``` +{ + status: 200, + message: null, + metadata: { + sourceIp: "123.212.49.58", + destinationIp: "79.54.67.231", + bytes: 3545, + duration: "15 ms" + } +} +``` +if `exclude_keys` is set to ["status", "metadata/sourceIp"], the document written to OpenSearch would be: +``` +{ + message: null, + metadata: { + destinationIp: "79.54.67.231", + bytes: 3545, + duration: "15 ms" + } +} +``` - `distribution_version`: A String indicating whether the sink backend version is Elasticsearch 6 or above (i.e. Elasticsearch 7.x or OpenSearch). `es6` represents Elasticsearch 6; `default` represents latest compatible backend version (Elasticsearch 7.x, OpenSearch 1.x, OpenSearch 2.x). Default to `default`. ### AWS Configuration diff --git a/data-prepper-plugins/parquet-codecs/src/main/java/org/opensearch/dataprepper/plugins/codec/parquet/ParquetOutputCodec.java b/data-prepper-plugins/parquet-codecs/src/main/java/org/opensearch/dataprepper/plugins/codec/parquet/ParquetOutputCodec.java index 71e6fae4b5..3b61c19220 100644 --- a/data-prepper-plugins/parquet-codecs/src/main/java/org/opensearch/dataprepper/plugins/codec/parquet/ParquetOutputCodec.java +++ b/data-prepper-plugins/parquet-codecs/src/main/java/org/opensearch/dataprepper/plugins/codec/parquet/ParquetOutputCodec.java @@ -14,6 +14,7 @@ import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor; import org.opensearch.dataprepper.model.codec.OutputCodec; import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.sink.OutputCodecContext; import org.opensearch.dataprepper.plugins.fs.LocalInputFile; import org.opensearch.dataprepper.plugins.fs.LocalOutputFile; import org.opensearch.dataprepper.plugins.s3keyindex.S3ObjectIndexUtility; @@ -31,7 +32,6 @@ import java.io.IOException; import java.io.OutputStream; import java.nio.charset.StandardCharsets; -import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.Map; @@ -40,7 +40,7 @@ @DataPrepperPlugin(name = "parquet", pluginType = OutputCodec.class, pluginConfigurationType = ParquetOutputCodecConfig.class) public class ParquetOutputCodec implements OutputCodec { - private static final Logger LOG = LoggerFactory.getLogger(ParquetOutputCodec.class); + private static final Logger LOG = LoggerFactory.getLogger(ParquetOutputCodec.class); private final ParquetOutputCodecConfig config; private static final String BASE_SCHEMA_STRING = "{\"type\":\"record\",\"name\":\"ParquetRecords\",\"fields\":["; @@ -49,6 +49,7 @@ public class ParquetOutputCodec implements OutputCodec { private ParquetWriter writer; private final ApacheHttpClient.Builder apacheHttpClientBuilder = ApacheHttpClient.builder(); private S3Client s3Client; + private OutputCodecContext codecContext; private static final String PARQUET = "parquet"; private static final String TIME_PATTERN_REGULAR_EXPRESSION = "\\%\\{.*?\\}"; @@ -68,15 +69,20 @@ public ParquetOutputCodec(final ParquetOutputCodecConfig config) { } @Override - public synchronized void start(final OutputStream outputStream, final Event event, final String tagsTargetKey) throws IOException { + public synchronized void start(final OutputStream outputStream, final Event event, final OutputCodecContext codecContext) throws IOException { + Objects.requireNonNull(outputStream); + Objects.requireNonNull(codecContext); + this.codecContext = codecContext; this.s3Client = buildS3Client(); - buildSchemaAndKey(event, tagsTargetKey); + buildSchemaAndKey(event,
codecContext.getTagsTargetKey()); final S3OutputFile s3OutputFile = new S3OutputFile(s3Client, bucket, key); buildWriter(s3OutputFile); } - public synchronized void start(File file) throws IOException { - LocalOutputFile localOutputFile =new LocalOutputFile(file); + public synchronized void start(File file, final OutputCodecContext codecContext) throws IOException { + Objects.requireNonNull(codecContext); + this.codecContext = codecContext; + LocalOutputFile localOutputFile = new LocalOutputFile(file); buildSchemaAndKey(null, null); buildWriter(localOutputFile); } @@ -84,57 +90,52 @@ public synchronized void start(File file) throws IOException { void buildSchemaAndKey(final Event event, final String tagsTargetKey) throws IOException { if (config.getSchema() != null) { schema = parseSchema(config.getSchema()); - } else if(config.getFileLocation()!=null){ + } else if (config.getFileLocation() != null) { schema = ParquetSchemaParser.parseSchemaFromJsonFile(config.getFileLocation()); - }else if(config.getSchemaRegistryUrl()!=null){ + } else if (config.getSchemaRegistryUrl() != null) { schema = parseSchema(ParquetSchemaParserFromSchemaRegistry.getSchemaType(config.getSchemaRegistryUrl())); - }else if(checkS3SchemaValidity()){ + } else if (checkS3SchemaValidity()) { schema = ParquetSchemaParserFromS3.parseSchema(config); - } - else{ + } else { schema = buildInlineSchemaFromEvent(event, tagsTargetKey); } key = generateKey(); } + public Schema buildInlineSchemaFromEvent(final Event event, final String tagsTargetKey) throws IOException { - if(tagsTargetKey!=null){ + if (tagsTargetKey != null) { return parseSchema(buildSchemaStringFromEventMap(addTagsToEvent(event, tagsTargetKey).toMap(), false)); - }else{ + } else { return parseSchema(buildSchemaStringFromEventMap(event.toMap(), false)); } } private String buildSchemaStringFromEventMap(final Map eventData, boolean nestedRecordFlag) { final StringBuilder builder = new StringBuilder(); - int nestedRecordIndex=1; - if(nestedRecordFlag==false){ + int nestedRecordIndex = 1; + if (!nestedRecordFlag) { builder.append(BASE_SCHEMA_STRING); - }else{ - builder.append("{\"type\":\"record\",\"name\":\""+"NestedRecord"+nestedRecordIndex+"\",\"fields\":["); + } else { + builder.append("{\"type\":\"record\",\"name\":\"" + "NestedRecord" + nestedRecordIndex + "\",\"fields\":["); nestedRecordIndex++; } String fields; int index = 0; - for(final String key: eventData.keySet()){ - if(config.getExcludeKeys()==null){ - config.setExcludeKeys(new ArrayList<>()); - } - if(config.getExcludeKeys().contains(key)){ + for (final String key : eventData.keySet()) { + if (codecContext.getExcludeKeys().contains(key)) { continue; } - if(index == 0){ - if(!(eventData.get(key) instanceof Map)){ - fields = "{\"name\":\""+key+"\",\"type\":\""+typeMapper(eventData.get(key))+"\"}"; - } - else{ - fields = "{\"name\":\""+key+"\",\"type\":"+typeMapper(eventData.get(key))+"}"; + if (index == 0) { + if (!(eventData.get(key) instanceof Map)) { + fields = "{\"name\":\"" + key + "\",\"type\":\"" + typeMapper(eventData.get(key)) + "\"}"; + } else { + fields = "{\"name\":\"" + key + "\",\"type\":" + typeMapper(eventData.get(key)) + "}"; } - } - else{ - if(!(eventData.get(key) instanceof Map)){ - fields = ","+"{\"name\":\""+key+"\",\"type\":\""+typeMapper(eventData.get(key))+"\"}"; - }else{ - fields = ","+"{\"name\":\""+key+"\",\"type\":"+typeMapper(eventData.get(key))+"}"; + } else { + if (!(eventData.get(key) instanceof Map)) { + fields = "," + "{\"name\":\"" + key + "\",\"type\":\"" + 
typeMapper(eventData.get(key)) + "\"}"; + } else { + fields = "," + "{\"name\":\"" + key + "\",\"type\":" + typeMapper(eventData.get(key)) + "}"; } } builder.append(fields); @@ -145,20 +146,19 @@ private String buildSchemaStringFromEventMap(final Map eventData } private String typeMapper(final Object value) { - if(value instanceof Integer || value.getClass().equals(int.class)){ + if (value instanceof Integer || value.getClass().equals(int.class)) { return "int"; - }else if(value instanceof Float || value.getClass().equals(float.class)){ + } else if (value instanceof Float || value.getClass().equals(float.class)) { return "float"; - }else if(value instanceof Double || value.getClass().equals(double.class)){ + } else if (value instanceof Double || value.getClass().equals(double.class)) { return "double"; - }else if(value instanceof Long || value.getClass().equals(long.class)){ + } else if (value instanceof Long || value.getClass().equals(long.class)) { return "long"; - }else if(value instanceof Byte[]){ + } else if (value instanceof Byte[]) { return "bytes"; - }else if(value instanceof Map){ + } else if (value instanceof Map) { return buildSchemaStringFromEventMap((Map) value, true); - } - else{ + } else { return "string"; } } @@ -170,16 +170,16 @@ private void buildWriter(OutputFile outputFile) throws IOException { } @Override - public void writeEvent(final Event event, final OutputStream outputStream,final String tagsTargetKey) throws IOException { + public void writeEvent(final Event event, final OutputStream outputStream) throws IOException { final GenericData.Record parquetRecord = new GenericData.Record(schema); final Event modifiedEvent; - if (tagsTargetKey != null) { - modifiedEvent = addTagsToEvent(event, tagsTargetKey); + if (codecContext.getTagsTargetKey() != null) { + modifiedEvent = addTagsToEvent(event, codecContext.getTagsTargetKey()); } else { modifiedEvent = event; } for (final String key : modifiedEvent.toMap().keySet()) { - if (config.getExcludeKeys().contains(key)) { + if (codecContext.getExcludeKeys().contains(key)) { continue; } final Schema.Field field = schema.getField(key); @@ -202,6 +202,7 @@ public synchronized void complete(final OutputStream outputStream) throws IOExce .build(); s3Client.deleteObject(deleteRequest); } + public void closeWriter(final OutputStream outputStream, File file) throws IOException { final LocalInputFile inputFile = new LocalInputFile(file); byte[] byteBuffer = inputFile.newStream().readAllBytes(); @@ -257,11 +258,12 @@ private static String buildObjectPath(final String pathPrefix) { private String buildObjectFileName(final String configNamePattern) { return S3ObjectIndexUtility.getObjectNameWithDateTimeId(configNamePattern) + "." 
+ getExtension(); } - private static Object schemaMapper(final Schema.Field field , final Object rawValue){ + + private static Object schemaMapper(final Schema.Field field, final Object rawValue) { Object finalValue = null; - final String fieldType = field.schema().getType().name().toString().toLowerCase(); - if (field.schema().getLogicalType() == null && primitiveTypes.contains(fieldType)){ - switch (fieldType){ + final String fieldType = field.schema().getType().name().toLowerCase(); + if (field.schema().getLogicalType() == null && primitiveTypes.contains(fieldType)) { + switch (fieldType) { case "string": finalValue = rawValue.toString(); break; @@ -284,9 +286,9 @@ private static Object schemaMapper(final Schema.Field field , final Object rawVa LOG.error("Unrecognised Field name : '{}' & type : '{}'", field.name(), fieldType); break; } - }else{ + } else { final String logicalTypeName = field.schema().getLogicalType().getName(); - switch (logicalTypeName){ + switch (logicalTypeName) { case "date": finalValue = Integer.parseInt(rawValue.toString()); break; @@ -307,8 +309,9 @@ private static Object schemaMapper(final Schema.Field field , final Object rawVa break; } } - return finalValue; + return finalValue; } + boolean checkS3SchemaValidity() throws IOException { if (config.getSchemaBucket() != null && config.getFileKey() != null && config.getSchemaRegion() != null) { return true; diff --git a/data-prepper-plugins/parquet-codecs/src/main/java/org/opensearch/dataprepper/plugins/codec/parquet/ParquetOutputCodecConfig.java b/data-prepper-plugins/parquet-codecs/src/main/java/org/opensearch/dataprepper/plugins/codec/parquet/ParquetOutputCodecConfig.java index e1016496d4..4ade76f536 100644 --- a/data-prepper-plugins/parquet-codecs/src/main/java/org/opensearch/dataprepper/plugins/codec/parquet/ParquetOutputCodecConfig.java +++ b/data-prepper-plugins/parquet-codecs/src/main/java/org/opensearch/dataprepper/plugins/codec/parquet/ParquetOutputCodecConfig.java @@ -9,13 +9,9 @@ import jakarta.validation.constraints.NotNull; import jakarta.validation.constraints.Size; -import java.util.ArrayList; -import java.util.List; - public class ParquetOutputCodecConfig { private static final String DEFAULT_OBJECT_NAME_PATTERN = "events-%{yyyy-MM-dd'T'hh-mm-ss}"; - private static final List DEFAULT_EXCLUDE_KEYS = new ArrayList<>(); @JsonProperty("schema") private String schema; @@ -54,18 +50,12 @@ public class ParquetOutputCodecConfig { @NotNull @Valid private String pathPrefix; - @JsonProperty("exclude_keys") - private List excludeKeys = DEFAULT_EXCLUDE_KEYS; @Valid @Size(max = 0, message = "Schema from Schema Registry is not supported.") @JsonProperty("schema_registry_url") private String schemaRegistryUrl; - public List getExcludeKeys() { - return excludeKeys; - } - public String getFileLocation() { return fileLocation; } @@ -102,6 +92,7 @@ public String getPathPrefix() { public String getNamePattern() { return DEFAULT_OBJECT_NAME_PATTERN; } + public void setRegion(String region) { this.region = region; } @@ -113,6 +104,7 @@ public void setBucket(String bucket) { public void setPathPrefix(String pathPrefix) { this.pathPrefix = pathPrefix; } + public String getSchemaBucket() { return schemaBucket; } @@ -128,6 +120,7 @@ public String getSchemaRegion() { public void setFileKey(String fileKey) { this.fileKey = fileKey; } + public void setSchemaBucket(String schemaBucket) { this.schemaBucket = schemaBucket; } @@ -135,6 +128,7 @@ public void setSchemaBucket(String schemaBucket) { public void setSchemaRegion(String 
schemaRegion) { this.schemaRegion = schemaRegion; } + public void setFileLocation(String fileLocation) { this.fileLocation = fileLocation; } @@ -142,8 +136,6 @@ public void setFileLocation(String fileLocation) { public void setSchemaRegistryUrl(String schemaRegistryUrl) { this.schemaRegistryUrl = schemaRegistryUrl; } - public void setExcludeKeys(List excludeKeys) { - this.excludeKeys = excludeKeys; - } + } diff --git a/data-prepper-plugins/parquet-codecs/src/test/java/org/opensearch/dataprepper/plugins/codec/parquet/ParquetOutputCodecTest.java b/data-prepper-plugins/parquet-codecs/src/test/java/org/opensearch/dataprepper/plugins/codec/parquet/ParquetOutputCodecTest.java index abba1c5429..6017f09049 100644 --- a/data-prepper-plugins/parquet-codecs/src/test/java/org/opensearch/dataprepper/plugins/codec/parquet/ParquetOutputCodecTest.java +++ b/data-prepper-plugins/parquet-codecs/src/test/java/org/opensearch/dataprepper/plugins/codec/parquet/ParquetOutputCodecTest.java @@ -28,6 +28,7 @@ import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.model.log.JacksonLog; import org.opensearch.dataprepper.model.record.Record; +import org.opensearch.dataprepper.model.sink.OutputCodecContext; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; @@ -71,7 +72,7 @@ private static List generateRecords(int numberOfRecords) { eventData.put("doubleType", Double.valueOf(rows)); eventData.put("floatType", Float.valueOf(rows)); eventData.put("longType", Long.valueOf(rows)); - eventData.put("bytesType", ("Person"+rows).getBytes()); + eventData.put("bytesType", ("Person" + rows).getBytes()); recordList.add((eventData)); } @@ -97,6 +98,7 @@ private ParquetOutputCodec createObjectUnderTest() { config.setBucket("test"); config.setRegion("test"); return new ParquetOutputCodec(config); + } @ParameterizedTest @@ -106,10 +108,11 @@ void test_happy_case(final int numberOfRecords) throws Exception { ParquetOutputCodec parquetOutputCodec = createObjectUnderTest(); ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); final File tempFile = File.createTempFile(FILE_NAME, FILE_SUFFIX); - parquetOutputCodec.start(tempFile); + OutputCodecContext codecContext = new OutputCodecContext(); + parquetOutputCodec.start(tempFile, codecContext); for (int index = 0; index < numberOfRecords; index++) { final Event event = (Event) getRecord(index).getData(); - parquetOutputCodec.writeEvent(event, outputStream, null); + parquetOutputCodec.writeEvent(event, outputStream); } parquetOutputCodec.closeWriter(outputStream, tempFile); List> actualRecords = createParquetRecordsList(new ByteArrayInputStream(tempFile.toString().getBytes())); @@ -122,6 +125,7 @@ void test_happy_case(final int numberOfRecords) throws Exception { } tempFile.delete(); } + @Test public void test_getExtension() { ParquetOutputCodec parquetOutputCodec = createObjectUnderTest(); @@ -129,6 +133,7 @@ public void test_getExtension() { assertThat(extension, equalTo("parquet")); } + @Test public void whenNoSchemaProvided_thenThrowsException() { config = new ParquetOutputCodecConfig(); @@ -136,7 +141,7 @@ public void whenNoSchemaProvided_thenThrowsException() { config.setFileLocation(null); config.setSchemaRegistryUrl(null); ParquetOutputCodec parquetOutputCodec = new ParquetOutputCodec(config); - assertThrows(IOException.class,()-> + assertThrows(IOException.class, () -> parquetOutputCodec.buildSchemaAndKey(null, null)); } @@ -150,7 +155,7 @@ public void test_s3SchemaValidity() throws IOException { ParquetOutputCodec 
parquetOutputCodec = new ParquetOutputCodec(config); assertThat(parquetOutputCodec.checkS3SchemaValidity(), equalTo(Boolean.TRUE)); ParquetOutputCodec parquetOutputCodecFalse = createObjectUnderTest(); - assertThrows(IOException.class,()-> + assertThrows(IOException.class, () -> parquetOutputCodecFalse.checkS3SchemaValidity()); } diff --git a/data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/codec/json/JsonOutputCodec.java b/data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/codec/json/JsonOutputCodec.java index eedf0497f8..b2c68224ce 100644 --- a/data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/codec/json/JsonOutputCodec.java +++ b/data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/codec/json/JsonOutputCodec.java @@ -11,6 +11,7 @@ import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor; import org.opensearch.dataprepper.model.codec.OutputCodec; import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.sink.OutputCodecContext; import java.io.IOException; import java.io.OutputStream; @@ -27,6 +28,7 @@ public class JsonOutputCodec implements OutputCodec { private static final JsonFactory factory = new JsonFactory(); JsonOutputCodecConfig config; private JsonGenerator generator; + private OutputCodecContext codecContext; @DataPrepperPluginConstructor public JsonOutputCodec(final JsonOutputCodecConfig config) { @@ -35,8 +37,10 @@ public JsonOutputCodec(final JsonOutputCodecConfig config) { } @Override - public void start(final OutputStream outputStream, Event event, String tagsTargetKey) throws IOException { + public void start(final OutputStream outputStream, Event event, final OutputCodecContext codecContext) throws IOException { Objects.requireNonNull(outputStream); + Objects.requireNonNull(codecContext); + this.codecContext = codecContext; generator = factory.createGenerator(outputStream, JsonEncoding.UTF8); generator.writeStartArray(); } @@ -50,18 +54,18 @@ public void complete(final OutputStream outputStream) throws IOException { } @Override - public synchronized void writeEvent(final Event event, final OutputStream outputStream, String tagsTargetKey) throws IOException { + public synchronized void writeEvent(final Event event, final OutputStream outputStream) throws IOException { Objects.requireNonNull(event); final Event modifiedEvent; - if (tagsTargetKey != null) { - modifiedEvent = addTagsToEvent(event, tagsTargetKey); + if (codecContext.getTagsTargetKey() != null) { + modifiedEvent = addTagsToEvent(event, codecContext.getTagsTargetKey()); } else { modifiedEvent = event; } generator.writeStartObject(); - final boolean isExcludeKeyAvailable = !Objects.isNull(config.getExcludeKeys()); + final boolean isExcludeKeyAvailable = !codecContext.getExcludeKeys().isEmpty(); for (final String key : modifiedEvent.toMap().keySet()) { - if (isExcludeKeyAvailable && config.getExcludeKeys().contains(key)) { + if (isExcludeKeyAvailable && codecContext.getExcludeKeys().contains(key)) { continue; } generator.writeStringField(key, modifiedEvent.toMap().get(key).toString()); diff --git a/data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/codec/json/JsonOutputCodecConfig.java b/data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/codec/json/JsonOutputCodecConfig.java index 860a1c60ec..642717ab71 100644 --- 
a/data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/codec/json/JsonOutputCodecConfig.java +++ b/data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/codec/json/JsonOutputCodecConfig.java @@ -4,16 +4,6 @@ */ package org.opensearch.dataprepper.plugins.codec.json; -import com.fasterxml.jackson.annotation.JsonProperty; - -import java.util.List; - public class JsonOutputCodecConfig { - - @JsonProperty("exclude_keys") - private List excludeKeys; - - public List getExcludeKeys() { - return excludeKeys; - } + } diff --git a/data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/codec/json/NdjsonOutputCodec.java b/data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/codec/json/NdjsonOutputCodec.java index b6c45c9da7..f179d1a3fe 100644 --- a/data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/codec/json/NdjsonOutputCodec.java +++ b/data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/codec/json/NdjsonOutputCodec.java @@ -9,10 +9,10 @@ import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor; import org.opensearch.dataprepper.model.codec.OutputCodec; import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.sink.OutputCodecContext; import java.io.IOException; import java.io.OutputStream; -import java.util.Map; import java.util.Objects; /** @@ -24,6 +24,7 @@ public class NdjsonOutputCodec implements OutputCodec { private static final String NDJSON = "ndjson"; private static final ObjectMapper objectMapper = new ObjectMapper(); private final NdjsonOutputConfig config; + private OutputCodecContext codecContext; @DataPrepperPluginConstructor public NdjsonOutputCodec(final NdjsonOutputConfig config) { @@ -32,20 +33,23 @@ public NdjsonOutputCodec(final NdjsonOutputConfig config) { } @Override - public void start(final OutputStream outputStream, Event event, String tagsTargetKey) throws IOException { + public void start(final OutputStream outputStream, Event event, final OutputCodecContext codecContext) throws IOException { Objects.requireNonNull(outputStream); + Objects.requireNonNull(codecContext); + this.codecContext = codecContext; } @Override - public void writeEvent(final Event event, final OutputStream outputStream, String tagsTargetKey) throws IOException { + public void writeEvent(final Event event, final OutputStream outputStream) throws IOException { Objects.requireNonNull(event); - Map eventMap; - if (tagsTargetKey != null) { - eventMap = addTagsToEvent(event, tagsTargetKey).toMap(); - } else { - eventMap = event.toMap(); - } - writeToOutputStream(outputStream, eventMap); + + String json = event.jsonBuilder() + .includeKeys(codecContext.getIncludeKeys()) + .excludeKeys(codecContext.getExcludeKeys()) + .includeTags(codecContext.getTagsTargetKey()) + .toJsonString(); + outputStream.write(json.getBytes()); + outputStream.write(System.lineSeparator().getBytes()); } @Override @@ -53,24 +57,6 @@ public void complete(final OutputStream outputStream) throws IOException { outputStream.close(); } - private void writeToOutputStream(final OutputStream outputStream, final Object object) throws IOException { - byte[] byteArr = null; - if (object instanceof Map) { - Map map = objectMapper.convertValue(object, Map.class); - for (String key : config.getExcludeKeys()) { - if (map.containsKey(key)) { - map.remove(key); - } - } - 
String json = objectMapper.writeValueAsString(map); - byteArr = json.getBytes(); - } else { - byteArr = object.toString().getBytes(); - } - outputStream.write(byteArr); - outputStream.write(System.lineSeparator().getBytes()); - } - @Override public String getExtension() { return NDJSON; diff --git a/data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/codec/json/NdjsonOutputConfig.java b/data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/codec/json/NdjsonOutputConfig.java index 196f00b860..cb12539779 100644 --- a/data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/codec/json/NdjsonOutputConfig.java +++ b/data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/codec/json/NdjsonOutputConfig.java @@ -5,25 +5,8 @@ package org.opensearch.dataprepper.plugins.codec.json; -import com.fasterxml.jackson.annotation.JsonProperty; - -import java.util.ArrayList; -import java.util.List; - /** * Configuration class for the newline delimited codec. */ public class NdjsonOutputConfig { - private static final List DEFAULT_EXCLUDE_KEYS = new ArrayList<>(); - - @JsonProperty("exclude_keys") - private List excludeKeys = DEFAULT_EXCLUDE_KEYS; - - public List getExcludeKeys() { - return excludeKeys; - } - - public void setExcludeKeys(List excludeKeys) { - this.excludeKeys = excludeKeys; - } } \ No newline at end of file diff --git a/data-prepper-plugins/parse-json-processor/src/test/java/org/opensearch/dataprepper/plugins/codec/json/JsonCodecsIT.java b/data-prepper-plugins/parse-json-processor/src/test/java/org/opensearch/dataprepper/plugins/codec/json/JsonCodecsIT.java index 160b14b4da..1ea165938c 100644 --- a/data-prepper-plugins/parse-json-processor/src/test/java/org/opensearch/dataprepper/plugins/codec/json/JsonCodecsIT.java +++ b/data-prepper-plugins/parse-json-processor/src/test/java/org/opensearch/dataprepper/plugins/codec/json/JsonCodecsIT.java @@ -14,6 +14,7 @@ import org.mockito.ArgumentCaptor; import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.model.record.Record; +import org.opensearch.dataprepper.model.sink.OutputCodecContext; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; @@ -49,9 +50,11 @@ void setUp() { private JsonInputCodec createJsonInputCodecObjectUnderTest() { return new JsonInputCodec(); } + private JsonOutputCodec createJsonOutputCodecObjectUnderTest() { return new JsonOutputCodec(new JsonOutputCodecConfig()); } + @ParameterizedTest @ValueSource(ints = {1, 2, 10, 100}) void parse_with_InputStream_calls_Consumer_with_Event(final int numberOfObjects) throws IOException { @@ -65,7 +68,8 @@ void parse_with_InputStream_calls_Consumer_with_Event(final int numberOfObjects) final List> actualRecords = recordArgumentCaptor.getAllValues(); JsonOutputCodec jsonOutputCodec = createJsonOutputCodecObjectUnderTest(); ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); - jsonOutputCodec.start(outputStream, null, null); + OutputCodecContext codecContext = new OutputCodecContext(); + jsonOutputCodec.start(outputStream, null, codecContext); assertThat(actualRecords.size(), equalTo(numberOfObjects)); @@ -73,16 +77,16 @@ void parse_with_InputStream_calls_Consumer_with_Event(final int numberOfObjects) for (int i = 0; i < actualRecords.size(); i++) { final Record actualRecord = actualRecords.get(i); - jsonOutputCodec.writeEvent(actualRecord.getData(),outputStream, null); + 
jsonOutputCodec.writeEvent(actualRecord.getData(), outputStream); } jsonOutputCodec.complete(outputStream); - int index=0; + int index = 0; ObjectMapper mapper = new ObjectMapper(); JsonNode jsonNode = mapper.readTree(outputStream.toByteArray()); for (JsonNode element : jsonNode) { Set keys = initialRecords.get(index).keySet(); - Map actualMap = new HashMap<>(); - for(String key: keys){ + Map actualMap = new HashMap<>(); + for (String key : keys) { actualMap.put(key, element.get(key).asText()); } assertThat(initialRecords.get(index), Matchers.equalTo(actualMap)); @@ -106,11 +110,11 @@ private static List generateRecords(int numberOfRecords) { List recordList = new ArrayList<>(); - for(int rows = 0; rows < numberOfRecords; rows++){ + for (int rows = 0; rows < numberOfRecords; rows++) { HashMap eventData = new HashMap<>(); - eventData.put("name", "Person"+rows); + eventData.put("name", "Person" + rows); eventData.put("age", Integer.toString(rows)); recordList.add((eventData)); diff --git a/data-prepper-plugins/parse-json-processor/src/test/java/org/opensearch/dataprepper/plugins/codec/json/JsonOutputCodecTest.java b/data-prepper-plugins/parse-json-processor/src/test/java/org/opensearch/dataprepper/plugins/codec/json/JsonOutputCodecTest.java index 9f42d5a25e..3409b3a593 100644 --- a/data-prepper-plugins/parse-json-processor/src/test/java/org/opensearch/dataprepper/plugins/codec/json/JsonOutputCodecTest.java +++ b/data-prepper-plugins/parse-json-processor/src/test/java/org/opensearch/dataprepper/plugins/codec/json/JsonOutputCodecTest.java @@ -14,6 +14,7 @@ import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.model.log.JacksonLog; import org.opensearch.dataprepper.model.record.Record; +import org.opensearch.dataprepper.model.sink.OutputCodecContext; import java.io.ByteArrayOutputStream; import java.io.IOException; @@ -63,10 +64,11 @@ void test_happy_case(final int numberOfRecords) throws IOException { JsonOutputCodecTest.numberOfRecords = numberOfRecords; JsonOutputCodec jsonOutputCodec = createObjectUnderTest(); outputStream = new ByteArrayOutputStream(); - jsonOutputCodec.start(outputStream, null, null); + OutputCodecContext codecContext = new OutputCodecContext(); + jsonOutputCodec.start(outputStream, null, codecContext); for (int index = 0; index < numberOfRecords; index++) { final Event event = (Event) getRecord(index).getData(); - jsonOutputCodec.writeEvent(event, outputStream, null); + jsonOutputCodec.writeEvent(event, outputStream); } jsonOutputCodec.complete(outputStream); List expectedRecords = generateRecords(numberOfRecords); @@ -84,6 +86,7 @@ void test_happy_case(final int numberOfRecords) throws IOException { } } + @Test void testGetExtension() { JsonOutputCodec jsonOutputCodec = createObjectUnderTest(); diff --git a/data-prepper-plugins/parse-json-processor/src/test/java/org/opensearch/dataprepper/plugins/codec/json/NewlineDelimitedOutputCodecTest.java b/data-prepper-plugins/parse-json-processor/src/test/java/org/opensearch/dataprepper/plugins/codec/json/NewlineDelimitedOutputCodecTest.java index c75fb068ba..96f8a53a4d 100644 --- a/data-prepper-plugins/parse-json-processor/src/test/java/org/opensearch/dataprepper/plugins/codec/json/NewlineDelimitedOutputCodecTest.java +++ b/data-prepper-plugins/parse-json-processor/src/test/java/org/opensearch/dataprepper/plugins/codec/json/NewlineDelimitedOutputCodecTest.java @@ -12,12 +12,12 @@ import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.model.log.JacksonLog; import 
org.opensearch.dataprepper.model.record.Record; +import org.opensearch.dataprepper.model.sink.OutputCodecContext; import java.io.ByteArrayOutputStream; import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.ArrayList; -import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -33,30 +33,29 @@ public class NewlineDelimitedOutputCodecTest { private static int numberOfRecords; private static final String REGEX = "\\r?\\n"; - private static ObjectMapper objectMapper = new ObjectMapper(); + private static final ObjectMapper objectMapper = new ObjectMapper(); private NdjsonOutputCodec createObjectUnderTest() { config = new NdjsonOutputConfig(); - config.setExcludeKeys(Arrays.asList("S3")); return new NdjsonOutputCodec(config); } @ParameterizedTest @ValueSource(ints = {1, 3, 10, 100}) void test_happy_case(final int numberOfRecords) throws IOException { - this.numberOfRecords = numberOfRecords; + NewlineDelimitedOutputCodecTest.numberOfRecords = numberOfRecords; NdjsonOutputCodec ndjsonOutputCodec = createObjectUnderTest(); outputStream = new ByteArrayOutputStream(); - ndjsonOutputCodec.start(outputStream, null, null); + OutputCodecContext codecContext = new OutputCodecContext(); + ndjsonOutputCodec.start(outputStream, null, codecContext); for (int index = 0; index < numberOfRecords; index++) { final Event event = (Event) getRecord(index).getData(); - ndjsonOutputCodec.writeEvent(event, outputStream, null); + ndjsonOutputCodec.writeEvent(event, outputStream); } ndjsonOutputCodec.complete(outputStream); - byte[] byteArray = outputStream.toByteArray(); String jsonString = null; try { - jsonString = new String(byteArray, StandardCharsets.UTF_8); + jsonString = outputStream.toString(StandardCharsets.UTF_8); } catch (Exception e) { e.printStackTrace(); } diff --git a/data-prepper-plugins/s3-sink/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/s3/S3SinkServiceIT.java b/data-prepper-plugins/s3-sink/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/s3/S3SinkServiceIT.java index 0c94a8a121..7134dc47fc 100644 --- a/data-prepper-plugins/s3-sink/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/s3/S3SinkServiceIT.java +++ b/data-prepper-plugins/s3-sink/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/s3/S3SinkServiceIT.java @@ -41,11 +41,12 @@ import org.opensearch.dataprepper.model.event.JacksonEvent; import org.opensearch.dataprepper.model.log.JacksonLog; import org.opensearch.dataprepper.model.record.Record; +import org.opensearch.dataprepper.model.sink.OutputCodecContext; import org.opensearch.dataprepper.model.types.ByteCount; -import org.opensearch.dataprepper.plugins.codec.parquet.ParquetOutputCodec; -import org.opensearch.dataprepper.plugins.codec.parquet.ParquetOutputCodecConfig; import org.opensearch.dataprepper.plugins.codec.json.NdjsonOutputCodec; import org.opensearch.dataprepper.plugins.codec.json.NdjsonOutputConfig; +import org.opensearch.dataprepper.plugins.codec.parquet.ParquetOutputCodec; +import org.opensearch.dataprepper.plugins.codec.parquet.ParquetOutputCodecConfig; import org.opensearch.dataprepper.plugins.sink.s3.accumulator.BufferFactory; import org.opensearch.dataprepper.plugins.sink.s3.accumulator.InMemoryBufferFactory; import org.opensearch.dataprepper.plugins.sink.s3.accumulator.ObjectKey; @@ -67,11 +68,11 @@ import java.io.InputStream; import java.nio.file.Files; import java.nio.file.StandardCopyOption; - import java.time.Duration; import 
java.util.ArrayList; import java.util.Arrays; import java.util.Collection; +import java.util.Collections; import java.util.HashMap; import java.util.LinkedHashMap; import java.util.LinkedList; @@ -90,7 +91,7 @@ @ExtendWith(MockitoExtension.class) class S3SinkServiceIT { - private static final String PATH_PREFIX = UUID.randomUUID().toString() + "/%{yyyy}/%{MM}/%{dd}/"; + private static final String PATH_PREFIX = UUID.randomUUID() + "/%{yyyy}/%{MM}/%{dd}/"; private static final int numberOfRecords = 2; private S3Client s3Client; private String bucketName; @@ -164,7 +165,6 @@ void verify_flushed_object_count_into_s3_bucket() { void configureNewLineCodec() { codec = new NdjsonOutputCodec(ndjsonOutputConfig); - when(ndjsonOutputConfig.getExcludeKeys()).thenReturn(new ArrayList<>()); } @Test @@ -186,7 +186,8 @@ void verify_flushed_records_into_s3_bucketNewLine() { } private S3SinkService createObjectUnderTest() { - return new S3SinkService(s3SinkConfig, bufferFactory, codec, s3Client, "Tag", pluginMetrics); + OutputCodecContext codecContext = new OutputCodecContext("Tag", Collections.emptyList(), Collections.emptyList()); + return new S3SinkService(s3SinkConfig, bufferFactory, codec, codecContext, s3Client, pluginMetrics); } private int gets3ObjectCount() { @@ -255,6 +256,7 @@ private static Map generateJson(Set testTags) { jsonObject.put("Tag", testTags.toArray()); return jsonObject; } + private static Record getRecord(int index) { List recordList = generateRecords(numberOfRecords); final Event event = JacksonLog.builder().withData(recordList.get(index)).build(); @@ -276,6 +278,7 @@ private static List generateRecords(int numberOfRecords) { } return recordList; } + @Test void verify_flushed_records_into_s3_bucket_Parquet() throws IOException { configureParquetCodec(); @@ -301,15 +304,16 @@ private void configureParquetCodec() { parquetOutputCodecConfig.setRegion(s3region); parquetOutputCodecConfig.setPathPrefix(PATH_PREFIX); codec = new ParquetOutputCodec(parquetOutputCodecConfig); - when(parquetOutputCodecConfig.getExcludeKeys()).thenReturn(new ArrayList<>()); } + private Collection> getRecordList() { final Collection> recordList = new ArrayList<>(); for (int i = 0; i < numberOfRecords; i++) recordList.add(getRecord(i)); return recordList; } + private static Schema parseSchema() { return SchemaBuilder.record("Person") .fields() @@ -317,6 +321,7 @@ private static Schema parseSchema() { .name("age").type().intType().noDefault() .endRecord(); } + private List> createParquetRecordsList(final InputStream inputStream) throws IOException { final File tempFile = File.createTempFile(FILE_NAME, FILE_SUFFIX); diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/S3Sink.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/S3Sink.java index 3d1cfa1f7b..11aa67637d 100644 --- a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/S3Sink.java +++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/S3Sink.java @@ -16,8 +16,9 @@ import org.opensearch.dataprepper.model.plugin.PluginFactory; import org.opensearch.dataprepper.model.record.Record; import org.opensearch.dataprepper.model.sink.AbstractSink; -import org.opensearch.dataprepper.model.sink.SinkContext; +import org.opensearch.dataprepper.model.sink.OutputCodecContext; import org.opensearch.dataprepper.model.sink.Sink; +import org.opensearch.dataprepper.model.sink.SinkContext; import 
org.opensearch.dataprepper.plugins.sink.s3.accumulator.BufferFactory; import org.opensearch.dataprepper.plugins.sink.s3.accumulator.BufferTypeOptions; import org.opensearch.dataprepper.plugins.sink.s3.accumulator.InMemoryBufferFactory; @@ -27,7 +28,6 @@ import software.amazon.awssdk.services.s3.S3Client; import java.util.Collection; -import java.util.Objects; /** * Implementation class of s3-sink plugin. It is responsible for receive the collection of @@ -46,7 +46,7 @@ public class S3Sink extends AbstractSink> { /** * @param pluginSetting dp plugin settings. - * @param s3SinkConfig s3 sink configurations. + * @param s3SinkConfig s3 sink configurations. * @param pluginFactory dp plugin factory. */ @DataPrepperPluginConstructor @@ -70,7 +70,7 @@ public S3Sink(final PluginSetting pluginSetting, bufferFactory = new InMemoryBufferFactory(); } final S3Client s3Client = ClientFactory.createS3Client(s3SinkConfig, awsCredentialsSupplier); - s3SinkService = new S3SinkService(s3SinkConfig, bufferFactory, codec, s3Client, Objects.nonNull(sinkContext) ? sinkContext.getTagsTargetKey() : null, pluginMetrics); + s3SinkService = new S3SinkService(s3SinkConfig, bufferFactory, codec, OutputCodecContext.fromSinkContext(sinkContext), s3Client, pluginMetrics); } @Override diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/S3SinkService.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/S3SinkService.java index 011ffdda6c..0b56890f8e 100644 --- a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/S3SinkService.java +++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/S3SinkService.java @@ -12,6 +12,7 @@ import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.model.event.EventHandle; import org.opensearch.dataprepper.model.record.Record; +import org.opensearch.dataprepper.model.sink.OutputCodecContext; import org.opensearch.dataprepper.model.types.ByteCount; import org.opensearch.dataprepper.plugins.sink.s3.accumulator.Buffer; import org.opensearch.dataprepper.plugins.sink.s3.accumulator.BufferFactory; @@ -58,7 +59,7 @@ public class S3SinkService { private final Counter numberOfRecordsSuccessCounter; private final Counter numberOfRecordsFailedCounter; private final DistributionSummary s3ObjectSizeSummary; - private final String tagsTargetKey; + private final OutputCodecContext codecContext; /** * @param s3SinkConfig s3 sink related configuration. @@ -68,12 +69,12 @@ public class S3SinkService { * @param pluginMetrics metrics. 
*/ public S3SinkService(final S3SinkConfig s3SinkConfig, final BufferFactory bufferFactory, - final OutputCodec codec, final S3Client s3Client, final String tagsTargetKey, final PluginMetrics pluginMetrics) { + final OutputCodec codec, final OutputCodecContext codecContext, final S3Client s3Client, final PluginMetrics pluginMetrics) { this.s3SinkConfig = s3SinkConfig; this.bufferFactory = bufferFactory; this.codec = codec; this.s3Client = s3Client; - this.tagsTargetKey = tagsTargetKey; + this.codecContext = codecContext; reentrantLock = new ReentrantLock(); bufferedEventHandles = new LinkedList<>(); @@ -105,17 +106,17 @@ void output(Collection> records) { for (Record record : records) { - if(currentBuffer.getEventCount() == 0) { + if (currentBuffer.getEventCount() == 0) { final Event eventForSchemaAutoGenerate = record.getData(); - codec.start(outputStream,eventForSchemaAutoGenerate , tagsTargetKey); + codec.start(outputStream, eventForSchemaAutoGenerate, codecContext); } final Event event = record.getData(); - codec.writeEvent(event, outputStream, tagsTargetKey); - int count = currentBuffer.getEventCount() +1; + codec.writeEvent(event, outputStream); + int count = currentBuffer.getEventCount() + 1; currentBuffer.setEventCount(count); - if(event.getEventHandle() != null) { + if (event.getEventHandle() != null) { bufferedEventHandles.add(event.getEventHandle()); } if (ThresholdCheck.checkThresholdExceed(currentBuffer, maxEvents, maxBytes, maxCollectionDuration)) { @@ -184,6 +185,7 @@ protected boolean retryFlushToS3(final Buffer currentBuffer, final String s3Key) /** * Generate the s3 object path prefix and object file name. + * * @return object key path. */ protected String generateKey(OutputCodec codec) { diff --git a/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/S3SinkServiceTest.java b/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/S3SinkServiceTest.java index 8ddef20e86..3334a8a6df 100644 --- a/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/S3SinkServiceTest.java +++ b/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/S3SinkServiceTest.java @@ -19,6 +19,7 @@ import org.opensearch.dataprepper.model.event.JacksonEvent; import org.opensearch.dataprepper.model.plugin.PluginFactory; import org.opensearch.dataprepper.model.record.Record; +import org.opensearch.dataprepper.model.sink.OutputCodecContext; import org.opensearch.dataprepper.model.types.ByteCount; import org.opensearch.dataprepper.plugins.sink.s3.accumulator.Buffer; import org.opensearch.dataprepper.plugins.sink.s3.accumulator.BufferFactory; @@ -79,6 +80,7 @@ class S3SinkServiceTest { private S3SinkConfig s3SinkConfig; private S3Client s3Client; private OutputCodec codec; + private OutputCodecContext codecContext; private PluginMetrics pluginMetrics; private BufferFactory bufferFactory; private Counter snapshotSuccessCounter; @@ -92,6 +94,7 @@ void setUp() { random = new Random(); tagsTargetKey = RandomStringUtils.randomAlphabetic(5); s3SinkConfig = mock(S3SinkConfig.class); + codecContext = new OutputCodecContext(tagsTargetKey, Collections.emptyList(), Collections.emptyList()); s3Client = mock(S3Client.class); ThresholdOptions thresholdOptions = mock(ThresholdOptions.class); ObjectKeyOptions objectKeyOptions = mock(ObjectKeyOptions.class); @@ -135,7 +138,7 @@ void setUp() { } private S3SinkService createObjectUnderTest() { - return new S3SinkService(s3SinkConfig, bufferFactory, 
codec, s3Client, tagsTargetKey, pluginMetrics); + return new S3SinkService(s3SinkConfig, bufferFactory, codec, codecContext, s3Client, pluginMetrics); } @Test @@ -186,7 +189,7 @@ void test_output_with_threshold_set_as_more_then_zero_event_count() throws IOExc when(s3SinkConfig.getThresholdOptions().getEventCount()).thenReturn(5); final OutputStream outputStream = mock(OutputStream.class); final Event event = JacksonEvent.fromMessage(UUID.randomUUID().toString()); - doNothing().when(codec).writeEvent(event, outputStream, null); + doNothing().when(codec).writeEvent(event, outputStream); S3SinkService s3SinkService = createObjectUnderTest(); assertNotNull(s3SinkService); s3SinkService.output(generateRandomStringEventRecord()); @@ -209,7 +212,7 @@ void test_output_with_threshold_set_as_zero_event_count() throws IOException { when(s3SinkConfig.getThresholdOptions().getMaximumSize()).thenReturn(ByteCount.parse("2kb")); final OutputStream outputStream = mock(OutputStream.class); final Event event = JacksonEvent.fromMessage(UUID.randomUUID().toString()); - doNothing().when(codec).writeEvent(event, outputStream, null); + doNothing().when(codec).writeEvent(event, outputStream); S3SinkService s3SinkService = createObjectUnderTest(); assertNotNull(s3SinkService); s3SinkService.output(generateRandomStringEventRecord()); @@ -227,7 +230,7 @@ void test_output_with_uploadedToS3_success() throws IOException { final OutputStream outputStream = mock(OutputStream.class); final Event event = JacksonEvent.fromMessage(UUID.randomUUID().toString()); - doNothing().when(codec).writeEvent(event, outputStream, null); + doNothing().when(codec).writeEvent(event, outputStream); S3SinkService s3SinkService = createObjectUnderTest(); assertNotNull(s3SinkService); assertThat(s3SinkService, instanceOf(S3SinkService.class)); @@ -247,7 +250,7 @@ void test_output_with_uploadedToS3_success_records_byte_count() throws IOExcepti final OutputStream outputStream = mock(OutputStream.class); final Event event = JacksonEvent.fromMessage(UUID.randomUUID().toString()); - doNothing().when(codec).writeEvent(event, outputStream, null); + doNothing().when(codec).writeEvent(event, outputStream); final S3SinkService s3SinkService = createObjectUnderTest(); s3SinkService.output(generateRandomStringEventRecord()); @@ -260,7 +263,7 @@ void test_output_with_uploadedToS3_failed() throws IOException { when(s3SinkConfig.getMaxUploadRetries()).thenReturn(3); final OutputStream outputStream = mock(OutputStream.class); final Event event = JacksonEvent.fromMessage(UUID.randomUUID().toString()); - doNothing().when(codec).writeEvent(event, outputStream, null); + doNothing().when(codec).writeEvent(event, outputStream); S3SinkService s3SinkService = createObjectUnderTest(); assertNotNull(s3SinkService); assertThat(s3SinkService, instanceOf(S3SinkService.class)); @@ -283,7 +286,7 @@ void test_output_with_uploadedToS3_failure_does_not_record_byte_count() throws I final S3SinkService s3SinkService = createObjectUnderTest(); final Event event = JacksonEvent.fromMessage(UUID.randomUUID().toString()); final OutputStream outputStream = mock(OutputStream.class); - doNothing().when(codec).writeEvent(event, outputStream, null); + doNothing().when(codec).writeEvent(event, outputStream); s3SinkService.output(Collections.singletonList(new Record<>(event))); verify(s3ObjectSizeSummary, never()).record(anyLong()); @@ -303,7 +306,7 @@ void test_retryFlushToS3_positive() throws InterruptedException, IOException { assertNotNull(buffer); OutputStream outputStream = 
buffer.getOutputStream(); final Event event = JacksonEvent.fromMessage(UUID.randomUUID().toString()); - codec.writeEvent(event, outputStream, null); + codec.writeEvent(event, outputStream); final String s3Key = UUID.randomUUID().toString(); boolean isUploadedToS3 = s3SinkService.retryFlushToS3(buffer, s3Key); assertTrue(isUploadedToS3); @@ -319,7 +322,7 @@ void test_retryFlushToS3_negative() throws InterruptedException, IOException { assertNotNull(s3SinkService); OutputStream outputStream = buffer.getOutputStream(); final Event event = JacksonEvent.fromMessage(UUID.randomUUID().toString()); - codec.writeEvent(event, outputStream, null); + codec.writeEvent(event, outputStream); final String s3Key = UUID.randomUUID().toString(); doThrow(AwsServiceException.class).when(buffer).flushToS3(eq(s3Client), anyString(), anyString()); boolean isUploadedToS3 = s3SinkService.retryFlushToS3(buffer, s3Key); @@ -338,7 +341,7 @@ void output_will_release_all_handles_since_a_flush() throws IOException { final OutputStream outputStream = mock(OutputStream.class); final Event event = JacksonEvent.fromMessage(UUID.randomUUID().toString()); - doNothing().when(codec).writeEvent(event, outputStream, null); + doNothing().when(codec).writeEvent(event, outputStream); final S3SinkService s3SinkService = createObjectUnderTest(); final Collection> records = generateRandomStringEventRecord(); s3SinkService.output(records); @@ -361,7 +364,7 @@ void output_will_skip_releasing_events_without_EventHandle_objects() throws IOEx final OutputStream outputStream = mock(OutputStream.class); final Event event1 = JacksonEvent.fromMessage(UUID.randomUUID().toString()); - doNothing().when(codec).writeEvent(event1, outputStream, null); + doNothing().when(codec).writeEvent(event1, outputStream); final S3SinkService s3SinkService = createObjectUnderTest(); final Collection> records = generateRandomStringEventRecord(); records.stream() @@ -394,7 +397,7 @@ void output_will_release_all_handles_since_a_flush_when_S3_fails() throws IOExce final OutputStream outputStream = mock(OutputStream.class); final Event event = JacksonEvent.fromMessage(UUID.randomUUID().toString()); - doNothing().when(codec).writeEvent(event, outputStream, null); + doNothing().when(codec).writeEvent(event, outputStream); final S3SinkService s3SinkService = createObjectUnderTest(); final List> records = generateEventRecords(1); s3SinkService.output(records); @@ -417,7 +420,7 @@ void output_will_release_only_new_handles_since_a_flush() throws IOException { final OutputStream outputStream = mock(OutputStream.class); final Event event = JacksonEvent.fromMessage(UUID.randomUUID().toString()); - doNothing().when(codec).writeEvent(event, outputStream, null); + doNothing().when(codec).writeEvent(event, outputStream); final S3SinkService s3SinkService = createObjectUnderTest(); final Collection> records = generateRandomStringEventRecord(); s3SinkService.output(records); @@ -450,7 +453,7 @@ void output_will_release_only_new_handles_since_a_flush_when_S3_fails() throws I final OutputStream outputStream = mock(OutputStream.class); final Event event = JacksonEvent.fromMessage(UUID.randomUUID().toString()); - doNothing().when(codec).writeEvent(event, outputStream, null); + doNothing().when(codec).writeEvent(event, outputStream); final S3SinkService s3SinkService = createObjectUnderTest(); final List> records = generateEventRecords(1); s3SinkService.output(records);
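
Note for reviewers (not part of the change set): the sketch below shows what a codec implementation looks like against the updated OutputCodec contract after this refactor, since the per-codec exclude_keys settings are removed and the sink-level keys and tags target now arrive through a single OutputCodecContext. DemoOutputCodec is a hypothetical class written only for illustration; the start/writeEvent/complete/getExtension signatures and the Event.jsonBuilder() chain are taken from the hunks above, and it assumes those are the only abstract methods a codec must implement. In the sink itself the context is built once, either directly or via OutputCodecContext.fromSinkContext(sinkContext) as S3Sink now does, and passed to start().

import org.opensearch.dataprepper.model.codec.OutputCodec;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.sink.OutputCodecContext;

import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.Objects;

/** Hypothetical codec, shown only to illustrate the updated interface. */
public class DemoOutputCodec implements OutputCodec {

    private OutputCodecContext codecContext;

    @Override
    public void start(final OutputStream outputStream, final Event event,
                      final OutputCodecContext codecContext) throws IOException {
        Objects.requireNonNull(outputStream);
        Objects.requireNonNull(codecContext);
        // Keep the context: include/exclude keys and the tags target key now arrive here,
        // instead of through codec-specific configuration or a tagsTargetKey parameter.
        this.codecContext = codecContext;
    }

    @Override
    public void writeEvent(final Event event, final OutputStream outputStream) throws IOException {
        Objects.requireNonNull(event);
        // Same pattern the reworked NdjsonOutputCodec uses to honor the context.
        final String json = event.jsonBuilder()
                .includeKeys(codecContext.getIncludeKeys())
                .excludeKeys(codecContext.getExcludeKeys())
                .includeTags(codecContext.getTagsTargetKey())
                .toJsonString();
        outputStream.write(json.getBytes(StandardCharsets.UTF_8));
        outputStream.write(System.lineSeparator().getBytes(StandardCharsets.UTF_8));
    }

    @Override
    public void complete(final OutputStream outputStream) throws IOException {
        outputStream.close();
    }

    @Override
    public String getExtension() {
        return "demo";
    }
}

Driving such a codec from a sink follows the order shown in S3SinkService.output(): start() is called once with the first event (so schema-aware codecs like the Parquet codec can auto-generate a schema), writeEvent() per event, then complete() when the buffer is flushed.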