Skip to content

Commit

Permalink
Add include_keys and exclude_keys options to sink
Browse files Browse the repository at this point in the history
Signed-off-by: Aiden Dai <[email protected]>
  • Loading branch information
daixba committed Jul 17, 2023
1 parent 0b804fd commit 893cd2e
Show file tree
Hide file tree
Showing 10 changed files with 365 additions and 46 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,10 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

/**
* Represents an extension of the {@link PluginModel} which is specific to Sink
Expand All @@ -28,8 +30,8 @@
@JsonDeserialize(using = SinkModel.SinkModelDeserializer.class)
public class SinkModel extends PluginModel {

SinkModel(final String pluginName, final List<String> routes, final String tagsTargetKey, final Map<String, Object> pluginSettings) {
this(pluginName, new SinkInternalJsonModel(routes, tagsTargetKey, pluginSettings));
SinkModel(final String pluginName, final List<String> routes, final String tagsTargetKey, final List<String> includeKeys, final List<String> excludeKeys, final Map<String, Object> pluginSettings) {
this(pluginName, new SinkInternalJsonModel(routes, tagsTargetKey, includeKeys, excludeKeys, pluginSettings));
}

private SinkModel(final String pluginName, final SinkInternalJsonModel sinkInnerModel) {
Expand All @@ -46,6 +48,15 @@ public Collection<String> getRoutes() {
return this.<SinkInternalJsonModel>getInternalJsonModel().routes;
}

public List<String> getIncludeKeys() {
return this.<SinkInternalJsonModel>getInternalJsonModel().includeKeys;
}

public List<String> getExcludeKeys() {
return this.<SinkInternalJsonModel>getInternalJsonModel().excludeKeys;
}


/**
* Gets the tags target key associated with this Sink.
*
Expand All @@ -62,14 +73,19 @@ public static class SinkModelBuilder {
private final List<String> routes;
private final String tagsTargetKey;

private final List<String> includeKeys;
private final List<String> excludeKeys;

private SinkModelBuilder(final PluginModel pluginModel) {
this.pluginModel = pluginModel;
this.routes = Collections.emptyList();
this.tagsTargetKey = null;
this.includeKeys = Collections.emptyList();
this.excludeKeys = Collections.emptyList();
}

public SinkModel build() {
return new SinkModel(pluginModel.getPluginName(), routes, tagsTargetKey, pluginModel.getPluginSettings());
return new SinkModel(pluginModel.getPluginName(), routes, tagsTargetKey, includeKeys, excludeKeys, pluginModel.getPluginSettings());
}
}

Expand All @@ -86,23 +102,53 @@ private static class SinkInternalJsonModel extends InternalJsonModel {
@JsonProperty("tags_target_key")
private final String tagsTargetKey;


@JsonInclude(JsonInclude.Include.NON_EMPTY)
@JsonProperty("include_keys")
private final List<String> includeKeys;

@JsonInclude(JsonInclude.Include.NON_EMPTY)
@JsonProperty("exclude_keys")
private final List<String> excludeKeys;

@JsonCreator
private SinkInternalJsonModel(@JsonProperty("routes") final List<String> routes, @JsonProperty("tags_target_key") final String tagsTargetKey) {
super();
this.routes = routes != null ? routes : new ArrayList<>();
this.tagsTargetKey = tagsTargetKey;
private SinkInternalJsonModel(@JsonProperty("routes") final List<String> routes, @JsonProperty("tags_target_key") final String tagsTargetKey, @JsonProperty("include_keys") final List<String> includeKeys, @JsonProperty("exclude_keys") final List<String> excludeKeys) {
this(routes, tagsTargetKey, includeKeys, excludeKeys, new HashMap<>());
}

private SinkInternalJsonModel(final List<String> routes, final String tagsTargetKey, final Map<String, Object> pluginSettings) {
private SinkInternalJsonModel(final List<String> routes, final String tagsTargetKey, final List<String> includeKeys, final List<String> excludeKeys, final Map<String, Object> pluginSettings) {
super(pluginSettings);
this.routes = routes != null ? routes : new ArrayList<>();
this.includeKeys = includeKeys != null ? preprocessingKeys(includeKeys) : new ArrayList<>();
this.excludeKeys = excludeKeys != null ? preprocessingKeys(excludeKeys) : new ArrayList<>();
this.tagsTargetKey = tagsTargetKey;
}


/**
* Pre-processes a list of Keys and returns a sorted list
* The keys must start with `/` and not end with `/`
*
* @param keys a list of raw keys
* @return a sorted processed keys
*/
private List<String> preprocessingKeys(final List<String> keys) {
if (keys.contains("/")) {
return new ArrayList<>();
}
List<String> result = keys.stream()
.map(k -> k.startsWith("/") ? k : "/" + k)
.map(k -> k.endsWith("/") ? k.substring(0, k.length() - 1) : k)
.collect(Collectors.toList());
Collections.sort(result);
return result;
}
}


static class SinkModelDeserializer extends AbstractPluginModelDeserializer<SinkModel, SinkInternalJsonModel> {
SinkModelDeserializer() {
super(SinkModel.class, SinkInternalJsonModel.class, SinkModel::new, () -> new SinkInternalJsonModel(null, null));
super(SinkModel.class, SinkInternalJsonModel.class, SinkModel::new, () -> new SinkInternalJsonModel(null, null, null, null));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,15 @@
* {@link org.opensearch.dataprepper.model.sink.Sink} and {@link org.opensearch.dataprepper.model.source.Source} will be extended to support
* the new internal model. The use of {@link org.opensearch.dataprepper.model.record.Record}s will be deprecated in 2.0.
* <p>
*
* @since 1.2
*/
public interface Event extends Serializable {

/**
* Adds or updates the key with a given value in the Event
*
* @param key where the value will be set
* @param key where the value will be set
* @param value value to set the key to
* @since 1.2
*/
Expand All @@ -34,9 +35,9 @@ public interface Event extends Serializable {
/**
* Retrieves the given key from the Event
*
* @param key the value to retrieve from
* @param key the value to retrieve from
* @param clazz the return type of the value
* @param <T> The type
* @param <T> The type
* @return T a clazz object from the key
* @since 1.2
*/
Expand All @@ -45,30 +46,33 @@ public interface Event extends Serializable {
/**
* Retrieves the given key from the Event as a List
*
* @param key the value to retrieve from
* @param key the value to retrieve from
* @param clazz the return type of elements in the list
* @param <T> The type
* @param <T> The type
* @return {@literal List<T>} a list of clazz elements
* @since 1.2
*/
<T> List<T> getList(String key, Class<T> clazz);

/**
* Deletes the given key from the Event
*
* @param key the field to be deleted
* @since 1.2
*/
void delete(String key);

/**
* Generates a serialized Json string of the entire Event
*
* @return Json string of the event
* @since 1.2
*/
String toJsonString();

/**
* Gets a serialized Json string of the specific key in the Event
*
* @param key the field to be returned
* @return Json string of the field
* @since 2.2
Expand All @@ -77,13 +81,15 @@ public interface Event extends Serializable {

/**
* Retrieves the EventMetadata
*
* @return EventMetadata for the event
* @since 1.2
*/
EventMetadata getMetadata();

/**
* Checks if the key exists.
*
* @param key name of the key to look for
* @return returns true if the key exists, otherwise false
* @since 1.2
Expand All @@ -92,6 +98,7 @@ public interface Event extends Serializable {

/**
* Checks if the value stored for the key is list
*
* @param key name of the key to look for
* @return returns true if the key is a list, otherwise false
* @since 1.2
Expand All @@ -106,6 +113,7 @@ public interface Event extends Serializable {

/**
* Returns formatted parts of the input string replaced by their values in the event
*
* @param format input format
* @return returns a string with no formatted parts, returns null if no value is found
* @throws RuntimeException if the input string is not properly formatted
Expand All @@ -123,9 +131,15 @@ public interface Event extends Serializable {

JsonStringBuilder jsonBuilder();

public abstract class JsonStringBuilder {
abstract class JsonStringBuilder {
private String tagsKey;

private String rootKey;

private List<String> includeKeys;

private List<String> excludeKeys;

/**
* @param key key to be used for tags
* @return JsonStringString with tags included
Expand All @@ -136,6 +150,36 @@ public JsonStringBuilder includeTags(String key) {
return this;
}

/**
* @param rootKey key to be used for tags
* @return JsonStringString with tags included
* @since 2.4
*/
public JsonStringBuilder rootKey(String rootKey) {
this.rootKey = rootKey;
return this;
}

/**
* @param includeKeys A list of keys to be retained
* @return JsonStringString with retained keys only
* @since 2.4
*/
public JsonStringBuilder includeKeys(List<String> includeKeys) {
this.includeKeys = includeKeys;
return this;
}

/**
* @param excludeKeys A list of keys to be excluded
* @return JsonStringString without excluded keys
* @since 2.4
*/
public JsonStringBuilder excludeKeys(List<String> excludeKeys) {
this.excludeKeys = excludeKeys;
return this;
}

/**
* @return key used for tags
* @since 2.3
Expand All @@ -144,6 +188,30 @@ public String getTagsKey() {
return tagsKey;
}

/**
* @return root key
* @since 2.4
*/
public String getRootKey() {
return rootKey;
}

/**
* @return a list of keys to be retrained.
* @since 2.4
*/
public List<String> getIncludeKeys() {
return includeKeys;
}

/**
* @return a list of keys to be excluded
* @since 2.4
*/
public List<String> getExcludeKeys() {
return excludeKeys;
}

/**
* @return json string
* @since 2.3
Expand Down
Loading

0 comments on commit 893cd2e

Please sign in to comment.