-
Notifications
You must be signed in to change notification settings - Fork 190
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Browse files
Browse the repository at this point in the history
Connection code of HttpSink Plugin. Signed-off-by: mallikagogoi7 <[email protected]>
- Loading branch information
1 parent
0b804fd
commit cc0d527
Showing
21 changed files
with
889 additions
and
237 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
13 changes: 0 additions & 13 deletions
13
...lugins/http-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/ClientFactory.java
This file was deleted.
Oops, something went wrong.
39 changes: 39 additions & 0 deletions
39
.../src/main/java/org/opensearch/dataprepper/plugins/sink/FailedHttpResponseInterceptor.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,39 @@ | ||
/* | ||
* Copyright OpenSearch Contributors | ||
* SPDX-License-Identifier: Apache-2.0 | ||
*/ | ||
package org.opensearch.dataprepper.plugins.sink; | ||
|
||
import org.apache.hc.core5.http.EntityDetails; | ||
import org.apache.hc.core5.http.HttpResponse; | ||
import org.apache.hc.core5.http.HttpResponseInterceptor; | ||
import org.apache.hc.core5.http.protocol.HttpContext; | ||
|
||
import java.io.IOException; | ||
|
||
public class FailedHttpResponseInterceptor implements HttpResponseInterceptor { | ||
|
||
public static final int ERROR_CODE_500 = 500; | ||
|
||
public static final int ERROR_CODE_400 = 400; | ||
|
||
public static final int ERROR_CODE_404 = 404; | ||
|
||
public static final int ERROR_CODE_501 = 501; | ||
|
||
private final String url; | ||
|
||
public FailedHttpResponseInterceptor(final String url){ | ||
this.url = url; | ||
} | ||
|
||
@Override | ||
public void process(HttpResponse response, EntityDetails entity, HttpContext context) throws IOException { | ||
if (response.getCode() == ERROR_CODE_500 || | ||
response.getCode() == ERROR_CODE_400 || | ||
response.getCode() == ERROR_CODE_404 || | ||
response.getCode() == ERROR_CODE_501) { | ||
throw new IOException(String.format("url: %s , status code: %s", url,response.getCode())); | ||
} | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
65 changes: 0 additions & 65 deletions
65
...gins/http-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/HttpAuthOptions.java
This file was deleted.
Oops, something went wrong.
46 changes: 46 additions & 0 deletions
46
...http-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/HttpEndPointResponse.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,46 @@ | ||
/* | ||
* Copyright OpenSearch Contributors | ||
* SPDX-License-Identifier: Apache-2.0 | ||
*/ | ||
package org.opensearch.dataprepper.plugins.sink; | ||
|
||
public class HttpEndPointResponse { | ||
private String url; | ||
private int statusCode; | ||
private String errMessage; | ||
|
||
public HttpEndPointResponse(final String url, | ||
final int statusCode, | ||
final String errMessage) { | ||
this.url = url; | ||
this.statusCode = statusCode; | ||
this.errMessage = errMessage; | ||
} | ||
|
||
public HttpEndPointResponse(final String url, | ||
final int statusCode) { | ||
this.url = url; | ||
this.statusCode = statusCode; | ||
} | ||
|
||
public String getUrl() { | ||
return url; | ||
} | ||
|
||
public int getStatusCode() { | ||
return statusCode; | ||
} | ||
|
||
public String getErrMessage() { | ||
return errMessage; | ||
} | ||
|
||
@Override | ||
public String toString() { | ||
return "{" + | ||
"url='" + url + '\'' + | ||
", statusCode=" + statusCode + | ||
", errMessage='" + errMessage + '\'' + | ||
'}'; | ||
} | ||
} |
76 changes: 76 additions & 0 deletions
76
.../java/org/opensearch/dataprepper/plugins/sink/certificate/CertificateProviderFactory.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,76 @@ | ||
/* | ||
* Copyright OpenSearch Contributors | ||
* SPDX-License-Identifier: Apache-2.0 | ||
*/ | ||
package org.opensearch.dataprepper.plugins.sink.certificate; | ||
|
||
import org.opensearch.dataprepper.metrics.PluginMetrics; | ||
import org.opensearch.dataprepper.plugins.certificate.CertificateProvider; | ||
import org.opensearch.dataprepper.plugins.certificate.acm.ACMCertificateProvider; | ||
import org.opensearch.dataprepper.plugins.certificate.file.FileCertificateProvider; | ||
import org.opensearch.dataprepper.plugins.certificate.s3.S3CertificateProvider; | ||
import org.opensearch.dataprepper.plugins.metricpublisher.MicrometerMetricPublisher; | ||
import org.opensearch.dataprepper.plugins.sink.configuration.HttpSinkConfiguration; | ||
import org.slf4j.Logger; | ||
import org.slf4j.LoggerFactory; | ||
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; | ||
import software.amazon.awssdk.auth.credentials.AwsCredentialsProviderChain; | ||
import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider; | ||
import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration; | ||
import software.amazon.awssdk.core.retry.RetryMode; | ||
import software.amazon.awssdk.services.acm.AcmClient; | ||
import software.amazon.awssdk.services.s3.S3Client; | ||
|
||
/** | ||
* This class consist logic for downloading the SSL certificates from S3/ACM/Local file. | ||
* | ||
*/ | ||
public class CertificateProviderFactory { | ||
private static final Logger LOG = LoggerFactory.getLogger(CertificateProviderFactory.class); | ||
|
||
final HttpSinkConfiguration httpSinkConfiguration; | ||
public CertificateProviderFactory(final HttpSinkConfiguration httpSinkConfiguration) { | ||
this.httpSinkConfiguration = httpSinkConfiguration; | ||
} | ||
|
||
/** | ||
* This method consist logic for downloading the SSL certificates from S3/ACM/Local file. | ||
* @return CertificateProvider | ||
*/ | ||
public CertificateProvider getCertificateProvider() { | ||
if (httpSinkConfiguration.useAcmCertForSSL()) { | ||
LOG.info("Using ACM certificate and private key for SSL/TLS."); | ||
final AwsCredentialsProvider credentialsProvider = AwsCredentialsProviderChain.builder() | ||
.addCredentialsProvider(DefaultCredentialsProvider.create()).build(); | ||
final ClientOverrideConfiguration clientConfig = ClientOverrideConfiguration.builder() | ||
.retryPolicy(RetryMode.STANDARD) | ||
.build(); | ||
|
||
final PluginMetrics awsSdkMetrics = PluginMetrics.fromNames("sdk", "aws"); | ||
|
||
final AcmClient awsCertificateManager = AcmClient.builder() | ||
.region(httpSinkConfiguration.getAwsAuthenticationOptions().getAwsRegion()) | ||
.credentialsProvider(credentialsProvider) | ||
.overrideConfiguration(clientConfig) | ||
.overrideConfiguration(metricPublisher -> metricPublisher.addMetricPublisher(new MicrometerMetricPublisher(awsSdkMetrics))) | ||
.build(); | ||
|
||
return new ACMCertificateProvider(awsCertificateManager, httpSinkConfiguration.getAcmCertificateArn(), | ||
httpSinkConfiguration.getAcmCertIssueTimeOutMillis(), httpSinkConfiguration.getAcmPrivateKeyPassword()); | ||
} else if (httpSinkConfiguration.isSslCertAndKeyFileInS3()) { | ||
LOG.info("Using S3 to fetch certificate and private key for SSL/TLS."); | ||
final AwsCredentialsProvider credentialsProvider = AwsCredentialsProviderChain.builder() | ||
.addCredentialsProvider(DefaultCredentialsProvider.create()).build(); | ||
final S3Client s3Client = S3Client.builder() | ||
.region(httpSinkConfiguration.getAwsAuthenticationOptions().getAwsRegion()) | ||
.credentialsProvider(credentialsProvider) | ||
.build(); | ||
return new S3CertificateProvider(s3Client, | ||
httpSinkConfiguration.getSslCertificateFile(), | ||
httpSinkConfiguration.getSslKeyFile()); | ||
} else { | ||
LOG.info("Using local file system to get certificate and private key for SSL/TLS."); | ||
return new FileCertificateProvider(httpSinkConfiguration.getSslCertificateFile(), httpSinkConfiguration.getSslKeyFile()); | ||
} | ||
} | ||
} |
Oops, something went wrong.