Skip to content

Commit

Permalink
Connection code of HttpSink Plugin for #874.
Browse files Browse the repository at this point in the history
Signed-off-by: mallikagogoi7 <[email protected]>
  • Loading branch information
mallikagogoi7 committed Jul 6, 2023
1 parent 75fa735 commit 4670215
Show file tree
Hide file tree
Showing 21 changed files with 883 additions and 234 deletions.
2 changes: 2 additions & 0 deletions data-prepper-plugins/http-sink/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,13 @@ dependencies {
implementation project(':data-prepper-api')
implementation project(':data-prepper-plugins:aws-plugin-api')
implementation project(':data-prepper-plugins:common')
implementation project(path: ':data-prepper-plugins:common')
implementation 'com.fasterxml.jackson.core:jackson-core'
implementation 'org.hibernate.validator:hibernate-validator:7.0.5.Final'
implementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-yaml'
implementation 'software.amazon.awssdk:s3'
implementation 'software.amazon.awssdk:sts'
implementation 'software.amazon.awssdk:acm'
implementation group: 'org.apache.httpcomponents.client5', name: 'httpclient5', version: '5.2'
implementation 'org.apache.commons:commons-lang3:3.12.0'
}
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/
package org.opensearch.dataprepper.plugins.sink;

import org.apache.hc.core5.http.EntityDetails;
import org.apache.hc.core5.http.HttpResponse;
import org.apache.hc.core5.http.HttpResponseInterceptor;
import org.apache.hc.core5.http.protocol.HttpContext;

import java.io.IOException;

public class FailedHttpResponseInterceptor implements HttpResponseInterceptor {

public static final int ERROR_CODE_500 = 500;

public static final int ERROR_CODE_400 = 400;

public static final int ERROR_CODE_404 = 404;

public static final int ERROR_CODE_501 = 501;

private final String url;

public FailedHttpResponseInterceptor(final String url){
this.url = url;
}

@Override
public void process(HttpResponse response, EntityDetails entity, HttpContext context) throws IOException {
if (response.getCode() == ERROR_CODE_500 ||
response.getCode() == ERROR_CODE_400 ||
response.getCode() == ERROR_CODE_404 ||
response.getCode() == ERROR_CODE_501) {
throw new IOException(String.format("url: %s , status code: %s", url,response.getCode()));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,43 +4,73 @@
*/
package org.opensearch.dataprepper.plugins.sink;

import org.apache.hc.client5.http.impl.classic.CloseableHttpClient;
import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier;
import org.apache.hc.client5.http.HttpRequestRetryStrategy;
import org.apache.hc.client5.http.impl.DefaultHttpRequestRetryStrategy;
import org.apache.hc.client5.http.impl.classic.HttpClientBuilder;
import org.apache.hc.client5.http.impl.classic.HttpClients;
import org.apache.hc.core5.util.TimeValue;
import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin;
import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor;
import org.opensearch.dataprepper.model.configuration.PipelineDescription;
import org.opensearch.dataprepper.model.configuration.PluginModel;
import org.opensearch.dataprepper.model.configuration.PluginSetting;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.plugin.InvalidPluginConfigurationException;
import org.opensearch.dataprepper.model.plugin.PluginFactory;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.model.sink.AbstractSink;
import org.opensearch.dataprepper.model.sink.Sink;
import org.opensearch.dataprepper.plugins.accumulator.BufferFactory;
import org.opensearch.dataprepper.plugins.accumulator.BufferTypeOptions;
import org.opensearch.dataprepper.plugins.accumulator.InMemoryBufferFactory;
import org.opensearch.dataprepper.plugins.accumulator.LocalFileBufferFactory;
import org.opensearch.dataprepper.plugins.sink.configuration.HttpSinkConfiguration;
import org.opensearch.dataprepper.plugins.sink.configuration.UrlConfigurationOption;
import org.opensearch.dataprepper.plugins.sink.service.HttpSinkService;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Collection;
import java.util.List;
import java.util.Optional;

@DataPrepperPlugin(name = "http", pluginType = Sink.class, pluginConfigurationType = HttpSinkConfiguration.class)
public class HTTPSink extends AbstractSink<Record<Event>> {

private static final Logger LOG = LoggerFactory.getLogger(HTTPSink.class);

private final HttpSinkConfiguration httpSinkConfiguration;

private volatile boolean sinkInitialized;

private final BufferFactory bufferFactory;

private final HttpSinkService httpSinkService;

@DataPrepperPluginConstructor
public HTTPSink(final PluginSetting pluginSetting,
final HttpSinkConfiguration httpSinkConfiguration,
final PluginFactory pluginFactory,
final AwsCredentialsSupplier awsCredentialsSupplier) {
final PipelineDescription pipelineDescription) {
super(pluginSetting);
this.httpSinkConfiguration = httpSinkConfiguration;
sinkInitialized = Boolean.FALSE;
final PluginModel codecConfiguration = httpSinkConfiguration.getCodec();
final PluginSetting codecPluginSettings = new PluginSetting(codecConfiguration.getPluginName(),
codecConfiguration.getPluginSettings());
codecPluginSettings.setPipelineName(pipelineDescription.getPipelineName());
this.sinkInitialized = Boolean.FALSE;
if (httpSinkConfiguration.getBufferType().equals(BufferTypeOptions.LOCALFILE)) {
this.bufferFactory = new LocalFileBufferFactory();
} else {
this.bufferFactory = new InMemoryBufferFactory();
}
final HttpRequestRetryStrategy httpRequestRetryStrategy = new DefaultHttpRequestRetryStrategy(httpSinkConfiguration.getMaxUploadRetries(),
TimeValue.of(httpSinkConfiguration.getHttpRetryInterval()));

final HttpClientBuilder httpClientBuilder = HttpClients.custom()
.setRetryStrategy(httpRequestRetryStrategy);

this.httpSinkService = new HttpSinkService(
httpSinkConfiguration,
bufferFactory,
codecPluginSettings,
httpClientBuilder,
pluginMetrics);
}

@Override
Expand Down Expand Up @@ -75,23 +105,6 @@ public void doOutput(final Collection<Record<Event>> records) {
if (records.isEmpty()) {
return;
}
//TODO: call Service call method
}


public Optional<CloseableHttpClient> getAuthHandlerByConfig(final HttpSinkConfiguration sinkConfiguration){
//TODO: AWS Sigv4 - check
// TODO: call Auth Handlers based on auth Type

return null;
}

public List<HttpAuthOptions> getClassicHttpRequestList(final List<UrlConfigurationOption> urlConfigurationOption){
// logic for create auth handler for each url based on provided configuration - getAuthHandlerByConfig()
// logic for request preparation for each url
// logic for worker is not there in url level then verify the global workers if global workers also not defined then default 1
// logic for get the Proxy object if url level proxy enabled else look the global proxy.
// Aws SageMaker headers if headers found in the configuration
return null;
httpSinkService.output(records);
}
}

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/
package org.opensearch.dataprepper.plugins.sink;

public class HttpEndPointResponse {
private String url;
private int statusCode;
private String errMessage;

public HttpEndPointResponse(final String url,
final int statusCode,
final String errMessage) {
this.url = url;
this.statusCode = statusCode;
this.errMessage = errMessage;
}

public HttpEndPointResponse(final String url,
final int statusCode) {
this.url = url;
this.statusCode = statusCode;
}

public String getUrl() {
return url;
}

public int getStatusCode() {
return statusCode;
}

public String getErrMessage() {
return errMessage;
}

@Override
public String toString() {
return "{" +
"url='" + url + '\'' +
", statusCode=" + statusCode +
", errMessage='" + errMessage + '\'' +
'}';
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/
package org.opensearch.dataprepper.plugins.sink.certificate;

import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.plugins.certificate.CertificateProvider;
import org.opensearch.dataprepper.plugins.certificate.acm.ACMCertificateProvider;
import org.opensearch.dataprepper.plugins.certificate.file.FileCertificateProvider;
import org.opensearch.dataprepper.plugins.certificate.s3.S3CertificateProvider;
import org.opensearch.dataprepper.plugins.metricpublisher.MicrometerMetricPublisher;
import org.opensearch.dataprepper.plugins.sink.configuration.HttpSinkConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProviderChain;
import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration;
import software.amazon.awssdk.core.retry.RetryMode;
import software.amazon.awssdk.services.acm.AcmClient;
import software.amazon.awssdk.services.s3.S3Client;

/**
* This class consist logic for downloading the SSL certificates from S3/ACM/Local file.
*
*/
public class CertificateProviderFactory {
private static final Logger LOG = LoggerFactory.getLogger(CertificateProviderFactory.class);

final HttpSinkConfiguration httpSinkConfiguration;
public CertificateProviderFactory(final HttpSinkConfiguration httpSinkConfiguration) {
this.httpSinkConfiguration = httpSinkConfiguration;
}

/**
* This method consist logic for downloading the SSL certificates from S3/ACM/Local file.
* @return CertificateProvider
*/
public CertificateProvider getCertificateProvider() {
if (httpSinkConfiguration.useAcmCertForSSL()) {
LOG.info("Using ACM certificate and private key for SSL/TLS.");
final AwsCredentialsProvider credentialsProvider = AwsCredentialsProviderChain.builder()
.addCredentialsProvider(DefaultCredentialsProvider.create()).build();
final ClientOverrideConfiguration clientConfig = ClientOverrideConfiguration.builder()
.retryPolicy(RetryMode.STANDARD)
.build();

final PluginMetrics awsSdkMetrics = PluginMetrics.fromNames("sdk", "aws");

final AcmClient awsCertificateManager = AcmClient.builder()
.region(httpSinkConfiguration.getAwsAuthenticationOptions().getAwsRegion())
.credentialsProvider(credentialsProvider)
.overrideConfiguration(clientConfig)
.overrideConfiguration(metricPublisher -> metricPublisher.addMetricPublisher(new MicrometerMetricPublisher(awsSdkMetrics)))
.build();

return new ACMCertificateProvider(awsCertificateManager, httpSinkConfiguration.getAcmCertificateArn(),
httpSinkConfiguration.getAcmCertIssueTimeOutMillis(), httpSinkConfiguration.getAcmPrivateKeyPassword());
} else if (httpSinkConfiguration.isSslCertAndKeyFileInS3()) {
LOG.info("Using S3 to fetch certificate and private key for SSL/TLS.");
final AwsCredentialsProvider credentialsProvider = AwsCredentialsProviderChain.builder()
.addCredentialsProvider(DefaultCredentialsProvider.create()).build();
final S3Client s3Client = S3Client.builder()
.region(httpSinkConfiguration.getAwsAuthenticationOptions().getAwsRegion())
.credentialsProvider(credentialsProvider)
.build();
return new S3CertificateProvider(s3Client,
httpSinkConfiguration.getSslCertificateFile(),
httpSinkConfiguration.getSslKeyFile());
} else {
LOG.info("Using local file system to get certificate and private key for SSL/TLS.");
return new FileCertificateProvider(httpSinkConfiguration.getSslCertificateFile(), httpSinkConfiguration.getSslKeyFile());
}
}
}
Loading

0 comments on commit 4670215

Please sign in to comment.