Skip to content

Commit

Permalink
Add include_keys and exclude_keys to s3 sink
Browse files Browse the repository at this point in the history
Signed-off-by: Aiden Dai <[email protected]>
  • Loading branch information
daixba committed Jul 18, 2023
1 parent 3ea3c6b commit e5e2b35
Show file tree
Hide file tree
Showing 17 changed files with 102 additions and 91 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,14 @@
package org.opensearch.dataprepper.model.codec;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.log.JacksonLog;
import org.opensearch.dataprepper.model.sink.Sink;
import org.opensearch.dataprepper.model.sink.SinkContext;

import java.io.IOException;
import java.io.OutputStream;
import java.util.Map;
import java.util.List;

public interface OutputCodec {

Expand All @@ -25,20 +24,20 @@ public interface OutputCodec {
* Implementors should do initial wrapping according to the implementation
*
* @param outputStream outputStream param for wrapping
* @param sinkContext {@link SinkContext} object
* @throws IOException throws IOException when invalid input is received or not able to create wrapping
*/
void start(OutputStream outputStream) throws IOException;
void start(OutputStream outputStream, SinkContext sinkContext) throws IOException;

/**
* This method is called from {@link Sink} to write an event to the {@link OutputStream}.
* Implementors should get the data from the event and write it to the {@link OutputStream}.
*
* @param event event Record event
* @param outputStream outputStream param to hold the event data
* @param tagsTargetKey to add tags to the record
* @throws IOException throws IOException when not able to write data to {@link OutputStream}
*/
void writeEvent(Event event, OutputStream outputStream, String tagsTargetKey) throws IOException;
void writeEvent(Event event, OutputStream outputStream) throws IOException;

/**
* This method is called from {@link Sink} to do the final wrapping in the {@link OutputStream}.
Expand All @@ -56,10 +55,11 @@ public interface OutputCodec {
*/
String getExtension();

default Event addTagsToEvent(Event event, String tagsTargetKey) throws JsonProcessingException {
String eventJsonString = event.jsonBuilder().includeTags(tagsTargetKey).toJsonString();
Map<String, Object> eventData = objectMapper.readValue(eventJsonString, new TypeReference<>() {
});
return JacksonLog.builder().withData(eventData).build();
default String buildJsonString(Event event, String tagsTargetKey, List<String> includeKeys, List<String> excludeKeys) throws JsonProcessingException {
return event.jsonBuilder()
.includeTags(tagsTargetKey)
.includeKeys(includeKeys)
.excludeKeys(excludeKeys)
.toJsonString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -541,9 +541,11 @@ public String toJsonString() {
jsonString = searchAndFilter(getBaseNode(), "", getIncludeKeys(), RETAIN_ALL);
} else if (getExcludeKeys() != null && !getExcludeKeys().isEmpty()) {
jsonString = searchAndFilter(getBaseNode(), "", getExcludeKeys(), EXCLUDE_ALL);
} else if (getBaseNode() !=event.getJsonNode()) {
jsonString = event.getAsJsonString(getRootKey());
} else {
// Some subclasses have their own implementation of toJsonString, such as JacksonSpan.
// In such case, the root key will be ignored.
// In such a case, that implementation is only used when the root key is not provided.
// TODO: Need to check if such behaviour is expected.
jsonString = event.toJsonString();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@ public SinkContext(String tagsTargetKey, Collection<String> routes, List<String>
this.excludeKeys = excludeKeys;
}

/**
 * Creates a sink context that carries only the tags target key.
 * Delegates to the four-argument constructor, passing {@code null} for
 * every argument other than {@code tagsTargetKey} (the routes and the
 * include/exclude key lists).
 *
 * @param tagsTargetKey the target key name for tags
 */
public SinkContext(String tagsTargetKey) {
this(tagsTargetKey, null, null, null);
}

/**
* returns the target key name for tags if configured for a given sink
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import org.opensearch.dataprepper.model.event.EventType;
import org.opensearch.dataprepper.model.event.JacksonEvent;
import org.opensearch.dataprepper.model.log.JacksonLog;
import org.opensearch.dataprepper.model.sink.SinkContext;

import java.io.IOException;
import java.io.OutputStream;
Expand All @@ -18,7 +19,7 @@
import java.util.Set;
import java.util.UUID;

import static org.junit.Assert.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;

public class OutputCodecTest {

Expand All @@ -30,11 +31,11 @@ public void setUp() {
public void testWriteMetrics() throws JsonProcessingException {
OutputCodec outputCodec = new OutputCodec() {
@Override
public void start(OutputStream outputStream) throws IOException {
public void start(OutputStream outputStream, SinkContext sinkContext) throws IOException {
}

@Override
public void writeEvent(Event event, OutputStream outputStream, String tagsTargetKey) throws IOException {
public void writeEvent(Event event, OutputStream outputStream) throws IOException {
}

@Override
Expand All @@ -53,8 +54,8 @@ public String getExtension() {
withTags(testTags).build();
Map<String, Object> json = generateJson();
final JacksonEvent event = JacksonLog.builder().withData(json).withEventMetadata(defaultEventMetadata).build();
Event tagsToEvent = outputCodec.addTagsToEvent(event, "Tag");
assertNotEquals(event.toJsonString(), tagsToEvent.toJsonString());
String jsonString = outputCodec.buildJsonString(event, "tag1", null, null);
assertEquals(event.jsonBuilder().includeTags("tag1").toJsonString(), jsonString);
}

private static Map<String, Object> generateJson() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -651,6 +651,7 @@ void testJsonStringBuilder() {
eventMetadata.addTags(List.of("tag1", "tag2"));
final String expectedJsonString = "{\"foo\":\"bar\",\"tags\":[\"tag1\",\"tag2\"]}";
assertThat(event.jsonBuilder().includeTags("tags").toJsonString(), equalTo(expectedJsonString));
assertThat(event.jsonBuilder().rootKey("foo").toJsonString(), equalTo("\"bar\""));
assertThat(event.jsonBuilder().toJsonString(), equalTo(jsonString));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,5 +32,16 @@ public void testSinkContextBasic() {

}

@Test
public void testSinkContextWithTagsOnly() {
    // Build the context through the single-argument constructor.
    final String randomTagsKey = RandomStringUtils.randomAlphabetic(6);
    sinkContext = new SinkContext(randomTagsKey);

    // The tags target key is kept as given ...
    assertThat(sinkContext.getTagsTargetKey(), equalTo(randomTagsKey));
    // ... while every other property is left unset.
    assertThat(sinkContext.getRoutes(), equalTo(null));
    assertThat(sinkContext.getIncludeKeys(), equalTo(null));
    assertThat(sinkContext.getExcludeKeys(), equalTo(null));
}

}

Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor;
import org.opensearch.dataprepper.model.codec.OutputCodec;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.sink.SinkContext;

import java.io.IOException;
import java.io.OutputStream;
Expand All @@ -26,12 +27,12 @@ public AvroOutputCodec(final AvroOutputCodecConfig config) {
}

@Override
public void start(final OutputStream outputStream) throws IOException {
public void start(OutputStream outputStream, SinkContext sinkContext) throws IOException {
// TODO: do the initial wrapping
}

@Override
public void writeEvent(final Event event, final OutputStream outputStream,final String tagsTargetKey) throws IOException {
public void writeEvent(Event event, OutputStream outputStream) throws IOException {
// TODO: write event data to the outputstream
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor;
import org.opensearch.dataprepper.model.codec.OutputCodec;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.sink.SinkContext;

import java.io.IOException;
import java.io.OutputStream;
Expand All @@ -25,12 +26,12 @@ public CsvOutputCodec(final CsvOutputCodecConfig config) {
}

@Override
public void start(final OutputStream outputStream) throws IOException {
public void start(final OutputStream outputStream, SinkContext sinkContext) throws IOException {
// TODO: do the initial wrapping like get header and delimiter and write to Outputstream
}

@Override
public void writeEvent(final Event event, final OutputStream outputStream, String tagsTargetKey) throws IOException {
public void writeEvent(final Event event, final OutputStream outputStream) throws IOException {
// TODO: validate data according to header and write event data to the outputstream
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor;
import org.opensearch.dataprepper.model.codec.OutputCodec;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.sink.SinkContext;

import java.io.IOException;
import java.io.OutputStream;
Expand All @@ -25,51 +26,37 @@ public class NewlineDelimitedOutputCodec implements OutputCodec {
private static final ObjectMapper objectMapper = new ObjectMapper();
private final NewlineDelimitedOutputConfig config;

private SinkContext sinkContext;

@DataPrepperPluginConstructor
public NewlineDelimitedOutputCodec(final NewlineDelimitedOutputConfig config) {
Objects.requireNonNull(config);
this.config = config;
}

@Override
public void start(final OutputStream outputStream) throws IOException {
public void start(final OutputStream outputStream, SinkContext sinkContext) throws IOException {
Objects.requireNonNull(outputStream);
Objects.requireNonNull(sinkContext);
this.sinkContext = sinkContext;
}

@Override
public void writeEvent(final Event event, final OutputStream outputStream, String tagsTargetKey) throws IOException {
public void writeEvent(final Event event, final OutputStream outputStream) throws IOException {
Objects.requireNonNull(event);
Map<String, Object> eventMap;
if (tagsTargetKey != null) {
eventMap = addTagsToEvent(event, tagsTargetKey).toMap();
} else {
eventMap = event.toMap();
}
writeToOutputStream(outputStream, eventMap);

String jsonString = buildJsonString(event, sinkContext.getTagsTargetKey(), sinkContext.getIncludeKeys(), sinkContext.getExcludeKeys());
byte[] byteArr = jsonString.getBytes();
outputStream.write(byteArr);
outputStream.write(System.lineSeparator().getBytes());
}

/**
 * Completes the output by closing the supplied stream.
 *
 * @param outputStream the stream to close
 * @throws IOException if closing the stream fails
 */
@Override
public void complete(final OutputStream outputStream) throws IOException {
outputStream.close();
}

private void writeToOutputStream(final OutputStream outputStream, final Object object) throws IOException {
byte[] byteArr = null;
if (object instanceof Map) {
Map<Object, Object> map = objectMapper.convertValue(object, Map.class);
for (String key : config.getExcludeKeys()) {
if (map.containsKey(key)) {
map.remove(key);
}
}
String json = objectMapper.writeValueAsString(map);
byteArr = json.getBytes();
} else {
byteArr = object.toString().getBytes();
}
outputStream.write(byteArr);
outputStream.write(System.lineSeparator().getBytes());
}

@Override
public String getExtension() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,21 +5,16 @@

package org.opensearch.dataprepper.plugins.codec.newline;

import com.fasterxml.jackson.annotation.JsonProperty;

import java.util.ArrayList;
import java.util.List;

/**
* Configuration class for the newline delimited codec.
*/
public class NewlineDelimitedOutputConfig {
private static final List<String> DEFAULT_EXCLUDE_KEYS = new ArrayList<>();

@JsonProperty("exclude_keys")
private List<String> excludeKeys = DEFAULT_EXCLUDE_KEYS;

public List<String> getExcludeKeys() {
return excludeKeys;
}
// private static final List<String> DEFAULT_EXCLUDE_KEYS = new ArrayList<>();
//
// @JsonProperty("exclude_keys")
// private List<String> excludeKeys = DEFAULT_EXCLUDE_KEYS;
//
// public List<String> getExcludeKeys() {
// return excludeKeys;
// }
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,13 @@
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.log.JacksonLog;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.model.sink.SinkContext;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand All @@ -40,13 +42,14 @@ private NewlineDelimitedOutputCodec createObjectUnderTest() {
@ParameterizedTest
@ValueSource(ints = {1, 3, 10, 100})
void test_happy_case(final int numberOfRecords) throws IOException {
SinkContext sinkContext = new SinkContext(null, null, Collections.emptyList(), Collections.emptyList());
this.numberOfRecords = numberOfRecords;
NewlineDelimitedOutputCodec newlineDelimitedOutputCodec = createObjectUnderTest();
outputStream = new ByteArrayOutputStream();
newlineDelimitedOutputCodec.start(outputStream);
newlineDelimitedOutputCodec.start(outputStream, sinkContext);
for (int index = 0; index < numberOfRecords; index++) {
final Event event = (Event) getRecord(index).getData();
newlineDelimitedOutputCodec.writeEvent(event, outputStream, null);
newlineDelimitedOutputCodec.writeEvent(event, outputStream);
}
newlineDelimitedOutputCodec.complete(outputStream);
byte[] byteArray = outputStream.toByteArray();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor;
import org.opensearch.dataprepper.model.codec.OutputCodec;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.sink.SinkContext;

import java.io.IOException;
import java.io.OutputStream;
Expand All @@ -25,7 +26,7 @@ public ParquetOutputCodec(final ParquetOutputCodecConfig config) {


@Override
public void start(final OutputStream outputStream) throws IOException {
public void start(final OutputStream outputStream, SinkContext sinkContext) throws IOException {
// TODO: do the initial wrapping
}

Expand All @@ -35,7 +36,7 @@ public void complete(final OutputStream outputStream) throws IOException {
}

@Override
public void writeEvent(final Event event, final OutputStream outputStream, String tagsTargetKey) throws IOException {
public void writeEvent(final Event event, final OutputStream outputStream) throws IOException {
// TODO: get the event data and write in output stream
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin;
import org.opensearch.dataprepper.model.codec.OutputCodec;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.sink.SinkContext;

import java.io.IOException;
import java.io.OutputStream;
Expand All @@ -19,12 +20,12 @@
public class JsonOutputCodec implements OutputCodec {

@Override
public void start(final OutputStream outputStream) throws IOException {
public void start(final OutputStream outputStream, SinkContext sinkContext) throws IOException {
// TODO: do the initial wrapping like start the array
}

@Override
public void writeEvent(final Event event, final OutputStream outputStream, String tagsTargetKey) throws IOException {
public void writeEvent(final Event event, final OutputStream outputStream) throws IOException {
// TODO: get the event data and write event data to the outputstream
}

Expand Down
Loading

0 comments on commit e5e2b35

Please sign in to comment.