Add include_keys and exclude_keys options to sink #2989
Conversation
@daixba, thank you for this submission. I'd like to discuss in the original issue whether this can be solved with multiple pipelines. That is largely what pipeline connectors are meant to help solve, and I'm concerned that this approach will grow until we have two sets of processors: the normal processors and sink processors.
The overall implementation looks really good. I just had a couple of questions. Also, the build is failing due to missing test coverage in data-prepper-api.
return new ArrayList<>();
}
List<String> result = keys.stream()
.map(k -> k.startsWith("/") ? k : "/" + k)
Do we have to require the leading JSON pointer slash? I think both some_key and /some_key should work.
This is one-time work at startup to normalize all the JSON paths for the check. Customers can use either form in the YAML.
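A minimal sketch of that one-time normalization, based on the stream fragment quoted above. The class and method names are hypothetical, and the `distinct()` step is an assumption added for illustration; the sorting reflects the later remark in this thread that the keys are sorted.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

public class KeyPathNormalizer {
    // Hypothetical helper, not the PR's actual method.
    // Prefixes each key with "/" when it is missing, so that
    // "some_key" and "/some_key" are treated as the same path.
    public static List<String> normalize(List<String> keys) {
        if (keys == null) {
            return new ArrayList<>();
        }
        return keys.stream()
                .map(k -> k.startsWith("/") ? k : "/" + k)
                .distinct()   // assumption: drop duplicates created by normalization
                .sorted()     // keys are kept sorted for the later lookup
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // prints [/nested/field, /some_key]
        System.out.println(normalize(List.of("some_key", "/some_key", "nested/field")));
    }
}
```

Because this runs once when the sink is configured, the per-event filtering code only ever sees keys in a single form.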
break;
} else if (key.startsWith(keyPath)) {
found = true;
valueList.add("\"" + entry.getKey() + "\":" + searchAndFilter(entry.getValue(), keyPath, filterKeys, filterAction));
Is there any reason to prefer the recursive or non-recursive stack based implementation of DFS here?
A non-recursive version needs an explicit stack in memory, but that's not a problem. I think both approaches have the same time complexity; the recursive one is just easier to read and implement, especially for array nodes. Let me know if you have any concerns here.
The depth of these events will probably not be deep enough to make stack overflow an issue. I think this is acceptable.
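To make the trade-off concrete, here is a small sketch that walks a nested map both ways and collects the path of every leaf. It is not the PR's searchAndFilter method; the class name and the use of plain `Map<String, Object>` instead of a JSON tree are assumptions for illustration.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.Map;

public class NestedKeyWalk {
    // Recursive DFS: the call stack tracks the current position,
    // and leaves come out in document order.
    public static void collectRecursive(Map<String, Object> node, String prefix, List<String> out) {
        for (Map.Entry<String, Object> e : node.entrySet()) {
            String path = prefix + "/" + e.getKey();
            if (e.getValue() instanceof Map) {
                @SuppressWarnings("unchecked")
                Map<String, Object> child = (Map<String, Object>) e.getValue();
                collectRecursive(child, path, out);
            } else {
                out.add(path);
            }
        }
    }

    // Iterative DFS: an explicit stack of (path, node) frames replaces the call stack.
    // It visits the same leaves, but not necessarily in document order.
    public static List<String> collectIterative(Map<String, Object> root) {
        List<String> out = new ArrayList<>();
        Deque<Map.Entry<String, Map<String, Object>>> stack = new ArrayDeque<>();
        stack.push(Map.entry("", root));
        while (!stack.isEmpty()) {
            Map.Entry<String, Map<String, Object>> frame = stack.pop();
            for (Map.Entry<String, Object> e : frame.getValue().entrySet()) {
                String path = frame.getKey() + "/" + e.getKey();
                if (e.getValue() instanceof Map) {
                    @SuppressWarnings("unchecked")
                    Map<String, Object> child = (Map<String, Object>) e.getValue();
                    stack.push(Map.entry(path, child));
                } else {
                    out.add(path);
                }
            }
        }
        return out;
    }
}
```

Both versions touch every node once. The recursive one preserves key order without extra bookkeeping, which matters if the filtered output should keep the original field order.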
* @throws IOException throws IOException when invalid input is received or not able to create wrapping
*/
void start(OutputStream outputStream) throws IOException;
void start(OutputStream outputStream, SinkContext sinkContext) throws IOException;
It may make sense to pass a different object here rather than SinkContext. For one, the sink really shouldn't need to know anything about routes. For another, this could vary independently of the SinkContext.
Perhaps:
interface OutputCodecOptions {
List<String> getIncludeKeys();
...
Then you can even create an internal adapter to adapt from SinkContext to OutputCodecOptions. Or just do a mapping.
Thanks David.
Since the OutputCodec implementations are loaded via the plugin factory and may have their own configuration options (via pluginConfiguration), naming it something like OutputCodecOptions suggests to me that these are configuration options belonging to the OutputCodec. In that case, the usage would be:
- sink
  - s3
      codec:
        xxxx:
          include_keys: [...]
In our case, the usage is as below:
- sink
  - s3
      codec:
        xxxx:
      include_keys: [...]
      ...
So I am not sure OutputCodecOptions is the right way. Another thing I am not sure about is whether we are going to put more information into the SinkContext. So to me, Context seems more suitable than Options here.
Thoughts?
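For reference, the adapter idea from the review comment could look roughly like the sketch below. Everything here is hypothetical: the SinkContext class is a simplified stand-in rather than the real Data Prepper class, and getExcludeKeys on the options interface is an assumption, since the original suggestion only spelled out getIncludeKeys.

```java
import java.util.List;

public class CodecOptionsSketch {
    // Simplified stand-in for the real SinkContext; its fields are assumptions.
    public static final class SinkContext {
        private final List<String> routes;
        private final List<String> includeKeys;
        private final List<String> excludeKeys;

        public SinkContext(List<String> routes, List<String> includeKeys, List<String> excludeKeys) {
            this.routes = routes;
            this.includeKeys = includeKeys;
            this.excludeKeys = excludeKeys;
        }

        public List<String> getRoutes() { return routes; }
        public List<String> getIncludeKeys() { return includeKeys; }
        public List<String> getExcludeKeys() { return excludeKeys; }
    }

    // Narrow view handed to a codec: it exposes key filtering but not routes.
    public interface OutputCodecOptions {
        List<String> getIncludeKeys();
        List<String> getExcludeKeys(); // assumption: not named in the review comment
    }

    // Internal adapter from the sink-level context to the codec-level view.
    public static OutputCodecOptions fromSinkContext(final SinkContext context) {
        return new OutputCodecOptions() {
            @Override
            public List<String> getIncludeKeys() { return context.getIncludeKeys(); }

            @Override
            public List<String> getExcludeKeys() { return context.getExcludeKeys(); }
        };
    }
}
```

With this shape, the YAML can keep include_keys at the sink level while the codec still receives only the values it needs, whichever name the type ends up with.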
Map<String, Object> eventData = objectMapper.readValue(eventJsonString, new TypeReference<>() {
});
return JacksonLog.builder().withData(eventData).build();
default String buildJsonString(Event event, String tagsTargetKey, List<String> includeKeys, List<String> excludeKeys) throws JsonProcessingException {
I don't think we should have this method here. Only the JSON codec cares about building a JSON string. So each codec should handle this independently.
super(pluginSettings);
this.routes = routes != null ? routes : new ArrayList<>();
this.includeKeys = includeKeys != null ? preprocessingKeys(includeKeys) : new ArrayList<>();
nit: Instead of new ArrayList<>() you could provide Collections.emptyList().
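A short sketch of what the nit implies. The helper name is hypothetical; the point is that Collections.emptyList() returns a shared immutable list, so it avoids an allocation but must not be mutated later.

```java
import java.util.Collections;
import java.util.List;

public class EmptyDefaultSketch {
    // Hypothetical stand-in for the null-defaulting done in the constructor.
    public static List<String> orEmpty(List<String> keys) {
        return keys != null ? keys : Collections.emptyList();
    }

    public static void main(String[] args) {
        List<String> keys = orEmpty(null);
        System.out.println(keys.isEmpty()); // prints true
        try {
            keys.add("/some_key");
        } catch (UnsupportedOperationException e) {
            // The shared empty list rejects modification.
            System.out.println("empty default is immutable");
        }
    }
}
```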
}

private SinkInternalJsonModel(final List<String> routes, final String tagsTargetKey, final Map<String, Object> pluginSettings) {
private SinkInternalJsonModel(final List<String> routes, final String tagsTargetKey, final List<String> includeKeys, final List<String> excludeKeys, final Map<String, Object> pluginSettings) {
Can the customer provide both include and exclude? It seems that it should be one or the other.
It makes no sense to have both. Right now we do allow customers to set both options, but in the code, if include_keys is provided, exclude_keys is ignored. We can make this clear in the doc.
@daixba , Can we have a validation on the code as well? It seems right now the user can add both and the results may not be what they expect.
Sure, will add validation in the next PR.
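The requested validation could be as small as the sketch below. The class name, method name, and error message are hypothetical, and where the check would actually live (model constructor, sink setup, or elsewhere) is left open here.

```java
import java.util.List;

public class KeyOptionsValidator {
    // Hypothetical check: reject configurations that set both options,
    // instead of silently ignoring exclude_keys.
    public static void validate(List<String> includeKeys, List<String> excludeKeys) {
        boolean hasInclude = includeKeys != null && !includeKeys.isEmpty();
        boolean hasExclude = excludeKeys != null && !excludeKeys.isEmpty();
        if (hasInclude && hasExclude) {
            throw new IllegalArgumentException(
                    "include_keys and exclude_keys cannot both be set on the same sink.");
        }
    }
}
```

Failing fast at configuration time surfaces the conflict to the user rather than producing output that differs from what they expected.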
This class has been renamed. Please see #3030. You should probably rebase.
public List<String> getExcludeKeys() {
return excludeKeys;
}
// private static final List<String> DEFAULT_EXCLUDE_KEYS = new ArrayList<>();
Let's not leave commented code. Just delete it.
Signed-off-by: Aiden Dai <[email protected]>
@@ -46,6 +48,15 @@ public Collection<String> getRoutes() {
return this.<SinkInternalJsonModel>getInternalJsonModel().routes;
}

public List<String> getIncludeKeys() {
This can be a Collection<String> since the order doesn't matter to users. It should help add clarity when authors create new sinks.
public Collection<String> getIncludeKeys()
As a matter of fact, the keys are sorted in the code, so we do care about order; this is to improve performance in the jsonbuilder.
return this.<SinkInternalJsonModel>getInternalJsonModel().includeKeys;
}

public List<String> getExcludeKeys() {
Same as the comment above:
public Collection<String> getExcludeKeys()
Same as above.
@@ -135,9 +143,15 @@ public interface Event extends Serializable {

JsonStringBuilder jsonBuilder();

public abstract class JsonStringBuilder {
abstract class JsonStringBuilder {
Thanks for making this improvement - inner classes on interfaces are automatically public, so no keyword is necessary.
.toJsonString();

if (document == null || !document.startsWith("{")) {
return String.format("{\"data\": %s}", document);
You didn't introduce this, but String.format is slow. String concatenation would be faster.
Feel free to update that, but since you didn't introduce it, no need to make the change to accept the PR.
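As a sketch of the suggested change, the wrapping branch could use plain concatenation and produce the same string. The class and method names are hypothetical; only the condition and the output format come from the quoted diff.

```java
public class DocumentWrapper {
    // Wraps non-object documents in {"data": ...} using concatenation
    // instead of String.format; the resulting string is identical.
    public static String wrap(String document) {
        if (document == null || !document.startsWith("{")) {
            return "{\"data\": " + document + "}";
        }
        return document;
    }

    public static void main(String[] args) {
        System.out.println(wrap("10"));          // prints {"data": 10}
        System.out.println(wrap("{\"a\":1}"));   // prints {"a":1}
    }
}
```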
@@ -50,7 +50,7 @@ void testSerializing_PipelinesDataFlowModel_empty_Plugins_with_nonEmpty_delay_an

final PluginModel source = new PluginModel("testSource", (Map<String, Object>) null);
final List<PluginModel> processors = Collections.singletonList(new PluginModel("testProcessor", (Map<String, Object>) null));
final List<SinkModel> sinks = Collections.singletonList(new SinkModel("testSink", Collections.emptyList(), null, null));
final List<SinkModel> sinks = Collections.singletonList(new SinkModel("testSink", Collections.emptyList(), null, Collections.emptyList(), Collections.emptyList(), null));
You should be able to update these tests to Collections.emptyCollection() after making the change I suggested about the collection.
As above, it's a list.
found = true;
// To keep the order.
if (filterAction == RETAIN_ALL) {
valueList.add("\"" + entry.getKey() + "\":" + entry.getValue().toString());
Are we sure that entry.getValue().toString() provides the right value here? What if the string has double quotes in it? Are they being escaped? What about other special characters?
Yes, entry.getValue().toString() is the right value here. All the special characters are already taken care of by this. The output could be a number (e.g. 10), a string with double quotes (e.g. "ye\"llow"), or even a full JSON tree (e.g. ["msg 1","msg 2","msg 3"], {"name":"hello","age":"world"}, etc.).
Improvements can come in follow-on PR.
Description
Add include_keys and exclude_keys options to sink
Issues Resolved
Resolve #2975
Check List
By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.