Skip to content

Commit

Permalink
-Support for Sink Codecs (#3030)
Browse files Browse the repository at this point in the history
* -Support for Sink Codecs
Signed-off-by: omkarmmore95 <[email protected]>

* -Support for Sink Codecs
Signed-off-by: omkarmmore95 <[email protected]>

* -Support for Sink Codecs
Signed-off-by: omkarmmore95 <[email protected]>
  • Loading branch information
omkarmmore95 committed Jul 17, 2023
1 parent 2476813 commit 37d05bb
Show file tree
Hide file tree
Showing 6 changed files with 61 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/
package org.opensearch.dataprepper.plugins.codec.newline;
package org.opensearch.dataprepper.plugins.codec.json;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin;
Expand All @@ -19,14 +19,14 @@
* An implementation of {@link OutputCodec} which deserializes Data-Prepper events
* and writes them to Output Stream as ND-JSON data
*/
@DataPrepperPlugin(name = "newline", pluginType = OutputCodec.class, pluginConfigurationType = NewlineDelimitedOutputConfig.class)
public class NewlineDelimitedOutputCodec implements OutputCodec {
@DataPrepperPlugin(name = "ndjson", pluginType = OutputCodec.class, pluginConfigurationType = NdjsonOutputConfig.class)
public class NdjsonOutputCodec implements OutputCodec {
private static final String NDJSON = "ndjson";
private static final ObjectMapper objectMapper = new ObjectMapper();
private final NewlineDelimitedOutputConfig config;
private final NdjsonOutputConfig config;

@DataPrepperPluginConstructor
public NewlineDelimitedOutputCodec(final NewlineDelimitedOutputConfig config) {
public NdjsonOutputCodec(final NdjsonOutputConfig config) {
Objects.requireNonNull(config);
this.config = config;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.codec.newline;
package org.opensearch.dataprepper.plugins.codec.json;

import com.fasterxml.jackson.annotation.JsonProperty;

Expand All @@ -13,7 +13,7 @@
/**
* Configuration class for the newline delimited codec.
*/
public class NewlineDelimitedOutputConfig {
public class NdjsonOutputConfig {
private static final List<String> DEFAULT_EXCLUDE_KEYS = new ArrayList<>();

@JsonProperty("exclude_keys")
Expand All @@ -22,4 +22,8 @@ public class NewlineDelimitedOutputConfig {
public List<String> getExcludeKeys() {
return excludeKeys;
}

public void setExcludeKeys(List<String> excludeKeys) {
this.excludeKeys = excludeKeys;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/
package org.opensearch.dataprepper.plugins.codec.json;

import org.junit.jupiter.api.Test;

import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;


public class JsonOutputCodecTest {

private JsonOutputCodec createObjectUnderTest() {
return new JsonOutputCodec();
}

@Test
void testGetExtension() {
JsonOutputCodec jsonOutputCodec = createObjectUnderTest();
assertThat(null, equalTo(jsonOutputCodec.getExtension()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,11 @@
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/
package org.opensearch.dataprepper.plugins.codec.newline;
package org.opensearch.dataprepper.plugins.codec.json;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.hamcrest.Matchers;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import org.opensearch.dataprepper.model.event.Event;
Expand All @@ -16,39 +17,42 @@
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;


public class NewlineDelimitedOutputCodecTest {
private ByteArrayOutputStream outputStream;

private static NewlineDelimitedOutputConfig config;
private static NdjsonOutputConfig config;

private static int numberOfRecords;
private static final String REGEX = "\\r?\\n";
private static ObjectMapper objectMapper = new ObjectMapper();

private NewlineDelimitedOutputCodec createObjectUnderTest() {
config = new NewlineDelimitedOutputConfig();
return new NewlineDelimitedOutputCodec(config);
private NdjsonOutputCodec createObjectUnderTest() {
config = new NdjsonOutputConfig();
config.setExcludeKeys(Arrays.asList("S3"));
return new NdjsonOutputCodec(config);
}

@ParameterizedTest
@ValueSource(ints = {1, 3, 10, 100})
void test_happy_case(final int numberOfRecords) throws IOException {
this.numberOfRecords = numberOfRecords;
NewlineDelimitedOutputCodec newlineDelimitedOutputCodec = createObjectUnderTest();
NdjsonOutputCodec ndjsonOutputCodec = createObjectUnderTest();
outputStream = new ByteArrayOutputStream();
newlineDelimitedOutputCodec.start(outputStream);
ndjsonOutputCodec.start(outputStream);
for (int index = 0; index < numberOfRecords; index++) {
final Event event = (Event) getRecord(index).getData();
newlineDelimitedOutputCodec.writeEvent(event, outputStream, null);
ndjsonOutputCodec.writeEvent(event, outputStream, null);
}
newlineDelimitedOutputCodec.complete(outputStream);
ndjsonOutputCodec.complete(outputStream);
byte[] byteArray = outputStream.toByteArray();
String jsonString = null;
try {
Expand Down Expand Up @@ -84,4 +88,11 @@ private static List<HashMap> generateRecords(int numberOfRecords) {
}
return recordList;
}

@Test
void testGetExtension() {
NdjsonOutputCodec ndjsonOutputCodec = createObjectUnderTest();

assertThat("ndjson", equalTo(ndjsonOutputCodec.getExtension()));
}
}
2 changes: 1 addition & 1 deletion data-prepper-plugins/s3-sink/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ dependencies {
implementation 'org.apache.commons:commons-lang3:3.12.0'
testImplementation project(':data-prepper-test-common')

implementation project(':data-prepper-plugins:newline-codecs')
implementation project(':data-prepper-plugins:parse-json-processor')
}

test {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@
import org.opensearch.dataprepper.model.log.JacksonLog;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.model.types.ByteCount;
import org.opensearch.dataprepper.plugins.codec.newline.NewlineDelimitedOutputCodec;
import org.opensearch.dataprepper.plugins.codec.newline.NewlineDelimitedOutputConfig;
import org.opensearch.dataprepper.plugins.codec.json.NdjsonOutputCodec;
import org.opensearch.dataprepper.plugins.codec.json.NdjsonOutputConfig;
import org.opensearch.dataprepper.plugins.sink.accumulator.BufferFactory;
import org.opensearch.dataprepper.plugins.sink.accumulator.InMemoryBufferFactory;
import org.opensearch.dataprepper.plugins.sink.accumulator.ObjectKey;
Expand Down Expand Up @@ -90,7 +90,7 @@ class S3SinkServiceIT {
private OutputCodec codec;

@Mock
NewlineDelimitedOutputConfig newlineDelimitedOutputConfig;
NdjsonOutputConfig ndjsonOutputConfig;


@BeforeEach
Expand Down Expand Up @@ -131,8 +131,8 @@ void verify_flushed_object_count_into_s3_bucket() {
}

void configureNewLineCodec() {
codec = new NewlineDelimitedOutputCodec(newlineDelimitedOutputConfig);
when(newlineDelimitedOutputConfig.getExcludeKeys()).thenReturn(new ArrayList<>());
codec = new NdjsonOutputCodec(ndjsonOutputConfig);
when(ndjsonOutputConfig.getExcludeKeys()).thenReturn(new ArrayList<>());
}

@Test
Expand Down

0 comments on commit 37d05bb

Please sign in to comment.