Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add include_keys and exclude_keys options to sink #2989

Merged
merged 1 commit into from
Jul 28, 2023

Conversation

daixba
Copy link
Contributor

@daixba daixba commented Jul 7, 2023

Description

Add include_keys and exclude_keys options to sink

Issues Resolved

Resolve #2975

Check List

  • New functionality includes testing.
  • New functionality has been documented.
    • New functionality has javadoc added
  • Commits are signed with a real name per the DCO

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.

@dlvenable
Copy link
Member

@daixba , Thank you for this submission. I'd like to discuss some in the original issue about finding a way to solve this with multiple pipelines. That is largely what pipeline connectors are to help solve, and I'm concerned that this approach will grow such that we have two sets of processors - the normal processors and sink processors.

Copy link
Member

@graytaylor0 graytaylor0 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Overall implementation looks really good. Just had a couple questions. Also the build is failing due to some missing code test coverage in data-prepper-api

return new ArrayList<>();
}
List<String> result = keys.stream()
.map(k -> k.startsWith("/") ? k : "/" + k)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we have to require the json pointer at the start? I think just some_key and /some_key should both work

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is one-time work during the start to unify all the json paths for check. Customers can put both in the yaml.

break;
} else if (key.startsWith(keyPath)) {
found = true;
valueList.add("\"" + entry.getKey() + "\":" + searchAndFilter(entry.getValue(), keyPath, filterKeys, filterAction));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there any reason to prefer the recursive or non-recursive stack based implementation of DFS here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Non-recursive requires an extra use of stack in memory but it's not a problem. I think both ways have the same Os, it's just recursive is more easier to read and implement, especially for array nodes. Let me know if any concerns here.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The depth of these events will probably not be deep enough to make stack overflow an issue. I think this is acceptable.

* @throws IOException throws IOException when invalid input is received or not able to create wrapping
*/
void start(OutputStream outputStream) throws IOException;
void start(OutputStream outputStream, SinkContext sinkContext) throws IOException;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It may make sense to pass a different object here rather than SinkContext. For one, the sink really shouldn't need to know anything about routes. For another, this could vary independently of the SinkContext.

Perhaps:

interface OutputCodecOptions {
  List<String> getIncludeKeys();
  ...

Then you can even create an internal adapter to adapt from SinkContext to OutputCodecOptions. Or just do a mapping.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks David.

As the OutputCodec are loaded via plugin factory and may have its own configuration options (via pluginConfiguration), if we name it something like OutputCodecOptions, it seems to me it's some configuration options belongs to the OutputCodec, in such case, the usage will be

- sink
     -  s3
         codec:
              xxxx: 
                     include_keys: [...]

For in our case, the usage as below.

- sink
     -  s3
         codec:
              xxxx: 
         include_keys: [...]
         ...

So I am not sure if OutputCodecOptions is the right way. Also, another thing I am not sure is whether or not we are going to throw more info to the SinkContext. So to me, Context seems to be more suitable than Options here.

Thoughts?

Map<String, Object> eventData = objectMapper.readValue(eventJsonString, new TypeReference<>() {
});
return JacksonLog.builder().withData(eventData).build();
default String buildJsonString(Event event, String tagsTargetKey, List<String> includeKeys, List<String> excludeKeys) throws JsonProcessingException {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we should have this method here. Only the JSON codec cares about building a JSON string. So each codec should handle this independently.

super(pluginSettings);
this.routes = routes != null ? routes : new ArrayList<>();
this.includeKeys = includeKeys != null ? preprocessingKeys(includeKeys) : new ArrayList<>();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit:

Instead of new ArrayList<>() you could provide Collections.emptyList().

}

private SinkInternalJsonModel(final List<String> routes, final String tagsTargetKey, final Map<String, Object> pluginSettings) {
private SinkInternalJsonModel(final List<String> routes, final String tagsTargetKey, final List<String> includeKeys, final List<String> excludeKeys, final Map<String, Object> pluginSettings) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can the customer provide both include and exclude? It seems that it should one or the other.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It makes no sense to have both. Although right now, we do allow customers to add both options, but in the code, if include_keys is provided, exclude_keys will be ignored. We can make this clear in the doc.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@daixba , Can we have a validation on the code as well? It seems right now the user can add both and the results may not be what they expect.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, will add validation in the next PR.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This class has been renamed. Please see #3030. You should probably rebase.

public List<String> getExcludeKeys() {
return excludeKeys;
}
// private static final List<String> DEFAULT_EXCLUDE_KEYS = new ArrayList<>();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's not leave commented code. Just delete it.

}

private SinkInternalJsonModel(final List<String> routes, final String tagsTargetKey, final Map<String, Object> pluginSettings) {
private SinkInternalJsonModel(final List<String> routes, final String tagsTargetKey, final List<String> includeKeys, final List<String> excludeKeys, final Map<String, Object> pluginSettings) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@daixba , Can we have a validation on the code as well? It seems right now the user can add both and the results may not be what they expect.

@@ -46,6 +48,15 @@ public Collection<String> getRoutes() {
return this.<SinkInternalJsonModel>getInternalJsonModel().routes;
}

public List<String> getIncludeKeys() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This can be a Collection<String> since the order doesn't matter to users. It should help add clarity when authors create new sinks.

public Collection<String> getIncludeKeys()

Copy link
Contributor Author

@daixba daixba Aug 1, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As matter of fact, the keys are sorted in the code, we do care about order, this is to improve performance in the jsonbuilder.

return this.<SinkInternalJsonModel>getInternalJsonModel().includeKeys;
}

public List<String> getExcludeKeys() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as the comment above:

public Collection<String> getExcludeKeys()

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as above.

@@ -135,9 +143,15 @@ public interface Event extends Serializable {

JsonStringBuilder jsonBuilder();

public abstract class JsonStringBuilder {
abstract class JsonStringBuilder {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for making this improvement - inner classes on interfaces are automatically public, so no keyword is necessary.

.toJsonString();

if (document == null || !document.startsWith("{")) {
return String.format("{\"data\": %s}", document);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You didn't introduce this, but String.format is slow. String concatenation would be faster.

Feel free to update that, but since you didn't introduce it, no need to make the change to accept the PR.

@@ -50,7 +50,7 @@ void testSerializing_PipelinesDataFlowModel_empty_Plugins_with_nonEmpty_delay_an

final PluginModel source = new PluginModel("testSource", (Map<String, Object>) null);
final List<PluginModel> processors = Collections.singletonList(new PluginModel("testProcessor", (Map<String, Object>) null));
final List<SinkModel> sinks = Collections.singletonList(new SinkModel("testSink", Collections.emptyList(), null, null));
final List<SinkModel> sinks = Collections.singletonList(new SinkModel("testSink", Collections.emptyList(), null, Collections.emptyList(), Collections.emptyList(), null));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You should be able to update these tests to Collections.emptyCollection() after making the change I suggested about the collection.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As above, it's a list.

break;
} else if (key.startsWith(keyPath)) {
found = true;
valueList.add("\"" + entry.getKey() + "\":" + searchAndFilter(entry.getValue(), keyPath, filterKeys, filterAction));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The depth of these events will probably not be deep enough to make stack overflow an issue. I think this is acceptable.

found = true;
// To keep the order.
if (filterAction == RETAIN_ALL) {
valueList.add("\"" + entry.getKey() + "\":" + entry.getValue().toString());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are we sure that entry.gevValue().toString() provides the right value here? What if the string has double quotes in it? Are they being escaped? Any other special characters?

Copy link
Contributor Author

@daixba daixba Aug 1, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, entry.getValue().toString() is the right value here. All the special characters are already taken care of by this. The output could be a number (e.g. 10), a character with double quotes (e.g. "ye\"llow"), or even the full json tree (e.g. ["msg 1","msg 2","msg 3"], {"name":"hello","age":"world"} etc.)

Copy link
Member

@dlvenable dlvenable left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Improvements can come in follow-on PR.

@kkondaka kkondaka merged commit b519a82 into opensearch-project:main Jul 28, 2023
44 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Add include_keys and exclude_keys options under the sink
4 participants