ENH: support es 6 in sink #3045

Merged
merged 28 commits into from
Jul 24, 2023

Conversation

chenqi0805
Collaborator

Description

This PR adds support for bulk API with ES 6 backend.

Issues Resolved

Resolves #3003

Check List

  • New functionality includes testing.
  • New functionality has been documented.
    • New functionality has javadoc added
  • Commits are signed with a real name per the DCO

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.

chenqi0805 and others added 23 commits July 11, 2023 17:37
Signed-off-by: George Chen <[email protected]>
* Retry s3 reads on socket exceptions.

S3 will reset the connection on their end frequently. To not lose data,
data prepper should retry all socket exceptions by attempting to re-open
the stream.

Signed-off-by: Adi Suresh <[email protected]>

* Bubble up parquet exceptions.

Signed-off-by: Adi Suresh <[email protected]>

---------

Signed-off-by: Adi Suresh <[email protected]>
Signed-off-by: George Chen <[email protected]>
)

* Fix race condition in SqsWorker when acknowledgements are enabled

Signed-off-by: Krishna Kondaka <[email protected]>

* Modified to do the synchronization in the acknowledgement set framework

Signed-off-by: Krishna Kondaka <[email protected]>

* Fixed failing tests

Signed-off-by: Krishna Kondaka <[email protected]>

* Removed unused variable

Signed-off-by: Krishna Kondaka <[email protected]>

* Addressed review comment and fixed failing tests

Signed-off-by: Krishna Kondaka <[email protected]>

* Addressed review comments

Signed-off-by: Krishna Kondaka <[email protected]>

* Fixed failing tests

Signed-off-by: Krishna Kondaka <[email protected]>

* Fixed checkStyle failure

Signed-off-by: Krishna Kondaka <[email protected]>

---------

Signed-off-by: Krishna Kondaka <[email protected]>
Co-authored-by: Krishna Kondaka <[email protected]>
Signed-off-by: George Chen <[email protected]>
Signed-off-by: David Venable <[email protected]>
Signed-off-by: George Chen <[email protected]>
* GitHub-issue#253 : Implemented GeoIP processor functionality
Signed-off-by: venkataraopasyavula <[email protected]>

* GitHub-issue#253 : Implemented GeoIP processor functionality
Signed-off-by: venkataraopasyavula <[email protected]>

* GitHub-issue#253 : Implemented GeoIP processor functionality
Signed-off-by: venkataraopasyavula <[email protected]>

* GitHub-issue#253 : Implemented GeoIP processor functionality
Signed-off-by: venkataraopasyavula <[email protected]>

* GitHub-issue#253 : Implemented GeoIP processor functionality
Signed-off-by: venkataraopasyavula <[email protected]>

* GitHub-issue#253 : Implemented GeoIP processor functionality. Addressed Code review comments
Signed-off-by: venkataraopasyavula <[email protected]>

* GitHub-issue#253 : Implemented GeoIP processor functionality. Addressed Code review comments
Signed-off-by: venkataraopasyavula <[email protected]>

* GitHub-issue#253 : Fixed the test-case-failed issue.

Signed-off-by: Deepak Sahu <[email protected]>

* GitHub-issue#253 : Implemented GeoIP processor functionality. Addressed Code review comments
Signed-off-by: venkataraopasyavula <[email protected]>

* GitHub-issue#253 : Implemented GeoIP processor functionality. Addressed Code review comments
Signed-off-by: venkataraopasyavula <[email protected]>

* GitHub-issue#253 : Implemented GeoIP processor functionality. Addressed Code review comments
Signed-off-by: venkataraopasyavula <[email protected]>

* GitHub-issue#253 : Implemented GeoIP processor functionality. Addressed Code review comments
Signed-off-by: venkataraopasyavula <[email protected]>

* GitHub-issue#253 : Implemented GeoIP processor functionality. Addressed Code review comments
Signed-off-by: venkataraopasyavula <[email protected]>

---------

Signed-off-by: Deepak Sahu <[email protected]>
Co-authored-by: Deepak Sahu <[email protected]>
Signed-off-by: George Chen <[email protected]>
Signed-off-by: David Venable <[email protected]>
Signed-off-by: George Chen <[email protected]>
implement transform_key feature

Signed-off-by: Kat Shen <[email protected]>

---------

Signed-off-by: Kat Shen <[email protected]>
Co-authored-by: Kat Shen <[email protected]>
Signed-off-by: George Chen <[email protected]>
…lientFactory utilities. (#2982)

Added CloudWatchLogs Buffer, ThresholdCheck, and ClientFactory utilities.

---------

Signed-off-by: Marcos Gonzalez Mayedo <[email protected]>
Signed-off-by: Marcos Gonzalez Mayedo <[email protected]>
Co-authored-by: Marcos <[email protected]>
Signed-off-by: George Chen <[email protected]>
* Add Kafka Security Configurations

Signed-off-by: Krishna Kondaka <[email protected]>

* Modified kafka security config. Added new fields to AwsConfig

Signed-off-by: Krishna Kondaka <[email protected]>

* Addressed review comments

Signed-off-by: Krishna Kondaka <[email protected]>

* Modified AwsConfig to have msk option that can take multiple options

Signed-off-by: Krishna Kondaka <[email protected]>

---------

Signed-off-by: Krishna Kondaka <[email protected]>
Co-authored-by: Krishna Kondaka <[email protected]>
Signed-off-by: George Chen <[email protected]>
Signed-off-by: David Venable <[email protected]>
Signed-off-by: George Chen <[email protected]>
* Translate Plugin: Simplified Config. Added functionality for multiple sources and multiple targets

Signed-off-by: Vishal Boinapalli <[email protected]>

* Moved helper methods out of config file

Signed-off-by: Vishal Boinapalli <[email protected]>

---------

Signed-off-by: Vishal Boinapalli <[email protected]>
Signed-off-by: George Chen <[email protected]>
…the OpenSearch sink, add opensearch prefix to opensearch source metadata keys (#3025)

Signed-off-by: Taylor Gray <[email protected]>
Signed-off-by: George Chen <[email protected]>
* GitHub-issue#253 : Implemented GeoIP processor integration test
Signed-off-by: venkataraopasyavula <[email protected]>

* GitHub-issue#253 : Implemented GeoIP processor integration test
Signed-off-by: venkataraopasyavula <[email protected]>

* GitHub-issue#253 : Implemented GeoIP processor integration test
Signed-off-by: venkataraopasyavula <[email protected]>

* GitHub-issue#253 : Implemented GeoIP processor integration test
Signed-off-by: venkataraopasyavula <[email protected]>

* GitHub-issue#253 : Implemented GeoIP processor integration test
Signed-off-by: venkataraopasyavula <[email protected]>
Signed-off-by: George Chen <[email protected]>
Connection code of HttpSink Plugin.
Signed-off-by: mallikagogoi7 <[email protected]>
Signed-off-by: George Chen <[email protected]>
* implement transform_key feature

Signed-off-by: Kat Shen <[email protected]>

* fix unit tests

Signed-off-by: Kat Shen <[email protected]>

* fix unit tests

Signed-off-by: Kat Shen <[email protected]>

* remove bin files

Signed-off-by: Kat Shen <[email protected]>

* add static final variable for string comparison

Signed-off-by: Kat Shen <[email protected]>

* add whitespace description to readme, add configs

Signed-off-by: Kat Shen <[email protected]>

* writing whitespace impl

Signed-off-by: Kat Shen <[email protected]>

* add whitespace impl

Signed-off-by: Kat Shen <[email protected]>

* fix impl, writing tests

Signed-off-by: Kat Shen <[email protected]>

* write whitespace test

Signed-off-by: Kat Shen <[email protected]>

* fix formatting, whitespace() -> trimWhitespace()

Signed-off-by: Kat Shen <[email protected]>

* edit readme, add config

Signed-off-by: Kat Shen <[email protected]>

* update logic to valid values set

Signed-off-by: Kat Shen <[email protected]>

* correct return value

Signed-off-by: Kat Shen <[email protected]>

* update variables to static

Signed-off-by: Kat Shen <[email protected]>

* correct convention for private variables

Signed-off-by: Kat Shen <[email protected]>

* impl allow duplicate values, writing tests

Signed-off-by: Kat Shen <[email protected]>

* allow duplicate values impl + tests

Signed-off-by: Kat Shen <[email protected]>

* modify regex portion to final variables, remove some whitespace

Signed-off-by: Kat Shen <[email protected]>

* rerun checks

Signed-off-by: Kat Shen <[email protected]>

* rename methods/variables for more clarity, change default bool value to be false

Signed-off-by: Kat Shen <[email protected]>

* rerun checks

Signed-off-by: Kat Shen <[email protected]>

* change logic to reflect skip_duplicate_values

Signed-off-by: Kat Shen <[email protected]>

* modify tests according to changed logic

Signed-off-by: Kat Shen <[email protected]>

* remove include keys content (accidentally included it oops)

Signed-off-by: Kat Shen <[email protected]>

---------

Signed-off-by: Kat Shen <[email protected]>
Signed-off-by: Katherine Shen <[email protected]>
Co-authored-by: Kat Shen <[email protected]>
Signed-off-by: George Chen <[email protected]>
* -Support for Sink Codecs
Signed-off-by: omkarmmore95 <[email protected]>

* -Support for Sink Codecs
Signed-off-by: omkarmmore95 <[email protected]>

* -Support for Sink Codecs
Signed-off-by: omkarmmore95 <[email protected]>
Signed-off-by: George Chen <[email protected]>
…use for index in OpenSearch sink (#3032)

Add support for using expressions with formatString in JacksonEvent, use for index in OpenSearch sink

Signed-off-by: Taylor Gray <[email protected]>

---------

Signed-off-by: Taylor Gray <[email protected]>
Signed-off-by: George Chen <[email protected]>
…#3039)

* Fix race condition in data prepper sources using e2e acknowledgements

Signed-off-by: Krishna Kondaka <[email protected]>

* Fixed checkStyle error

Signed-off-by: Krishna Kondaka <[email protected]>

---------

Signed-off-by: Krishna Kondaka <[email protected]>
Co-authored-by: Krishna Kondaka <[email protected]>
Signed-off-by: George Chen <[email protected]>
* Kafka Source - Cleanup and Enhancements for MSK

Signed-off-by: Krishna Kondaka <[email protected]>

* Addressed review comments

Signed-off-by: Krishna Kondaka <[email protected]>

* Addressed review comments

Signed-off-by: Krishna Kondaka <[email protected]>

* addressed review comments

Signed-off-by: Krishna Kondaka <[email protected]>

* Fixed checkstyle error

Signed-off-by: Krishna Kondaka <[email protected]>

---------

Signed-off-by: Krishna Kondaka <[email protected]>
Co-authored-by: Krishna Kondaka <[email protected]>
Signed-off-by: George Chen <[email protected]>
Signed-off-by: George Chen <[email protected]>
@chenqi0805 chenqi0805 requested a review from oeyh as a code owner July 19, 2023 15:37
@github-actions

github-actions bot commented Jul 19, 2023

Unit Test Results

  1 276 files  +10    1 276 suites  +10   41m 53s ⏱️ + 2m 27s
  5 092 tests +28    5 091 ✔️ +28  1 💤 ±0  0 ±0 
10 210 runs  +50  10 208 ✔️ +50  2 💤 ±0  0 ±0 

Results for commit 990c7e4. ± Comparison against base commit 351845b.

♻️ This comment has been updated with latest results.

Signed-off-by: George Chen <[email protected]>
@@ -209,6 +209,7 @@ With the `document_root_key` set to `status`. The document structure would be `{
duration: "15 ms"
}
```
- `backend_version`: A String indicating whether the sink backend version is Elasticsearch 6 or above (i.e. Elasticsearch 7.x or OpenSearch). `es_6` represents Elasticsearch 6; `null` represents Elasticsearch 7.x or OpenSearch. Defaults to `null`.
Member

Maybe name this distribution_version? That might be closer to the terms used in OpenSearch itself which uses "distribution" and "version".

import java.util.stream.Collectors;

public enum BackendVersion {
ES6("es_6");
Member

Thoughts on dropping the "_": es6?

Also, we should probably have some explicit options for users to provide for other versions.

Perhaps:

  • es6
  • es7
  • os1
  • os2

How would we like to support newer OS versions as they are added? Maybe a default option which aims for the latest? Thus:

  • es6
  • default
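
As a rough illustration of the second option above, the enum could look something like the sketch below. The option strings `es6` and `default` come from this suggestion, not from the merged code, and `getVersion` and `fromTypeName` are hypothetical helper names. Mapping a missing value to `DEFAULT` is likewise an assumption about how an unset config key would be handled.

```java
import java.util.Arrays;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

// Hypothetical sketch of the suggested options; not the code merged in this PR.
enum DistributionVersion {
    ES6("es6"),
    DEFAULT("default");

    // Lookup table from the user-facing option string to the enum constant.
    private static final Map<String, DistributionVersion> BY_NAME = Arrays.stream(values())
            .collect(Collectors.toMap(DistributionVersion::getVersion, Function.identity()));

    private final String version;

    DistributionVersion(final String version) {
        this.version = version;
    }

    String getVersion() {
        return version;
    }

    // Assumption: an unset option falls back to DEFAULT (latest supported backend).
    static DistributionVersion fromTypeName(final String name) {
        if (name == null) {
            return DEFAULT;
        }
        final DistributionVersion found = BY_NAME.get(name);
        if (found == null) {
            // Reject unknown strings instead of silently picking a backend.
            throw new IllegalArgumentException("Unknown distribution version: " + name);
        }
        return found;
    }
}
```

With a `default` entry, newer OpenSearch versions would not need a new option unless their bulk API diverges.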

this.openSearchClient = openSearchClient;
}

public BulkResponse bulk(BulkRequest request) throws IOException, OpenSearchException {
Member

We should probably split these classes.

Have an interface for BulkApiWrapper. Then have a factory which produces either an Es6BulkApiWrapper or an OpenSearchDefaultBulkApiWrapper.

Then you can drop this conditional.

And you separate the code that is for ES6 entirely from the code for other versions.
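
A minimal sketch of that split, under some loud assumptions: `Distribution` and the `String`/`List<String>` signatures are stand-ins so the example compiles on its own, whereas the real wrappers would take the OpenSearch client and work with `BulkRequest`/`BulkResponse`. The class names are the ones proposed in this comment; the method bodies are placeholders.

```java
import java.util.List;

// Stand-in for the configured distribution; hypothetical, for illustration only.
enum Distribution { ES6, DEFAULT }

// Common interface so the sink no longer needs a version conditional.
interface BulkApiWrapper {
    String bulk(List<String> documents);
}

// ES6-specific request handling would live only here.
final class Es6BulkApiWrapper implements BulkApiWrapper {
    @Override
    public String bulk(final List<String> documents) {
        return "es6:" + documents.size(); // placeholder for the ES6 bulk call
    }
}

// Handling for Elasticsearch 7.x and OpenSearch would live only here.
final class OpenSearchDefaultBulkApiWrapper implements BulkApiWrapper {
    @Override
    public String bulk(final List<String> documents) {
        return "default:" + documents.size(); // placeholder for the default bulk call
    }
}

// The only place that knows which implementation matches which distribution.
final class BulkApiWrapperFactory {
    private BulkApiWrapperFactory() { }

    static BulkApiWrapper getWrapper(final Distribution distribution) {
        switch (distribution) {
            case ES6:
                return new Es6BulkApiWrapper();
            default:
                return new OpenSearchDefaultBulkApiWrapper();
        }
    }
}
```

The sink would then hold a `BulkApiWrapper` and call `bulk(...)` without knowing which backend it talks to.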

@@ -218,8 +220,9 @@ private void doInitializeInternal() throws IOException {
TransportOptions.builder()
.setParameter("filter_path", "errors,took,items.*.error,items.*.status,items.*._index,items.*._id")
.build());
bulkAPIWrapper = new BulkAPIWrapper(openSearchSinkConfig.getIndexConfiguration(), filteringOpenSearchClient);
Member

See my comment below. Let's get this from a factory.

bulkApiWrapper = BulkApiWrapperFactory.getWrapper(openSearchSinkConfig.getIndexConfiguration(), filteringOpenSearchClient);

Signed-off-by: George Chen <[email protected]>
Signed-off-by: George Chen <[email protected]>
graytaylor0
graytaylor0 previously approved these changes Jul 24, 2023

@Test
void testGetOpenSearchDefaultBulkApiWrapper() {
when(indexConfiguration.getDistributionVersion()).thenReturn(DistributionVersion.ES6);
Member

This is testing the ES6 distribution. Perhaps a copy-paste error.


// Request path
request -> {
final String index = request.index() == null ? DUMMY_DEFAULT_INDEX : request.index();
Member

Should we have this condition? If the request doesn't have an index, perhaps we should fail. Otherwise, the data goes into an index named dummy.
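
One way the fail-fast alternative might look, as a sketch only: `BulkPathResolver` and `bulkPath` are hypothetical names, and the `/{index}/_bulk` path shape is assumed for illustration rather than taken from this PR's request-path code.

```java
// Hypothetical helper; not part of the PR.
final class BulkPathResolver {
    private BulkPathResolver() { }

    // Builds the bulk request path, rejecting requests with no index
    // instead of routing their documents to a dummy index.
    static String bulkPath(final String index) {
        if (index == null || index.isEmpty()) {
            throw new IllegalArgumentException("Bulk request is missing an index");
        }
        return "/" + index + "/_bulk"; // assumed path shape
    }
}
```

Failing here surfaces a misconfigured request immediately rather than leaving data in an unexpected index.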

@chenqi0805 chenqi0805 merged commit 07f00c2 into main Jul 24, 2023
60 checks passed
@chenqi0805 chenqi0805 deleted the maint/support-es-6-in-sink branch July 24, 2023 20:52
@chenqi0805 chenqi0805 mentioned this pull request Jul 25, 2023
4 tasks
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Support ES 6.8 in opensearch sink