-
Notifications
You must be signed in to change notification settings - Fork 190
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Support for Sink Codecs - Follow up PR to 2881 #2898
Conversation
Signed-off-by: umairofficial <[email protected]>
Signed-off-by: umairofficial <[email protected]>
} | ||
|
||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: Remove extra space?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
private static int numberOfRecords; | ||
private CsvOutputCodecConfig config; | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: Remove extra lines
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
@JsonProperty("schema_file_location") | ||
private String fileLocation; | ||
|
||
@JsonProperty("exclude_keys") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fix indentation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
schema=parseSchema(config.getSchema()); | ||
} | ||
else if(config.getFileLocation()!=null){ | ||
schema = AvroSchemaParser.parseSchemaFromJsonFile(config.getFileLocation()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There is an implicit assumption here that the schema is always provided as a local file. What if the schema is provided as a file in S3?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fetching Schema from a file present in S3 or elsewhere wasn't a part of the initial requirements.
Signed-off-by: umairofficial <[email protected]>
} | ||
|
||
@Override | ||
public void complete(final OutputStream outputStream) throws IOException { | ||
// TODO: do the final wrapping like closing outputstream and close generator | ||
public void writeEvent(final Event event, final OutputStream outputStream) throws IOException { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This implementation is not threadsafe. Because you are writing generator.writeStartObject and generator.writeEndObject as different calls, it's possible to write malformed json if multiple threads are running. You can either make this method synchronized or write the json into a byte array and then write into output stream
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The methods have been made synchronised
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
start and complete were fine. writeEvent needs to be made synchronized.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Okay, that has been taken care of.
Signed-off-by: umairofficial <[email protected]>
Signed-off-by: umairofficial <[email protected]>
Signed-off-by: umairofficial <[email protected]>
Signed-off-by: umairofficial <[email protected]>
Signed-off-by: umairofficial <[email protected]>
Signed-off-by: umairofficial <[email protected]>
Signed-off-by: umairofficial <[email protected]>
Signed-off-by: umairofficial <[email protected]>
Signed-off-by: umairofficial <[email protected]>
Signed-off-by: umairofficial <[email protected]>
Closing in favour of #2986. |
Description
AvroSinkCodec
,JsonSinkCodec
&CsvSinkCodec
implementation addedconfig
classes for the respective sink codecs addedIssues Resolved
Resolves #2403
Check List
By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.