Support for Sink Codecs - Follow up PR to 2881 #2898

Closed
wants to merge 13 commits into from

Conversation

umayr-codes
Contributor

Description

  • Added AvroSinkCodec, JsonSinkCodec, and CsvSinkCodec implementations
  • Added config classes for the respective sink codecs
  • Added unit tests
  • Added integration tests

Issues Resolved

Resolves #2403

Check List

  • New functionality includes testing.
  • New functionality has been documented.
    • New functionality has javadoc added
  • Commits are signed with a real name per the DCO

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.

Signed-off-by: umairofficial <[email protected]>
}



Contributor

Nit: Remove extra space?

Contributor Author

done

private static int numberOfRecords;
private CsvOutputCodecConfig config;


Contributor

Nit: Remove extra lines

Contributor Author

done

@JsonProperty("schema_file_location")
private String fileLocation;

@JsonProperty("exclude_keys")
Contributor

Fix indentation

Contributor Author

done

schema=parseSchema(config.getSchema());
}
else if(config.getFileLocation()!=null){
schema = AvroSchemaParser.parseSchemaFromJsonFile(config.getFileLocation());
Contributor

There is an implicit assumption here that the schema is always provided as a local file. What if the schema is provided as a file in S3?

Contributor Author

Fetching the schema from a file in S3 or elsewhere wasn't part of the initial requirements.
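
To make the question in this thread concrete, here is a minimal sketch of how a schema location could be resolved without assuming a local file. Everything in it is hypothetical: `SchemaLocationResolver`, `RemoteSchemaFetcher`, and the `s3://` prefix check are illustrative names and choices, not part of this PR or of Data Prepper, and the actual S3 download is left to whatever client the caller plugs in.

```java
import java.io.IOException;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;

// Hypothetical hook for remote storage; a real implementation would wrap an S3 client.
interface RemoteSchemaFetcher {
    String fetch(URI location) throws IOException;
}

// Hypothetical helper: returns the schema JSON text for a configured location.
class SchemaLocationResolver {
    private final RemoteSchemaFetcher s3Fetcher;

    SchemaLocationResolver(RemoteSchemaFetcher s3Fetcher) {
        this.s3Fetcher = s3Fetcher;
    }

    String readSchemaJson(String location) throws IOException {
        // Assumption: remote schemas are configured as s3://bucket/key URIs.
        if (location.startsWith("s3://")) {
            return s3Fetcher.fetch(URI.create(location));
        }
        // Anything else is treated as a path on the local file system.
        return new String(Files.readAllBytes(Paths.get(location)), StandardCharsets.UTF_8);
    }
}
```

The returned text could then be handed to the existing schema parsing step, so the local-file behaviour stays unchanged while remote locations become an opt-in extension.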

}

@Override
public void complete(final OutputStream outputStream) throws IOException {
// TODO: do the final wrapping like closing outputstream and close generator
public void writeEvent(final Event event, final OutputStream outputStream) throws IOException {
Contributor

This implementation is not thread-safe. Because generator.writeStartObject and generator.writeEndObject are separate calls, events from multiple threads can interleave and produce malformed JSON. You can either make this method synchronized, or serialize the JSON into a byte array first and then write that array to the output stream.

Contributor Author

The methods have been made synchronized.

Contributor

start and complete were fine. writeEvent needs to be made synchronized.

Contributor Author

Okay, that has been taken care of.
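
For readers following this thread, the two options the reviewer suggested can be sketched roughly as below. This is not the PR's code: `SketchJsonCodec` and its methods are hypothetical, the event is simplified to a `Map<String, String>`, and JSON is written by hand (escaping only quotes and backslashes) so the example needs nothing beyond the JDK.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.Map;

// Hypothetical stand-in for the codec under review. Pick ONE option per stream:
// the two methods use different locks and must not be mixed concurrently.
class SketchJsonCodec {

    // Option 1: hold the codec's lock for the whole event, so the opening brace,
    // the fields, and the closing brace cannot interleave with another thread's event.
    public synchronized void writeEventSynchronized(Map<String, String> event, OutputStream out)
            throws IOException {
        out.write('{');
        writeFields(event, out);
        out.write('}');
        out.write('\n');
    }

    // Option 2: build the complete event in a thread-private buffer, then hand it
    // to the shared stream in one step while holding the stream's monitor.
    public void writeEventBuffered(Map<String, String> event, OutputStream out) throws IOException {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        buffer.write('{');
        writeFields(event, buffer);
        buffer.write('}');
        buffer.write('\n');
        synchronized (out) {
            buffer.writeTo(out);
        }
    }

    private static void writeFields(Map<String, String> event, OutputStream out) throws IOException {
        boolean first = true;
        for (Map.Entry<String, String> entry : event.entrySet()) {
            if (!first) {
                out.write(',');
            }
            first = false;
            String field = "\"" + escape(entry.getKey()) + "\":\"" + escape(entry.getValue()) + "\"";
            out.write(field.getBytes(StandardCharsets.UTF_8));
        }
    }

    // Simplification: control characters are not escaped here.
    private static String escape(String text) {
        return text.replace("\\", "\\\\").replace("\"", "\\\"");
    }
}
```

Option 1 is the smaller change and matches what was done in this thread. Option 2 keeps the serialization work outside any lock, which may matter if many threads share one sink.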

@umayr-codes
Contributor Author

Closing in favour of #2986.

@umayr-codes umayr-codes closed this Jul 6, 2023
Successfully merging this pull request may close these issues.

Support a generic codec structure for sinks
3 participants