Skip to content

Commit

Permalink
-Support for Sink Codecs
Browse files Browse the repository at this point in the history
Signed-off-by: omkarmmore95 <[email protected]>
  • Loading branch information
omkarmmore95 committed Jul 4, 2023
1 parent 5095fef commit 47a7b72
Show file tree
Hide file tree
Showing 11 changed files with 113 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,21 @@

package org.opensearch.dataprepper.model.codec;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.log.JacksonLog;
import org.opensearch.dataprepper.model.sink.Sink;

import java.io.IOException;
import java.io.OutputStream;
import java.util.Map;

public interface OutputCodec {

static final ObjectMapper objectMapper = new ObjectMapper();

/**
* This method is called from {@link Sink} to do the initial wrapping in {@link OutputStream}.
* Implementors should do the initial wrapping according to the implementation.
Expand All @@ -26,11 +33,12 @@ public interface OutputCodec {
* This method is called from {@link Sink} to write an event to the {@link OutputStream}.
* Implementors should get the data from the event and write it to the {@link OutputStream}.
*
* @param event event Record event
* @param outputStream outputStream param to hold the event data
* @param event event Record event
* @param outputStream outputStream param to hold the event data
* @param tagsTargetKey key under which the event's tags are added to the record
* @throws IOException throws IOException when not able to write data to {@link OutputStream}
*/
void writeEvent(Event event, OutputStream outputStream) throws IOException;
void writeEvent(Event event, OutputStream outputStream, String tagsTargetKey) throws IOException;

/**
* This method is called from {@link Sink} to do the final wrapping in {@link OutputStream}.
Expand All @@ -47,4 +55,11 @@ public interface OutputCodec {
* @return String
*/
String getExtension();

/**
 * Creates a new event whose data is the given event's JSON with tags included.
 * <p>
 * The event is serialized through {@code jsonBuilder().includeTags(tagsTargetKey)},
 * the resulting JSON is parsed back into a map, and a new event is built from that
 * map with {@link JacksonLog#builder()}. Only the data is passed to the builder.
 *
 * @param event         the event to serialize
 * @param tagsTargetKey the key handed to {@code includeTags}
 * @return a new event built from the tag-enriched data
 * @throws JsonProcessingException if the generated JSON cannot be parsed into a map
 */
default Event addTagsToEvent(Event event, String tagsTargetKey) throws JsonProcessingException {
    final String taggedJson = event.jsonBuilder()
            .includeTags(tagsTargetKey)
            .toJsonString();
    final TypeReference<Map<String, Object>> mapType = new TypeReference<Map<String, Object>>() {
    };
    final Map<String, Object> taggedData = objectMapper.readValue(taggedJson, mapType);
    return JacksonLog.builder()
            .withData(taggedData)
            .build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ public void start(final OutputStream outputStream) throws IOException {
}

@Override
public void writeEvent(final Event event, final OutputStream outputStream) throws IOException {
public void writeEvent(final Event event, final OutputStream outputStream,final String tagsTargetKey) throws IOException {
// TODO: write event data to the outputstream
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ public void start(final OutputStream outputStream) throws IOException {
}

@Override
public void writeEvent(final Event event, final OutputStream outputStream) throws IOException {
public void writeEvent(final Event event, final OutputStream outputStream, String tagsTargetKey) throws IOException {
// TODO: validate data according to header and write event data to the outputstream
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
@DataPrepperPlugin(name = "newline", pluginType = OutputCodec.class, pluginConfigurationType = NewlineDelimitedOutputConfig.class)
public class NewlineDelimitedOutputCodec implements OutputCodec {
private static final String NDJSON = "ndjson";
private static final String MESSAGE_FIELD_NAME = "message";
private static final ObjectMapper objectMapper = new ObjectMapper();
private final NewlineDelimitedOutputConfig config;

Expand All @@ -38,9 +37,15 @@ public void start(final OutputStream outputStream) throws IOException {
}

@Override
public void writeEvent(final Event event, final OutputStream outputStream) throws IOException {
public void writeEvent(final Event event, final OutputStream outputStream, String tagsTargetKey) throws IOException {
Objects.requireNonNull(event);
writeToOutputStream(outputStream, event.toMap());
Map<String, Object> eventMap;
if (tagsTargetKey != null) {
eventMap = addTagsToEvent(event, tagsTargetKey).toMap();
} else {
eventMap = event.toMap();
}
writeToOutputStream(outputStream, eventMap);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ void test_happy_case(final int numberOfRecords) throws IOException {
newlineDelimitedOutputCodec.start(outputStream);
for (int index = 0; index < numberOfRecords; index++) {
final Event event = (Event) getRecord(index).getData();
newlineDelimitedOutputCodec.writeEvent(event, outputStream);
newlineDelimitedOutputCodec.writeEvent(event, outputStream, null);
}
newlineDelimitedOutputCodec.complete(outputStream);
byte[] byteArray = outputStream.toByteArray();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public void complete(final OutputStream outputStream) throws IOException {
}

@Override
public void writeEvent(final Event event, final OutputStream outputStream) throws IOException {
public void writeEvent(final Event event, final OutputStream outputStream, String tagsTargetKey) throws IOException {
// TODO: get the event data and write in output stream
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ public void start(final OutputStream outputStream) throws IOException {
}

@Override
public void writeEvent(final Event event, final OutputStream outputStream) throws IOException {
public void writeEvent(final Event event, final OutputStream outputStream, String tagsTargetKey) throws IOException {
// TODO: get the event data and write event data to the outputstream
}

Expand Down
2 changes: 2 additions & 0 deletions data-prepper-plugins/s3-sink/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ dependencies {
implementation 'org.jetbrains.kotlin:kotlin-stdlib-common:1.8.21'
implementation 'org.apache.commons:commons-lang3:3.12.0'
testImplementation project(':data-prepper-test-common')

implementation project(':data-prepper-plugins:newline-codecs')
}

test {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,17 @@
import org.mockito.junit.jupiter.MockitoExtension;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.codec.OutputCodec;
import org.opensearch.dataprepper.model.event.DefaultEventMetadata;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.EventHandle;
import org.opensearch.dataprepper.model.event.EventMetadata;
import org.opensearch.dataprepper.model.event.EventType;
import org.opensearch.dataprepper.model.event.JacksonEvent;
import org.opensearch.dataprepper.model.log.JacksonLog;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.model.types.ByteCount;
import org.opensearch.dataprepper.plugins.codec.newline.NewlineDelimitedOutputCodec;
import org.opensearch.dataprepper.plugins.codec.newline.NewlineDelimitedOutputConfig;
import org.opensearch.dataprepper.plugins.sink.accumulator.BufferFactory;
import org.opensearch.dataprepper.plugins.sink.accumulator.InMemoryBufferFactory;
import org.opensearch.dataprepper.plugins.sink.accumulator.ObjectKey;
Expand All @@ -37,12 +42,14 @@
import software.amazon.awssdk.services.s3.model.S3Object;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;

import static org.hamcrest.MatcherAssert.assertThat;
Expand Down Expand Up @@ -80,9 +87,11 @@ class S3SinkServiceIT {
@Mock
private DistributionSummary s3ObjectSizeSummary;

@Mock
private OutputCodec codec;

@Mock
NewlineDelimitedOutputConfig newlineDelimitedOutputConfig;


@BeforeEach
public void setUp() {
Expand Down Expand Up @@ -113,30 +122,39 @@ public void setUp() {

// Verifies that sending the queued events through the sink adds exactly one object to the bucket.
@Test
void verify_flushed_object_count_into_s3_bucket() {
    configureNewLineCodec();

    // Object count before anything is sent through the sink.
    final int objectCountBefore = gets3ObjectCount();

    final S3SinkService sinkService = createObjectUnderTest();
    final Collection<Record<Event>> queuedRecords = setEventQueue();
    sinkService.output(queuedRecords);

    final int objectCountAfter = gets3ObjectCount();
    final int expectedObjectCount = objectCountBefore + 1;
    assertThat(objectCountAfter, equalTo(expectedObjectCount));
}

@Test
void verify_flushed_records_into_s3_bucket() {
// Points the codec field at a real NewlineDelimitedOutputCodec built from the newlineDelimitedOutputConfig mock.
void configureNewLineCodec() {
codec = new NewlineDelimitedOutputCodec(newlineDelimitedOutputConfig);
// Stub the exclude-keys lookup on the mocked config so it returns an empty list.
when(newlineDelimitedOutputConfig.getExcludeKeys()).thenReturn(new ArrayList<>());
}

// Sends the queued records through S3SinkService with the newline codec and checks that
// each record's JSON shows up in the data returned by getS3Object().
@Test
void verify_flushed_records_into_s3_bucketNewLine() {
configureNewLineCodec();
S3SinkService s3SinkService = createObjectUnderTest();
Collection<Record<Event>> recordsData = setEventQueue();

s3SinkService.output(recordsData);
String objectData = getS3Object();

// Index of the line expected to hold the current record.
int count = 0;
// NOTE(review): splitting on "\r\n" assumes the codec separates records with that exact
// sequence — confirm against the codec's line separator.
String[] objectDataArr = objectData.split("\r\n");
for (Record<Event> recordData : recordsData) {
String objectRecord = recordData.getData().toJsonString();
// Whole-object check: the record's JSON appears somewhere in the object data.
assertThat(objectData, CoreMatchers.containsString(objectRecord));
// Positional check: the record's JSON appears in the line at the same index.
assertThat(objectDataArr[count], CoreMatchers.containsString(objectRecord));
count++;
}
}

private S3SinkService createObjectUnderTest() {
return new S3SinkService(s3SinkConfig, bufferFactory, codec, s3Client, null, pluginMetrics);
return new S3SinkService(s3SinkConfig, bufferFactory, codec, s3Client, "Tag", pluginMetrics);
}

private int gets3ObjectCount() {
Expand Down Expand Up @@ -185,19 +203,24 @@ private Collection<Record<Event>> setEventQueue() {
}

private static Record<Event> createRecord() {
Map<String, Object> json = generateJson();
final JacksonEvent event = JacksonLog.builder().withData(json).build();
final Set<String> testTags = Set.of("tag1");
final EventMetadata defaultEventMetadata = DefaultEventMetadata.builder().
withEventType(EventType.LOG.toString()).
withTags(testTags).build();
Map<String, Object> json = generateJson(testTags);
final JacksonEvent event = JacksonLog.builder().withData(json).withEventMetadata(defaultEventMetadata).build();
event.setEventHandle(mock(EventHandle.class));
return new Record<>(event);
}

private static Map<String, Object> generateJson() {
private static Map<String, Object> generateJson(Set<String> testTags) {
final Map<String, Object> jsonObject = new LinkedHashMap<>();
for (int i = 0; i < 2; i++) {
jsonObject.put(UUID.randomUUID().toString(), UUID.randomUUID().toString());
}
jsonObject.put(UUID.randomUUID().toString(), Arrays.asList(UUID.randomUUID().toString(),
UUID.randomUUID().toString(), UUID.randomUUID().toString()));
jsonObject.put("Tag", testTags.toArray());
return jsonObject;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ public class S3SinkService {
* @param pluginMetrics metrics.
*/
public S3SinkService(final S3SinkConfig s3SinkConfig, final BufferFactory bufferFactory,
final Codec codec, final S3Client s3Client, final String tagsTargetKey, final PluginMetrics pluginMetrics) {
final OutputCodec codec, final S3Client s3Client, final String tagsTargetKey, final PluginMetrics pluginMetrics) {
this.s3SinkConfig = s3SinkConfig;
this.bufferFactory = bufferFactory;
this.codec = codec;
Expand Down Expand Up @@ -101,19 +101,25 @@ void output(Collection<Record<Event>> records) {
currentBuffer = bufferFactory.getBuffer();
}
try {
OutputStream outputStream = currentBuffer.getOutputStream();

for (Record<Event> record : records) {

if(currentBuffer.getEventCount() == 0) {
codec.start(outputStream);
}

final Event event = record.getData();
final String encodedEvent;
encodedEvent = codec.parse(event, tagsTargetKey);
final byte[] encodedBytes = encodedEvent.getBytes();
codec.writeEvent(event, outputStream, tagsTargetKey);
int count = currentBuffer.getEventCount() +1;
currentBuffer.setEventCount(count);

currentBuffer.writeEvent(encodedBytes);
if(event.getEventHandle() != null) {
bufferedEventHandles.add(event.getEventHandle());
}
if (ThresholdCheck.checkThresholdExceed(currentBuffer, maxEvents, maxBytes, maxCollectionDuration)) {
final String s3Key = generateKey();
codec.complete(outputStream);
final String s3Key = generateKey(codec);
LOG.info("Writing {} to S3 with {} events and size of {} bytes.",
s3Key, currentBuffer.getEventCount(), currentBuffer.getSize());
final boolean isFlushToS3 = retryFlushToS3(currentBuffer, s3Key);
Expand Down
Loading

0 comments on commit 47a7b72

Please sign in to comment.