Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

New OpenSearch API source implementation #4603

Open
wants to merge 9 commits into
base: main
Choose a base branch
from

Conversation

sb2k16
Copy link
Contributor

@sb2k16 sb2k16 commented Jun 5, 2024

Description

In order for DataPrepper to support all OpenSearch Document API(s), we need to build a new source similar to the existing http source. This pull request is intended to implement a new OpenSearch API source like opensearch_api similar to http source. This source should support the Document API Bulk.

This pull request includes the following:

  1. Path and HTTP methods:
    • POST _bulk
    • POST <index>/_bulk
  2. Optional URL parameters
    • pipeline and routing. (TODO: pipeline parameter handling on Sink side)
  3. Multi-line JSON Bulk Request payload

Example pipeline configuration with opensearch_api source looks like:

simple-sample-pipeline:
  source:
    opensearch_api:
      path: "/opensearch"
      port: 9202
  sink:
   ...

Issues Resolved

Contributes to #248

Check List

  • New functionality includes testing.
  • New functionality has a documentation issue. Please link to it in this PR.
  • New functionality has javadoc added
  • Commits are signed with a real name per the DCO

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.

List<Map<String, Object>> jsonListData = new ArrayList<>();

String requestBody = new String(httpData.toInputStream().readAllBytes(), StandardCharsets.UTF_8);
List<String> jsonLines = Arrays.asList(requestBody.split(REGEX));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It may be better to create the Pattern in the constructor. Then you can call splitPattern.split(requestBody)). This should avoid compilation each time.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @dlvenable . I am going to address this.

violationRules {
rule { //in addition to core projects rule
limit {
minimum = 0.90
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a new project. Can we aim for 100% coverage?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @dlvenable . I am going to address this.

Thread.currentThread().interrupt();
throw new RuntimeException(ex);
}
LOG.info("Started OpenSearch API source on port " + sourceConfig.getPort() + "...");
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
LOG.info("Started OpenSearch API source on port " + sourceConfig.getPort() + "...");
LOG.info("Started OpenSearch API source on port {}.", sourceConfig.getPort());

Let's avoid these ellipses at the end. They are unclear and make it seem that more is coming.

Also, please use SLF4J interpolation over string concatenation.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @dlvenable . I am going to address this.


public class OpenSearchAPISourceConfig extends BaseHttpServerConfig {

static final String DEFAULT_ENDPOINT_URI = "/opensearch";
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the default should just be / since this matches existing OpenSearch domains.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @dlvenable . I am going to address this.

import java.util.Arrays;
import java.util.ArrayList;

/*
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please run a code formatter over your changes. There is a lot of whitespace that is off.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @dlvenable . I am going to address this.

if (buffer == null) {
throw new IllegalStateException("Buffer provided is null");
}
if (server == null) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is a lot here that is similar to the http source. Can we extend your work from #4570 to have more of this shared in common?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @dlvenable . I am going to address this.


@Post("/_bulk")
public HttpResponse doPostBulk(final ServiceRequestContext serviceRequestContext, final AggregatedHttpRequest aggregatedHttpRequest,
@Param("pipeline") Optional<String> pipeline, @Param("routing") Optional<String> routing) throws Exception {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Passing Optional in as a parameter is an anti-pattern. You don't actually know if it is null or not. So you have tw checks you need to make pipeline != null && pipeline.isPresent(). Just take in a String and expect it to possibly be null.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @dlvenable . I am going to address this.

@Post("/{index}/_bulk")
public HttpResponse doPostBulkIndex(final ServiceRequestContext serviceRequestContext, final AggregatedHttpRequest aggregatedHttpRequest, @Param("index") Optional<String> index,
@Param("pipeline") Optional<String> pipeline, @Param("routing") Optional<String> routing) throws Exception {
requestsReceivedCounter.increment();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's consolidate all of this logic in processBulkRequest. The only thing that each of these methods should do is create the BulkAPIRequestParams and then callprocessBulkRequest.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @dlvenable . I am going to address this.

dlvenable
dlvenable previously approved these changes Jun 14, 2024
Copy link
Member

@dlvenable dlvenable left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the refactoring work!


private static final Logger LOG = LoggerFactory.getLogger(OpenSearchAPIService.class);

// TODO: support other data-types as request body, e.g. json_lines, msgpack
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this TODO really true? I think this is the only codec we need for _bulk.

@sb2k16 sb2k16 force-pushed the opensearch-api-async branch 2 times, most recently from fe8659f to 77930fc Compare July 25, 2024 16:53
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants