Skip to content

Commit

Permalink
Merge branch 'main' into parquet-sink-codec-pr
Browse files Browse the repository at this point in the history
# Conflicts:
#	data-prepper-plugins/avro-codecs/src/main/java/org/opensearch/dataprepper/plugins/codec/avro/AvroOutputCodec.java
#	data-prepper-plugins/csv-processor/src/main/java/org/opensearch/dataprepper/plugins/codec/csv/CsvOutputCodec.java
#	data-prepper-plugins/newline-codecs/src/main/java/org/opensearch/dataprepper/plugins/codec/newline/NewlineDelimitedOutputCodec.java
#	data-prepper-plugins/parquet-codecs/src/main/java/org/opensearch/dataprepper/plugins/codec/parquet/ParquetOutputCodec.java
#	data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/codec/json/JsonOutputCodec.java
#	data-prepper-plugins/s3-sink/build.gradle
#	data-prepper-plugins/s3-sink/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/S3SinkServiceIT.java
#	data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/S3SinkService.java
#	data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/accumulator/Buffer.java
#	data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/accumulator/InMemoryBuffer.java
#	data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/accumulator/LocalFileBuffer.java
  • Loading branch information
umairofficial committed Jul 27, 2023
2 parents 08143bc + a19d71d commit c21d38c
Show file tree
Hide file tree
Showing 369 changed files with 20,192 additions and 2,412 deletions.
2 changes: 2 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,8 @@ subprojects {
}
} else if (details.requested.group == 'log4j' && details.requested.name == 'log4j') {
details.useTarget group: 'org.apache.logging.log4j', name: 'log4j-1.2-api', version: '2.17.1'
} else if (details.requested.group == 'org.xerial.snappy' && details.requested.name == 'snappy-java') {
details.useTarget group: 'org.xerial.snappy', name: 'snappy-java', version: '1.1.10.1'
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion data-prepper-api/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ dependencies {
testImplementation testLibs.junit.vintage
testImplementation project(':data-prepper-test-common')
testImplementation 'org.skyscreamer:jsonassert:1.5.1'
testImplementation 'commons-io:commons-io:2.11.0'
testImplementation 'commons-io:commons-io:2.13.0'
}

jacocoTestCoverageVerification {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,4 +32,6 @@ default Boolean evaluateConditional(final String statement, final Event context)
throw new ClassCastException("Unexpected expression return type of " + result.getClass());
}
}

Boolean isValidExpressionStatement(final String statement);
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,4 +50,12 @@ public interface AcknowledgementSet {
* @since 2.2
*/
public boolean release(final EventHandle eventHandle, final boolean result);

/**
* Indicates that the addition of initial set of events to
* the acknowledgement set is completed.
* It is possible that more events are added to the set as the
* initial events are going through the pipeline line.
*/
public void complete();
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,12 @@ public interface OutputCodec {
* this method get called from {@link Sink} to do initial wrapping in {@link OutputStream}
* Implementors should do initial wrapping according to the implementation
*
* @param outputStream outputStream param for wrapping
* @param outputStream outputStream param for wrapping
* @param event Event to auto-generate schema
* @param tagsTargetKey to add tags to the record to create schema
* @throws IOException throws IOException when invalid input is received or not able to create wrapping
*/
void start(OutputStream outputStream) throws IOException;
void start(OutputStream outputStream, Event event, String tagsTargetKey) throws IOException;

/**
* this method get called from {@link Sink} to write event in {@link OutputStream}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@
@JsonDeserialize(using = SinkModel.SinkModelDeserializer.class)
public class SinkModel extends PluginModel {

SinkModel(final String pluginName, final List<String> routes, final Map<String, Object> pluginSettings) {
this(pluginName, new SinkInternalJsonModel(routes, pluginSettings));
SinkModel(final String pluginName, final List<String> routes, final String tagsTargetKey, final Map<String, Object> pluginSettings) {
this(pluginName, new SinkInternalJsonModel(routes, tagsTargetKey, pluginSettings));
}

private SinkModel(final String pluginName, final SinkInternalJsonModel sinkInnerModel) {
Expand All @@ -46,18 +46,30 @@ public Collection<String> getRoutes() {
return this.<SinkInternalJsonModel>getInternalJsonModel().routes;
}

/**
* Gets the tags target key associated with this Sink.
*
* @return The tags target key
* @since 2.4
*/
public String getTagsTargetKey() {
return this.<SinkInternalJsonModel>getInternalJsonModel().tagsTargetKey;
}

public static class SinkModelBuilder {

private final PluginModel pluginModel;
private final List<String> routes;
private final String tagsTargetKey;

private SinkModelBuilder(final PluginModel pluginModel) {
this.pluginModel = pluginModel;
this.routes = Collections.emptyList();
this.tagsTargetKey = null;
}

public SinkModel build() {
return new SinkModel(pluginModel.getPluginName(), routes, pluginModel.getPluginSettings());
return new SinkModel(pluginModel.getPluginName(), routes, tagsTargetKey, pluginModel.getPluginSettings());
}
}

Expand All @@ -70,21 +82,27 @@ private static class SinkInternalJsonModel extends InternalJsonModel {
@JsonProperty("routes")
private final List<String> routes;

@JsonInclude(JsonInclude.Include.NON_EMPTY)
@JsonProperty("tags_target_key")
private final String tagsTargetKey;

@JsonCreator
private SinkInternalJsonModel(@JsonProperty("routes") final List<String> routes) {
private SinkInternalJsonModel(@JsonProperty("routes") final List<String> routes, @JsonProperty("tags_target_key") final String tagsTargetKey) {
super();
this.routes = routes != null ? routes : new ArrayList<>();
this.tagsTargetKey = tagsTargetKey;
}

private SinkInternalJsonModel(final List<String> routes, final Map<String, Object> pluginSettings) {
private SinkInternalJsonModel(final List<String> routes, final String tagsTargetKey, final Map<String, Object> pluginSettings) {
super(pluginSettings);
this.routes = routes != null ? routes : new ArrayList<>();
this.tagsTargetKey = tagsTargetKey;
}
}

static class SinkModelDeserializer extends AbstractPluginModelDeserializer<SinkModel, SinkInternalJsonModel> {
SinkModelDeserializer() {
super(SinkModel.class, SinkInternalJsonModel.class, SinkModel::new, () -> new SinkInternalJsonModel(null));
super(SinkModel.class, SinkInternalJsonModel.class, SinkModel::new, () -> new SinkInternalJsonModel(null, null));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,9 @@ public Boolean hasTags(final List<String> tagsList) {

@Override
public void addTags(final List<String> newTags) {
tags.addAll(newTags);
if (Objects.nonNull(newTags)) {
tags.addAll(newTags);
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@

package org.opensearch.dataprepper.model.event;

import org.opensearch.dataprepper.expression.ExpressionEvaluator;

import java.io.Serializable;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -113,6 +115,16 @@ public interface Event extends Serializable {
*/
String formatString(final String format);

/**
* Returns formatted parts of the input string replaced by their values in the event or the values from the result
* of a Data Prepper expression
* @param format input format
* @return returns a string with no formatted parts, returns null if no value is found
* @throws RuntimeException if the input string is not properly formatted
* @since 2.1
*/
String formatString(String format, ExpressionEvaluator expressionEvaluator);

/**
* Returns event handle
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import com.fasterxml.jackson.datatype.jdk8.Jdk8Module;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import org.apache.commons.lang3.StringUtils;
import org.opensearch.dataprepper.expression.ExpressionEvaluator;
import org.opensearch.dataprepper.model.event.exceptions.EventKeyNotFoundException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -29,6 +30,7 @@
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
Expand Down Expand Up @@ -298,6 +300,24 @@ public String getAsJsonString(final String key) {
*/
@Override
public String formatString(final String format) {
return formatStringInternal(format, null);
}

/**
* returns a string with formatted parts replaced by their values. The input
* string may contain parts with format "${.../.../...}" which are replaced
* by their value in the event. The input string may also contain Data Prepper expressions
* such as "${getMetadata(\"some_metadata_key\")}
*
* @param format string with format
* @throws RuntimeException if the format is incorrect or the value is not a string
*/
@Override
public String formatString(final String format, final ExpressionEvaluator expressionEvaluator) {
return formatStringInternal(format, expressionEvaluator);
}

private String formatStringInternal(final String format, final ExpressionEvaluator expressionEvaluator) {
int fromIndex = 0;
String result = "";
int position = 0;
Expand All @@ -308,11 +328,21 @@ public String formatString(final String format) {
}
result += format.substring(fromIndex, position);
String name = format.substring(position + 2, endPosition);
Object val = this.get(name, Object.class);
if (val == null) {
throw new EventKeyNotFoundException(String.format("The key %s could not be found in the Event when formatting", name));

Object val;
if (Objects.nonNull(expressionEvaluator) && expressionEvaluator.isValidExpressionStatement(name)) {
val = expressionEvaluator.evaluate(name, this);
} else {
val = this.get(name, Object.class);
if (val == null) {
throw new EventKeyNotFoundException(String.format("The key %s could not be found in the Event when formatting", name));
}
}


if (Objects.nonNull(val)) {
result += val.toString();
}
result += val.toString();
fromIndex = endPosition + 1;
}
if (fromIndex < format.length()) {
Expand Down Expand Up @@ -373,19 +403,9 @@ private String trimKey(final String key) {
}

private boolean isValidKey(final String key) {
char previous = ' ';
char next = ' ';
for (int i = 0; i < key.length(); i++) {
char c = key.charAt(i);

if (i < key.length() - 1) {
next = key.charAt(i + 1);
}

if ((i == 0 || i == key.length() - 1 || previous == '/' || next == '/') && (c == '_' || c == '.' || c == '-')) {
return false;
}

if (!(c >= 48 && c <= 57
|| c >= 65 && c <= 90
|| c >= 97 && c <= 122
Expand All @@ -397,7 +417,6 @@ private boolean isValidKey(final String key) {

return false;
}
previous = c;
}
return true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

package org.opensearch.dataprepper.model.plugin;

import org.opensearch.dataprepper.model.sink.SinkContext;
import org.opensearch.dataprepper.model.configuration.PluginSetting;

import java.util.List;
Expand All @@ -27,6 +28,18 @@ public interface PluginFactory {
*/
<T> T loadPlugin(final Class<T> baseClass, final PluginSetting pluginSetting);

/**
* Loads a new instance of a plugin with SinkContext.
*
* @param baseClass The class type that the plugin is supporting.
* @param pluginSetting The {@link PluginSetting} to configure this plugin
* @param sinkContext The {@link SinkContext} to configure this plugin
* @param <T> The type
* @return A new instance of your plugin, configured
* @since 1.2
*/
<T> T loadPlugin(final Class<T> baseClass, final PluginSetting pluginSetting, final SinkContext sinkContext);

/**
* Loads a specified number of plugin instances. The total number of instances is provided
* by the numberOfInstancesFunction.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.model.sink;

import java.util.Collection;

/**
* Data Prepper Sink Context class. This the class for keeping global
* sink configuration as context so that individual sinks may use them.
*/
public class SinkContext {
private final String tagsTargetKey;
private final Collection<String> routes;

public SinkContext(final String tagsTargetKey, final Collection<String> routes) {
this.tagsTargetKey = tagsTargetKey;
this.routes = routes;
}

/**
* returns the target key name for tags if configured for a given sink
* @return tags target key
*/
public String getTagsTargetKey() {
return tagsTargetKey;
}

/**
* returns routes if configured for a given sink
* @return routes
*/
public Collection<String> getRoutes() {
return routes;
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,11 @@ class TestExpressionEvaluator implements ExpressionEvaluator {
public Object evaluate(final String statement, final Event event) {
return event.get(statement, Object.class);
}

@Override
public Boolean isValidExpressionStatement(final String statement) {
return true;
}
}

@Test
Expand Down
Loading

0 comments on commit c21d38c

Please sign in to comment.