
Parquet Sink Codec #2928

Merged: 15 commits merged into opensearch-project:main on Jul 27, 2023

Conversation

umayr-codes (Contributor)

Description

  • ParquetSinkCodec implementation added
  • Supports both Logical and Primitive Data Types (see the sketch below)
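
A minimal sketch of the write path a codec like this builds on, using parquet-avro's AvroParquetWriter with one primitive field and one logical type (timestamp-millis). The schema, file path, and field names are illustrative assumptions, not the PR's actual code:

    // Hedged sketch, not the PR's implementation: write one GenericRecord
    // to a Parquet file via parquet-avro.
    import org.apache.avro.LogicalTypes;
    import org.apache.avro.Schema;
    import org.apache.avro.SchemaBuilder;
    import org.apache.avro.generic.GenericData;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.hadoop.fs.Path;
    import org.apache.parquet.avro.AvroParquetWriter;
    import org.apache.parquet.hadoop.ParquetWriter;

    public class ParquetWriteSketch {
        public static void main(String[] args) throws Exception {
            // One primitive field (string) and one logical type (timestamp-millis).
            Schema timestampMillis = LogicalTypes.timestampMillis()
                    .addToSchema(Schema.create(Schema.Type.LONG));
            Schema schema = SchemaBuilder.record("Event").fields()
                    .requiredString("message")
                    .name("eventTime").type(timestampMillis).noDefault()
                    .endRecord();

            // try-with-resources closes the writer and flushes the file footer.
            try (ParquetWriter<GenericRecord> writer = AvroParquetWriter
                    .<GenericRecord>builder(new Path("/tmp/sample.parquet"))
                    .withSchema(schema)
                    .build()) {
                GenericRecord record = new GenericData.Record(schema);
                record.put("message", "hello");
                record.put("eventTime", System.currentTimeMillis());
                writer.write(record);
            }
        }
    }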

Issues Resolved

Resolves #2403

Check List

  • New functionality includes testing.
  • New functionality has been documented.
    • New functionality has javadoc added
  • Commits are signed with a real name per the DCO

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.

Signed-off-by: umairofficial <[email protected]>

@asuresh8 (Contributor)

You haven't addressed my previous comment. I want to see integration tests for all codecs where files are actually written to an S3 bucket, then read back and asserted to be correct.

Again see https://github.com/opensearch-project/data-prepper/tree/18de3fff6b06be80738d522a74f5144e5042da7a/data-prepper-plugins/s3-source/src/integrationTest/java/org/opensearch/dataprepper/plugins/source
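
For reference, a minimal sketch of the shape of test being requested here; the bucket, key, and payload are illustrative, and a real codec test would upload the codec's output rather than raw bytes:

    // Hedged sketch: write an object to S3, read it back, assert it matches.
    import static org.junit.jupiter.api.Assertions.assertArrayEquals;

    import software.amazon.awssdk.core.sync.RequestBody;
    import software.amazon.awssdk.services.s3.S3Client;
    import software.amazon.awssdk.services.s3.model.GetObjectRequest;
    import software.amazon.awssdk.services.s3.model.PutObjectRequest;

    public class S3RoundTripSketch {
        static void roundTrip(S3Client s3, String bucket, String key, byte[] payload) {
            // Upload the payload.
            s3.putObject(PutObjectRequest.builder().bucket(bucket).key(key).build(),
                    RequestBody.fromBytes(payload));
            // Download it again and assert byte-for-byte equality.
            byte[] readBack = s3.getObjectAsBytes(
                    GetObjectRequest.builder().bucket(bucket).key(key).build()).asByteArray();
            assertArrayEquals(payload, readBack);
        }
    }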

@omkarmmore95 (Contributor)

The team is working on the integration tests.

@omkarmmore95 (Contributor)

> You haven't addressed my previous comment. I want to see integration tests for all codecs where files are actually written to an S3 bucket, then read back and asserted to be correct.
>
> Again see https://github.com/opensearch-project/data-prepper/tree/18de3fff6b06be80738d522a74f5144e5042da7a/data-prepper-plugins/s3-source/src/integrationTest/java/org/opensearch/dataprepper/plugins/source

As suggested by @dlvenable, we have written an integration test that integrates the input and output codecs and asserts that the data fed into the input codec equals the data produced by the output codec. We will write the same for Parquet as well.
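
For the Parquet variant, the read-back half of such a round trip might look like this minimal sketch, using parquet-avro directly (class, helper, and field names are illustrative; the PR's actual test may differ):

    // Hedged sketch: read a Parquet file back and assert a field survived intact.
    import static org.junit.jupiter.api.Assertions.assertEquals;

    import org.apache.avro.generic.GenericRecord;
    import org.apache.hadoop.fs.Path;
    import org.apache.parquet.avro.AvroParquetReader;
    import org.apache.parquet.hadoop.ParquetReader;

    public class ParquetReadBackSketch {
        static void assertMessageField(String file, String expected) throws Exception {
            try (ParquetReader<GenericRecord> reader =
                    AvroParquetReader.<GenericRecord>builder(new Path(file)).build()) {
                GenericRecord record = reader.read();
                // Avro may return Utf8, so compare via toString().
                assertEquals(expected, record.get("message").toString());
            }
        }
    }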

Signed-off-by: umairofficial <[email protected]>
@asuresh8 (Contributor) commented Jul 7, 2023

Where are the integration tests for all the codecs?

I would like to see instructions, like those at the bottom of https://github.com/opensearch-project/data-prepper/blob/main/data-prepper-plugins/s3-source/README.md, for running the integration tests, and I want to see the command + output that runs them. Similar to the integration tests for the source:

./gradlew :data-prepper-plugins:s3-source:integrationTest -Dtests.s3source.region=<your-aws-region> -Dtests.s3source.bucket=<your-bucket> -Dtests.s3source.queue.url=<your-queue-url>

@umayr-codes (Contributor, Author)

> Where are the integration tests for all the codecs?
>
> I would like to see instructions, like those at the bottom of https://github.com/opensearch-project/data-prepper/blob/main/data-prepper-plugins/s3-source/README.md, for running the integration tests, and I want to see the command + output that runs them. Similar to the integration tests for the source:
>
> ./gradlew :data-prepper-plugins:s3-source:integrationTest -Dtests.s3source.region=<your-aws-region> -Dtests.s3source.bucket=<your-bucket> -Dtests.s3source.queue.url=<your-queue-url>

The integration tests for all codecs are in their respective PRs. The ITs for the CSV, JSON, and Avro codecs are in PR #2986 (in the S3SinkServiceIT class).

Signed-off-by: umairofficial <[email protected]>
@umayr-codes (Contributor, Author)

> Where are the integration tests for all the codecs?
>
> I would like to see instructions, like those at the bottom of https://github.com/opensearch-project/data-prepper/blob/main/data-prepper-plugins/s3-source/README.md, for running the integration tests, and I want to see the command + output that runs them. Similar to the integration tests for the source:
>
> ./gradlew :data-prepper-plugins:s3-source:integrationTest -Dtests.s3source.region=<your-aws-region> -Dtests.s3source.bucket=<your-bucket> -Dtests.s3source.queue.url=<your-queue-url>

Please see the README.md file of the parquet-codecs project; the Gradle command to run the integration tests is already in place.

@kkondaka previously approved these changes Jul 27, 2023
    } else {
        modifiedEvent = event;
    }
    for (final String key : modifiedEvent.toMap().keySet()) {
@kkondaka (Collaborator)

Since you are creating a new parquetRecord anyway, I think it makes sense to add the "tagsTargetKey" here as well, without creating a 'modifiedEvent', right? I think you could write it like:

    if (tagsTargetKey != null) {
        parquetRecord.put(tagsTargetKey, event.getMetadata().getTags());
    }
    writer.write(parquetRecord);

@umayr-codes (Contributor, Author)

Hi Krishna, the logic behind modifying the event itself is that we generate the schema from the event when the user doesn't supply one. If the event is not modified to contain the tags target key and tags, the Parquet record won't be able to store them either (see the sketch below).
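
A hedged illustration of that constraint; the all-string inference below is a simplified stand-in for the codec's actual schema generation: when the schema is derived from the event's key set, a tags field that is never added to the event has no column in the schema, so it cannot be written to the record either.

    import org.apache.avro.Schema;
    import org.apache.avro.SchemaBuilder;

    import java.util.LinkedHashMap;
    import java.util.List;
    import java.util.Map;

    public class SchemaInferenceSketch {
        // Simplified stand-in: infer an all-string schema from an event's keys.
        static Schema inferSchema(Map<String, Object> eventMap) {
            SchemaBuilder.FieldAssembler<Schema> fields =
                    SchemaBuilder.record("Event").fields();
            for (String key : eventMap.keySet()) {
                fields = fields.requiredString(key);
            }
            return fields.endRecord();
        }

        public static void main(String[] args) {
            Map<String, Object> eventMap = new LinkedHashMap<>();
            eventMap.put("message", "hello");
            // Only present because the event itself was modified to carry tags;
            // without this key, the inferred schema has no "tags" field at all.
            eventMap.put("tags", List.of("tag1"));
            System.out.println(inferSchema(eventMap));
        }
    }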

# Conflicts:
#	data-prepper-plugins/avro-codecs/src/main/java/org/opensearch/dataprepper/plugins/codec/avro/AvroOutputCodec.java
#	data-prepper-plugins/csv-processor/src/main/java/org/opensearch/dataprepper/plugins/codec/csv/CsvOutputCodec.java
#	data-prepper-plugins/newline-codecs/src/main/java/org/opensearch/dataprepper/plugins/codec/newline/NewlineDelimitedOutputCodec.java
#	data-prepper-plugins/parquet-codecs/src/main/java/org/opensearch/dataprepper/plugins/codec/parquet/ParquetOutputCodec.java
#	data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/codec/json/JsonOutputCodec.java
#	data-prepper-plugins/s3-sink/build.gradle
#	data-prepper-plugins/s3-sink/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/S3SinkServiceIT.java
#	data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/S3SinkService.java
#	data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/accumulator/Buffer.java
#	data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/accumulator/InMemoryBuffer.java
#	data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/accumulator/LocalFileBuffer.java
Signed-off-by: umairofficial <[email protected]>
@kkondaka kkondaka merged commit 42e274d into opensearch-project:main Jul 27, 2023
21 of 24 checks passed
Development

Successfully merging this pull request may close these issues.

Support a generic codec structure for sinks
5 participants