diff --git a/data-prepper-plugins/http-sink/build.gradle b/data-prepper-plugins/http-sink/build.gradle index 21e678348f..3a6220053d 100644 --- a/data-prepper-plugins/http-sink/build.gradle +++ b/data-prepper-plugins/http-sink/build.gradle @@ -3,11 +3,13 @@ dependencies { implementation project(':data-prepper-api') implementation project(':data-prepper-plugins:aws-plugin-api') implementation project(':data-prepper-plugins:common') + implementation project(path: ':data-prepper-plugins:common') implementation 'com.fasterxml.jackson.core:jackson-core' implementation 'org.hibernate.validator:hibernate-validator:7.0.5.Final' implementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-yaml' implementation 'software.amazon.awssdk:s3' implementation 'software.amazon.awssdk:sts' + implementation 'software.amazon.awssdk:acm' implementation group: 'org.apache.httpcomponents.client5', name: 'httpclient5', version: '5.2' implementation 'org.apache.commons:commons-lang3:3.12.0' } diff --git a/data-prepper-plugins/http-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/ClientFactory.java b/data-prepper-plugins/http-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/ClientFactory.java deleted file mode 100644 index fdb3dcaaa0..0000000000 --- a/data-prepper-plugins/http-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/ClientFactory.java +++ /dev/null @@ -1,13 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ -package org.opensearch.dataprepper.plugins.sink; - -/** - * AWS factory class required to create AWS Http Endpoint client for this plugin. - */ -public final class ClientFactory { - private ClientFactory() {} - -} diff --git a/data-prepper-plugins/http-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/FailedHttpResponseInterceptor.java b/data-prepper-plugins/http-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/FailedHttpResponseInterceptor.java new file mode 100644 index 0000000000..64bbff5bc6 --- /dev/null +++ b/data-prepper-plugins/http-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/FailedHttpResponseInterceptor.java @@ -0,0 +1,39 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ +package org.opensearch.dataprepper.plugins.sink; + +import org.apache.hc.core5.http.EntityDetails; +import org.apache.hc.core5.http.HttpResponse; +import org.apache.hc.core5.http.HttpResponseInterceptor; +import org.apache.hc.core5.http.protocol.HttpContext; + +import java.io.IOException; + +public class FailedHttpResponseInterceptor implements HttpResponseInterceptor { + + public static final int ERROR_CODE_500 = 500; + + public static final int ERROR_CODE_400 = 400; + + public static final int ERROR_CODE_404 = 404; + + public static final int ERROR_CODE_501 = 501; + + private final String url; + + public FailedHttpResponseInterceptor(final String url){ + this.url = url; + } + + @Override + public void process(HttpResponse response, EntityDetails entity, HttpContext context) throws IOException { + if (response.getCode() == ERROR_CODE_500 || + response.getCode() == ERROR_CODE_400 || + response.getCode() == ERROR_CODE_404 || + response.getCode() == ERROR_CODE_501) { + throw new IOException(String.format("url: %s , status code: %s", url,response.getCode())); + } + } +} diff --git a/data-prepper-plugins/http-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/HTTPSink.java b/data-prepper-plugins/http-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/HTTPSink.java index 06d057b625..a736de6534 100644 --- a/data-prepper-plugins/http-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/HTTPSink.java +++ b/data-prepper-plugins/http-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/HTTPSink.java @@ -4,10 +4,15 @@ */ package org.opensearch.dataprepper.plugins.sink; -import org.apache.hc.client5.http.impl.classic.CloseableHttpClient; -import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier; +import org.apache.hc.client5.http.HttpRequestRetryStrategy; +import org.apache.hc.client5.http.impl.DefaultHttpRequestRetryStrategy; +import org.apache.hc.client5.http.impl.classic.HttpClientBuilder; +import org.apache.hc.client5.http.impl.classic.HttpClients; +import org.apache.hc.core5.util.TimeValue; import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin; import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor; +import org.opensearch.dataprepper.model.configuration.PipelineDescription; +import org.opensearch.dataprepper.model.configuration.PluginModel; import org.opensearch.dataprepper.model.configuration.PluginSetting; import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.model.plugin.InvalidPluginConfigurationException; @@ -15,32 +20,57 @@ import org.opensearch.dataprepper.model.record.Record; import org.opensearch.dataprepper.model.sink.AbstractSink; import org.opensearch.dataprepper.model.sink.Sink; +import org.opensearch.dataprepper.plugins.accumulator.BufferFactory; +import org.opensearch.dataprepper.plugins.accumulator.BufferTypeOptions; +import org.opensearch.dataprepper.plugins.accumulator.InMemoryBufferFactory; +import org.opensearch.dataprepper.plugins.accumulator.LocalFileBufferFactory; import org.opensearch.dataprepper.plugins.sink.configuration.HttpSinkConfiguration; -import org.opensearch.dataprepper.plugins.sink.configuration.UrlConfigurationOption; +import org.opensearch.dataprepper.plugins.sink.service.HttpSinkService; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.Collection; -import java.util.List; -import java.util.Optional; @DataPrepperPlugin(name = "http", pluginType = Sink.class, pluginConfigurationType = HttpSinkConfiguration.class) public class HTTPSink extends AbstractSink> { private static final Logger LOG = LoggerFactory.getLogger(HTTPSink.class); - private final HttpSinkConfiguration httpSinkConfiguration; - private volatile boolean sinkInitialized; + private final BufferFactory bufferFactory; + + private final HttpSinkService httpSinkService; + @DataPrepperPluginConstructor public HTTPSink(final PluginSetting pluginSetting, final HttpSinkConfiguration httpSinkConfiguration, final PluginFactory pluginFactory, - final AwsCredentialsSupplier awsCredentialsSupplier) { + final PipelineDescription pipelineDescription) { super(pluginSetting); - this.httpSinkConfiguration = httpSinkConfiguration; - sinkInitialized = Boolean.FALSE; + final PluginModel codecConfiguration = httpSinkConfiguration.getCodec(); + final PluginSetting codecPluginSettings = new PluginSetting(codecConfiguration.getPluginName(), + codecConfiguration.getPluginSettings()); + codecPluginSettings.setPipelineName(pipelineDescription.getPipelineName()); + this.sinkInitialized = Boolean.FALSE; + if (httpSinkConfiguration.getBufferType().equals(BufferTypeOptions.LOCALFILE)) { + this.bufferFactory = new LocalFileBufferFactory(); + } else { + this.bufferFactory = new InMemoryBufferFactory(); + } + final HttpRequestRetryStrategy httpRequestRetryStrategy = new DefaultHttpRequestRetryStrategy(httpSinkConfiguration.getMaxUploadRetries(), + TimeValue.of(httpSinkConfiguration.getHttpRetryInterval())); + + final HttpClientBuilder httpClientBuilder = HttpClients.custom() + .setRetryStrategy(httpRequestRetryStrategy); + + this.httpSinkService = new HttpSinkService( + httpSinkConfiguration, + bufferFactory, + codecPluginSettings, + httpClientBuilder, + pluginMetrics); } @Override @@ -75,23 +105,6 @@ public void doOutput(final Collection> records) { if (records.isEmpty()) { return; } - //TODO: call Service call method - } - - - public Optional getAuthHandlerByConfig(final HttpSinkConfiguration sinkConfiguration){ - //TODO: AWS Sigv4 - check - // TODO: call Auth Handlers based on auth Type - - return null; - } - - public List getClassicHttpRequestList(final List urlConfigurationOption){ - // logic for create auth handler for each url based on provided configuration - getAuthHandlerByConfig() - // logic for request preparation for each url - // logic for worker is not there in url level then verify the global workers if global workers also not defined then default 1 - // logic for get the Proxy object if url level proxy enabled else look the global proxy. - // Aws SageMaker headers if headers found in the configuration - return null; + httpSinkService.output(records); } } \ No newline at end of file diff --git a/data-prepper-plugins/http-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/HttpAuthOptions.java b/data-prepper-plugins/http-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/HttpAuthOptions.java deleted file mode 100644 index 61b20993d8..0000000000 --- a/data-prepper-plugins/http-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/HttpAuthOptions.java +++ /dev/null @@ -1,65 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ -package org.opensearch.dataprepper.plugins.sink; - -import org.apache.hc.client5.http.impl.classic.CloseableHttpClient; -import org.apache.hc.core5.http.ClassicHttpRequest; - -public class HttpAuthOptions { - private String url; - - private CloseableHttpClient closeableHttpClient; - - private ClassicHttpRequest classicHttpRequest; - - private int workers; - - private String proxy; - - public CloseableHttpClient getCloseableHttpClient() { - return closeableHttpClient; - } - - public HttpAuthOptions setCloseableHttpClient(CloseableHttpClient closeableHttpClient) { - this.closeableHttpClient = closeableHttpClient; - return this; - } - - public ClassicHttpRequest getClassicHttpRequest() { - return classicHttpRequest; - } - - public HttpAuthOptions setClassicHttpRequest(ClassicHttpRequest classicHttpRequest) { - this.classicHttpRequest = classicHttpRequest; - return this; - } - - public int getWorkers() { - return workers; - } - - public HttpAuthOptions setWorkers(int workers) { - this.workers = workers; - return this; - } - - public String getUrl() { - return url; - } - - public HttpAuthOptions setUrl(String url) { - this.url = url; - return this; - } - - public String getProxy() { - return proxy; - } - - public HttpAuthOptions setProxy(String proxy) { - this.proxy = proxy; - return this; - } -} diff --git a/data-prepper-plugins/http-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/HttpEndPointResponse.java b/data-prepper-plugins/http-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/HttpEndPointResponse.java new file mode 100644 index 0000000000..4c3b98a38a --- /dev/null +++ b/data-prepper-plugins/http-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/HttpEndPointResponse.java @@ -0,0 +1,46 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ +package org.opensearch.dataprepper.plugins.sink; + +public class HttpEndPointResponse { + private String url; + private int statusCode; + private String errMessage; + + public HttpEndPointResponse(final String url, + final int statusCode, + final String errMessage) { + this.url = url; + this.statusCode = statusCode; + this.errMessage = errMessage; + } + + public HttpEndPointResponse(final String url, + final int statusCode) { + this.url = url; + this.statusCode = statusCode; + } + + public String getUrl() { + return url; + } + + public int getStatusCode() { + return statusCode; + } + + public String getErrMessage() { + return errMessage; + } + + @Override + public String toString() { + return "{" + + "url='" + url + '\'' + + ", statusCode=" + statusCode + + ", errMessage='" + errMessage + '\'' + + '}'; + } +} \ No newline at end of file diff --git a/data-prepper-plugins/http-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/certificate/CertificateProviderFactory.java b/data-prepper-plugins/http-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/certificate/CertificateProviderFactory.java new file mode 100644 index 0000000000..97fa71fb74 --- /dev/null +++ b/data-prepper-plugins/http-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/certificate/CertificateProviderFactory.java @@ -0,0 +1,76 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ +package org.opensearch.dataprepper.plugins.sink.certificate; + +import org.opensearch.dataprepper.metrics.PluginMetrics; +import org.opensearch.dataprepper.plugins.certificate.CertificateProvider; +import org.opensearch.dataprepper.plugins.certificate.acm.ACMCertificateProvider; +import org.opensearch.dataprepper.plugins.certificate.file.FileCertificateProvider; +import org.opensearch.dataprepper.plugins.certificate.s3.S3CertificateProvider; +import org.opensearch.dataprepper.plugins.metricpublisher.MicrometerMetricPublisher; +import org.opensearch.dataprepper.plugins.sink.configuration.HttpSinkConfiguration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; +import software.amazon.awssdk.auth.credentials.AwsCredentialsProviderChain; +import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider; +import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration; +import software.amazon.awssdk.core.retry.RetryMode; +import software.amazon.awssdk.services.acm.AcmClient; +import software.amazon.awssdk.services.s3.S3Client; + +/** + * This class consist logic for downloading the SSL certificates from S3/ACM/Local file. + * + */ +public class CertificateProviderFactory { + private static final Logger LOG = LoggerFactory.getLogger(CertificateProviderFactory.class); + + final HttpSinkConfiguration httpSinkConfiguration; + public CertificateProviderFactory(final HttpSinkConfiguration httpSinkConfiguration) { + this.httpSinkConfiguration = httpSinkConfiguration; + } + + /** + * This method consist logic for downloading the SSL certificates from S3/ACM/Local file. + * @return CertificateProvider + */ + public CertificateProvider getCertificateProvider() { + if (httpSinkConfiguration.useAcmCertForSSL()) { + LOG.info("Using ACM certificate and private key for SSL/TLS."); + final AwsCredentialsProvider credentialsProvider = AwsCredentialsProviderChain.builder() + .addCredentialsProvider(DefaultCredentialsProvider.create()).build(); + final ClientOverrideConfiguration clientConfig = ClientOverrideConfiguration.builder() + .retryPolicy(RetryMode.STANDARD) + .build(); + + final PluginMetrics awsSdkMetrics = PluginMetrics.fromNames("sdk", "aws"); + + final AcmClient awsCertificateManager = AcmClient.builder() + .region(httpSinkConfiguration.getAwsAuthenticationOptions().getAwsRegion()) + .credentialsProvider(credentialsProvider) + .overrideConfiguration(clientConfig) + .overrideConfiguration(metricPublisher -> metricPublisher.addMetricPublisher(new MicrometerMetricPublisher(awsSdkMetrics))) + .build(); + + return new ACMCertificateProvider(awsCertificateManager, httpSinkConfiguration.getAcmCertificateArn(), + httpSinkConfiguration.getAcmCertIssueTimeOutMillis(), httpSinkConfiguration.getAcmPrivateKeyPassword()); + } else if (httpSinkConfiguration.isSslCertAndKeyFileInS3()) { + LOG.info("Using S3 to fetch certificate and private key for SSL/TLS."); + final AwsCredentialsProvider credentialsProvider = AwsCredentialsProviderChain.builder() + .addCredentialsProvider(DefaultCredentialsProvider.create()).build(); + final S3Client s3Client = S3Client.builder() + .region(httpSinkConfiguration.getAwsAuthenticationOptions().getAwsRegion()) + .credentialsProvider(credentialsProvider) + .build(); + return new S3CertificateProvider(s3Client, + httpSinkConfiguration.getSslCertificateFile(), + httpSinkConfiguration.getSslKeyFile()); + } else { + LOG.info("Using local file system to get certificate and private key for SSL/TLS."); + return new FileCertificateProvider(httpSinkConfiguration.getSslCertificateFile(), httpSinkConfiguration.getSslKeyFile()); + } + } +} diff --git a/data-prepper-plugins/http-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/certificate/HttpClientSSLConnectionManager.java b/data-prepper-plugins/http-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/certificate/HttpClientSSLConnectionManager.java new file mode 100644 index 0000000000..4ea5e8d06e --- /dev/null +++ b/data-prepper-plugins/http-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/certificate/HttpClientSSLConnectionManager.java @@ -0,0 +1,83 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ +package org.opensearch.dataprepper.plugins.sink.certificate; + +import org.apache.hc.client5.http.config.TlsConfig; +import org.apache.hc.client5.http.impl.io.PoolingHttpClientConnectionManagerBuilder; +import org.apache.hc.client5.http.io.HttpClientConnectionManager; +import org.apache.hc.client5.http.ssl.SSLConnectionSocketFactory; +import org.apache.hc.client5.http.ssl.SSLConnectionSocketFactoryBuilder; +import org.apache.hc.client5.http.ssl.TrustAllStrategy; +import org.apache.hc.core5.http.ssl.TLS; +import org.apache.hc.core5.ssl.SSLContextBuilder; +import org.apache.hc.core5.ssl.SSLContexts; +import org.apache.hc.core5.ssl.TrustStrategy; +import org.apache.hc.core5.util.Timeout; +import org.opensearch.dataprepper.plugins.certificate.CertificateProvider; +import org.opensearch.dataprepper.plugins.sink.configuration.HttpSinkConfiguration; + +import javax.net.ssl.SSLContext; +import java.io.ByteArrayInputStream; +import java.io.InputStream; +import java.nio.charset.StandardCharsets; +import java.security.KeyStore; +import java.security.cert.Certificate; +import java.security.cert.CertificateFactory; + +/** + * This class implements SSL certs authentication + * + */ +public class HttpClientSSLConnectionManager { + + /** + * This method creates HttpClientConnectionManager for SSL certs authentication + * @param sinkConfiguration HttpSinkConfiguration + * @param providerFactory CertificateProviderFactory + * @return HttpClientConnectionManager + */ + public HttpClientConnectionManager createHttpClientConnectionManager(final HttpSinkConfiguration sinkConfiguration, + final CertificateProviderFactory providerFactory){ + final CertificateProvider certificateProvider = providerFactory.getCertificateProvider(); + final org.opensearch.dataprepper.plugins.certificate.model.Certificate certificate = certificateProvider.getCertificate(); + final SSLContext sslContext = sinkConfiguration.getSslCertificateFile() != null ? + getCAStrategy(new ByteArrayInputStream(certificate.getCertificate().getBytes(StandardCharsets.UTF_8))) : getTrustAllStrategy(); + SSLConnectionSocketFactory sslSocketFactory = SSLConnectionSocketFactoryBuilder.create() + .setSslContext(sslContext) + .build(); + return PoolingHttpClientConnectionManagerBuilder.create() + .setSSLSocketFactory(sslSocketFactory) + .setDefaultTlsConfig(TlsConfig.custom() + .setHandshakeTimeout(Timeout.ofSeconds(30)) + .setSupportedProtocols(TLS.V_1_3) + .build()) + .build(); + } + + private SSLContext getCAStrategy(final InputStream certificate) { + try { + CertificateFactory factory = CertificateFactory.getInstance("X.509"); + Certificate trustedCa; + trustedCa = factory.generateCertificate(certificate); + KeyStore trustStore = KeyStore.getInstance("pkcs12"); + trustStore.load(null, null); + trustStore.setCertificateEntry("ca", trustedCa); + SSLContextBuilder sslContextBuilder = SSLContexts.custom() + .loadTrustMaterial(trustStore, null); + return sslContextBuilder.build(); + } catch (Exception ex) { + throw new RuntimeException(ex.getMessage(), ex); + } + } + + private SSLContext getTrustAllStrategy() { + final TrustStrategy trustStrategy = new TrustAllStrategy(); + try { + return SSLContexts.custom().loadTrustMaterial(null, trustStrategy).build(); + } catch (Exception ex) { + throw new RuntimeException(ex.getMessage(), ex); + } + } +} diff --git a/data-prepper-plugins/http-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/configuration/AuthTypeOptions.java b/data-prepper-plugins/http-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/configuration/AuthTypeOptions.java new file mode 100644 index 0000000000..52ce8fce20 --- /dev/null +++ b/data-prepper-plugins/http-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/configuration/AuthTypeOptions.java @@ -0,0 +1,34 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ +package org.opensearch.dataprepper.plugins.sink.configuration; + +import com.fasterxml.jackson.annotation.JsonCreator; + +import java.util.Arrays; +import java.util.Map; +import java.util.stream.Collectors; + +public enum AuthTypeOptions { + HTTP_BASIC("http_basic"), + BEARER_TOKEN("bearer_token"), + UNAUTHENTICATED("unauthenticated"); + + private static final Map OPTIONS_MAP = Arrays.stream(AuthTypeOptions.values()) + .collect(Collectors.toMap( + value -> value.option, + value -> value + )); + + private final String option; + + AuthTypeOptions(final String option) { + this.option = option; + } + + @JsonCreator + static AuthTypeOptions fromOptionValue(final String option) { + return OPTIONS_MAP.get(option); + } +} diff --git a/data-prepper-plugins/http-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/configuration/HTTPMethodOptions.java b/data-prepper-plugins/http-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/configuration/HTTPMethodOptions.java new file mode 100644 index 0000000000..8d28aabd88 --- /dev/null +++ b/data-prepper-plugins/http-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/configuration/HTTPMethodOptions.java @@ -0,0 +1,33 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ +package org.opensearch.dataprepper.plugins.sink.configuration; + +import com.fasterxml.jackson.annotation.JsonCreator; + +import java.util.Arrays; +import java.util.Map; +import java.util.stream.Collectors; + +public enum HTTPMethodOptions { + PUT("PUT"), + POST("POST"); + + private static final Map OPTIONS_MAP = Arrays.stream(HTTPMethodOptions.values()) + .collect(Collectors.toMap( + value -> value.option, + value -> value + )); + + private final String option; + + HTTPMethodOptions(final String option) { + this.option = option; + } + + @JsonCreator + static HTTPMethodOptions fromOptionValue(final String option) { + return OPTIONS_MAP.get(option); + } +} diff --git a/data-prepper-plugins/http-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/configuration/HttpSinkConfiguration.java b/data-prepper-plugins/http-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/configuration/HttpSinkConfiguration.java index 6d4c7795a7..71bad98adf 100644 --- a/data-prepper-plugins/http-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/configuration/HttpSinkConfiguration.java +++ b/data-prepper-plugins/http-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/configuration/HttpSinkConfiguration.java @@ -7,18 +7,32 @@ import com.fasterxml.jackson.annotation.JsonProperty; import jakarta.validation.Valid; import jakarta.validation.constraints.NotNull; +import org.apache.commons.lang3.StringUtils; import org.opensearch.dataprepper.model.configuration.PluginModel; +import org.opensearch.dataprepper.plugins.accumulator.BufferTypeOptions; +import java.time.Duration; import java.util.List; public class HttpSinkConfiguration { private static final int DEFAULT_UPLOAD_RETRIES = 5; - private static final String DEFAULT_HTTP_METHOD = "POST"; - private static final int DEFAULT_WORKERS = 1; + static final boolean DEFAULT_SSL = false; + + private static final String S3_PREFIX = "s3://"; + + static final String SSL_KEY_CERT_FILE = "sslKeyCertChainFile"; + static final String SSL_KEY_FILE = "sslKeyFile"; + static final String SSL = "ssl"; + static final String AWS_REGION = "awsRegion"; + static final boolean DEFAULT_USE_ACM_CERT_FOR_SSL = false; + static final int DEFAULT_ACM_CERT_ISSUE_TIME_OUT_MILLIS = 120000; + public static final String SSL_IS_ENABLED = "%s is enabled"; + + public static final Duration DEFAULT_HTTP_RETRY_INTERVAL = Duration.ofSeconds(30); @NotNull @JsonProperty("urls") private List urlConfigurationOptions; @@ -30,19 +44,17 @@ public class HttpSinkConfiguration { private PluginModel codec; @JsonProperty("http_method") - private String httpMethod = DEFAULT_HTTP_METHOD; + private HTTPMethodOptions httpMethod = HTTPMethodOptions.POST; @JsonProperty("proxy") private String proxy; @JsonProperty("auth_type") - private String authType; + private AuthTypeOptions authType = AuthTypeOptions.UNAUTHENTICATED; + @JsonProperty("authentication") private PluginModel authentication; - @JsonProperty("insecure") - private boolean insecure; - @JsonProperty("ssl_certificate_file") private String sslCertificateFile; @@ -53,9 +65,9 @@ public class HttpSinkConfiguration { private boolean awsSigv4; @JsonProperty("buffer_type") - //private BufferTypeOptions bufferType = BufferTypeOptions.INMEMORY; - private String bufferType = "in_memory"; //TODO: change to BufferTypeOptions + private BufferTypeOptions bufferType = BufferTypeOptions.INMEMORY; + @NotNull @JsonProperty("threshold") private ThresholdOptions thresholdOptions; @@ -72,8 +84,95 @@ public class HttpSinkConfiguration { @JsonProperty("dlq_file") private String dlqFile; + @JsonProperty("webhook_url") + private String webhookURL; + + @JsonProperty("dlq") private PluginModel dlq; + @JsonProperty("use_acm_cert_for_ssl") + private boolean useAcmCertForSSL = DEFAULT_USE_ACM_CERT_FOR_SSL; + + @JsonProperty("acm_private_key_password") + private String acmPrivateKeyPassword; + + @JsonProperty("acm_certificate_arn") + private String acmCertificateArn; + + @JsonProperty("acm_cert_issue_time_out_millis") + private long acmCertIssueTimeOutMillis = DEFAULT_ACM_CERT_ISSUE_TIME_OUT_MILLIS; + + @JsonProperty("ssl") + private boolean ssl = DEFAULT_SSL; + + @JsonProperty("http_retry_interval") + private Duration httpRetryInterval = DEFAULT_HTTP_RETRY_INTERVAL; + + + private boolean sslCertAndKeyFileInS3; + + public boolean isSsl() { + return ssl; + } + + public Duration getHttpRetryInterval() { + return httpRetryInterval; + } + + public String getAcmPrivateKeyPassword() { + return acmPrivateKeyPassword; + } + + public boolean isSslCertAndKeyFileInS3() { + return sslCertAndKeyFileInS3; + } + + public long getAcmCertIssueTimeOutMillis() { + return acmCertIssueTimeOutMillis; + } + + public boolean useAcmCertForSSL() { + return useAcmCertForSSL; + } + + public String getWebhookURL() { + return webhookURL; + } + + public void validateAndInitializeCertAndKeyFileInS3() { + boolean certAndKeyFileInS3 = false; + if (useAcmCertForSSL) { + validateSSLArgument(String.format(SSL_IS_ENABLED, useAcmCertForSSL), acmCertificateArn, acmCertificateArn); + validateSSLArgument(String.format(SSL_IS_ENABLED, useAcmCertForSSL), awsAuthenticationOptions.getAwsRegion().toString(), AWS_REGION); + } else if(ssl) { + validateSSLCertificateFiles(); + certAndKeyFileInS3 = isSSLCertificateLocatedInS3(); + if (certAndKeyFileInS3) { + validateSSLArgument("The certificate and key files are located in S3", awsAuthenticationOptions.getAwsRegion().toString(), AWS_REGION); + } + } + sslCertAndKeyFileInS3 = certAndKeyFileInS3; + } + private void validateSSLArgument(final String sslTypeMessage, final String argument, final String argumentName) { + if (StringUtils.isEmpty(argument)) { + throw new IllegalArgumentException(String.format("%s, %s can not be empty or null", sslTypeMessage, argumentName)); + } + } + + private void validateSSLCertificateFiles() { + validateSSLArgument(String.format(SSL_IS_ENABLED, SSL), sslCertificateFile, SSL_KEY_CERT_FILE); + validateSSLArgument(String.format(SSL_IS_ENABLED, SSL), sslKeyFile, SSL_KEY_FILE); + } + + private boolean isSSLCertificateLocatedInS3() { + return sslCertificateFile.toLowerCase().startsWith(S3_PREFIX) && + sslKeyFile.toLowerCase().startsWith(S3_PREFIX); + } + + public String getAcmCertificateArn() { + return acmCertificateArn; + } + public List getUrlConfigurationOptions() { return urlConfigurationOptions; } @@ -82,7 +181,7 @@ public PluginModel getCodec() { return codec; } - public String getHttpMethod() { + public HTTPMethodOptions getHttpMethod() { return httpMethod; } @@ -90,7 +189,7 @@ public String getProxy() { return proxy; } - public String getAuthType() { + public AuthTypeOptions getAuthType() { return authType; } @@ -98,10 +197,6 @@ public PluginModel getAuthentication() { return authentication; } - public boolean isInsecure() { - return insecure; - } - public String getSslCertificateFile() { return sslCertificateFile; } @@ -114,7 +209,7 @@ public boolean isAwsSigv4() { return awsSigv4; } - public String getBufferType() { + public BufferTypeOptions getBufferType() { return bufferType; } diff --git a/data-prepper-plugins/http-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/configuration/UrlConfigurationOption.java b/data-prepper-plugins/http-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/configuration/UrlConfigurationOption.java index 0cfd84bdf0..4aa45a082c 100644 --- a/data-prepper-plugins/http-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/configuration/UrlConfigurationOption.java +++ b/data-prepper-plugins/http-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/configuration/UrlConfigurationOption.java @@ -12,8 +12,6 @@ public class UrlConfigurationOption { private static final int DEFAULT_WORKERS = 1; - private static final String DEFAULT_HTTP_METHOD = "POST"; - @NotNull @JsonProperty("url") private String url; @@ -28,10 +26,10 @@ public class UrlConfigurationOption { private PluginModel codec; @JsonProperty("http_method") - private String httpMethod = DEFAULT_HTTP_METHOD; + private HTTPMethodOptions httpMethod; @JsonProperty("auth_type") - private String authType; + private AuthTypeOptions authType; public String getUrl() { return url; @@ -49,11 +47,11 @@ public PluginModel getCodec() { return codec; } - public String getHttpMethod() { + public HTTPMethodOptions getHttpMethod() { return httpMethod; } - public String getAuthType() { + public AuthTypeOptions getAuthType() { return authType; } diff --git a/data-prepper-plugins/http-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/handler/BasicAuthHttpSinkHandler.java b/data-prepper-plugins/http-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/handler/BasicAuthHttpSinkHandler.java index c84841c31e..0406ce09f8 100644 --- a/data-prepper-plugins/http-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/handler/BasicAuthHttpSinkHandler.java +++ b/data-prepper-plugins/http-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/handler/BasicAuthHttpSinkHandler.java @@ -4,14 +4,41 @@ */ package org.opensearch.dataprepper.plugins.sink.handler; -import org.opensearch.dataprepper.plugins.sink.configuration.HttpSinkConfiguration; - -import java.util.Optional; +import org.apache.hc.client5.http.auth.AuthScope; +import org.apache.hc.client5.http.auth.UsernamePasswordCredentials; +import org.apache.hc.client5.http.impl.auth.BasicCredentialsProvider; +import org.apache.hc.client5.http.io.HttpClientConnectionManager; +import org.opensearch.dataprepper.plugins.sink.FailedHttpResponseInterceptor; +import org.opensearch.dataprepper.plugins.sink.util.HttpSinkUtil; +/** + * * This class handles Basic Authentication + */ public class BasicAuthHttpSinkHandler implements MultiAuthHttpSinkHandler { + + private final HttpClientConnectionManager httpClientConnectionManager; + + private final String username; + + private final String password; + + public BasicAuthHttpSinkHandler(final String username, + final String password, + final HttpClientConnectionManager httpClientConnectionManager){ + this.httpClientConnectionManager = httpClientConnectionManager; + this.username = username; + this.password = password; + } + @Override - public Optional authenticate(HttpSinkConfiguration sinkConfiguration) { - // if ssl enabled then set connection manager - return null; + public HttpAuthOptions authenticate(final HttpAuthOptions.Builder httpAuthOptionsBuilder) { + final BasicCredentialsProvider provider = new BasicCredentialsProvider(); + AuthScope authScope = new AuthScope(HttpSinkUtil.getHttpHostByURL(HttpSinkUtil.getURLByUrlString(httpAuthOptionsBuilder.getUrl()))); + provider.setCredentials(authScope, new UsernamePasswordCredentials(username, password.toCharArray())); + httpAuthOptionsBuilder.setHttpClientBuilder(httpAuthOptionsBuilder.build().getHttpClientBuilder() + .setConnectionManager(httpClientConnectionManager) + .addResponseInterceptorLast(new FailedHttpResponseInterceptor(httpAuthOptionsBuilder.getUrl())) + .setDefaultCredentialsProvider(provider)); + return httpAuthOptionsBuilder.build(); } } diff --git a/data-prepper-plugins/http-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/handler/BearerTokenAuthHttpSinkHandler.java b/data-prepper-plugins/http-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/handler/BearerTokenAuthHttpSinkHandler.java index fec474e6b8..63d594e49d 100644 --- a/data-prepper-plugins/http-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/handler/BearerTokenAuthHttpSinkHandler.java +++ b/data-prepper-plugins/http-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/handler/BearerTokenAuthHttpSinkHandler.java @@ -4,14 +4,32 @@ */ package org.opensearch.dataprepper.plugins.sink.handler; -import org.opensearch.dataprepper.plugins.sink.configuration.HttpSinkConfiguration; - -import java.util.Optional; +import org.apache.hc.client5.http.io.HttpClientConnectionManager; +import org.opensearch.dataprepper.plugins.sink.FailedHttpResponseInterceptor; +/** + * * This class handles Bearer Token Authentication + */ public class BearerTokenAuthHttpSinkHandler implements MultiAuthHttpSinkHandler { + + public static final String AUTHORIZATION = "Authorization"; + + private final HttpClientConnectionManager httpClientConnectionManager; + + private final String bearerTokenString; + + public BearerTokenAuthHttpSinkHandler(final String bearerTokenString, + final HttpClientConnectionManager httpClientConnectionManager){ + this.bearerTokenString = bearerTokenString; + this.httpClientConnectionManager = httpClientConnectionManager; + } + @Override - public Optional authenticate(HttpSinkConfiguration sinkConfiguration) { - // if ssl enabled then set connection manager - return null; + public HttpAuthOptions authenticate(final HttpAuthOptions.Builder httpAuthOptionsBuilder) { + httpAuthOptionsBuilder.getClassicHttpRequestBuilder().addHeader(AUTHORIZATION,bearerTokenString); + httpAuthOptionsBuilder.setHttpClientBuilder(httpAuthOptionsBuilder.build().getHttpClientBuilder() + .setConnectionManager(httpClientConnectionManager) + .addResponseInterceptorLast(new FailedHttpResponseInterceptor(httpAuthOptionsBuilder.getUrl()))); + return httpAuthOptionsBuilder.build(); } } diff --git a/data-prepper-plugins/http-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/handler/HttpAuthOptions.java b/data-prepper-plugins/http-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/handler/HttpAuthOptions.java index 7ff5810b77..0e8632d25a 100644 --- a/data-prepper-plugins/http-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/handler/HttpAuthOptions.java +++ b/data-prepper-plugins/http-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/handler/HttpAuthOptions.java @@ -4,58 +4,108 @@ */ package org.opensearch.dataprepper.plugins.sink.handler; -import org.apache.hc.client5.http.impl.classic.CloseableHttpClient; -import org.apache.hc.core5.http.ClassicHttpRequest; +import org.apache.hc.client5.http.impl.classic.HttpClientBuilder; +import org.apache.hc.client5.http.io.HttpClientConnectionManager; +import org.apache.hc.core5.http.HttpHost; +import org.apache.hc.core5.http.io.support.ClassicRequestBuilder; + public class HttpAuthOptions { + private String url; - private CloseableHttpClient closeableHttpClient; - private ClassicHttpRequest classicHttpRequest; - private int workers; - private String proxy; - public CloseableHttpClient getCloseableHttpClient() { - return closeableHttpClient; - } + private HttpClientBuilder httpClientBuilder; - public HttpAuthOptions setCloseableHttpClient(CloseableHttpClient closeableHttpClient) { - this.closeableHttpClient = closeableHttpClient; - return this; - } + private ClassicRequestBuilder classicHttpRequestBuilder; + + private HttpClientConnectionManager httpClientConnectionManager; - public ClassicHttpRequest getClassicHttpRequest() { - return classicHttpRequest; + private int workers; + + private HttpHost proxy; + + public HttpClientBuilder getHttpClientBuilder() { + return httpClientBuilder; } - public HttpAuthOptions setClassicHttpRequest(ClassicHttpRequest classicHttpRequest) { - this.classicHttpRequest = classicHttpRequest; - return this; + public ClassicRequestBuilder getClassicHttpRequestBuilder() { + return classicHttpRequestBuilder; } public int getWorkers() { return workers; } - public HttpAuthOptions setWorkers(int workers) { - this.workers = workers; - return this; - } - public String getUrl() { return url; } - public HttpAuthOptions setUrl(String url) { - this.url = url; - return this; + public HttpHost getProxy() { + return proxy; } - public String getProxy() { - return proxy; + public HttpClientConnectionManager getHttpClientConnectionManager() { + return httpClientConnectionManager; } - public HttpAuthOptions setProxy(String proxy) { - this.proxy = proxy; - return this; + private HttpAuthOptions(Builder builder) { + this.url = builder.url; + this.httpClientBuilder = builder.httpClientBuilder; + this.classicHttpRequestBuilder = builder.classicHttpRequestBuilder; + this.httpClientConnectionManager = builder.httpClientConnectionManager; + this.workers = builder.workers; + this.proxy = builder.proxy; } + public static class Builder { + + private String url; + private HttpClientBuilder httpClientBuilder; + private ClassicRequestBuilder classicHttpRequestBuilder; + private HttpClientConnectionManager httpClientConnectionManager; + private int workers; + private HttpHost proxy; + + public HttpAuthOptions build() { + return new HttpAuthOptions(this); + } + + public Builder setUrl(String url) { + this.url = url; + return this; + } + + public String getUrl() { + return url; + } + + public Builder setHttpClientBuilder(HttpClientBuilder httpClientBuilder) { + this.httpClientBuilder = httpClientBuilder; + return this; + } + + public Builder setClassicHttpRequestBuilder(ClassicRequestBuilder classicHttpRequestBuilder) { + this.classicHttpRequestBuilder = classicHttpRequestBuilder; + return this; + } + + public Builder setHttpClientConnectionManager(HttpClientConnectionManager httpClientConnectionManager) { + this.httpClientConnectionManager = httpClientConnectionManager; + return this; + } + + public Builder setWorkers(int workers) { + this.workers = workers; + return this; + } + + public Builder setProxy(HttpHost proxy) { + this.proxy = proxy; + return this; + } + + public ClassicRequestBuilder getClassicHttpRequestBuilder() { + return classicHttpRequestBuilder; + } + } + } diff --git a/data-prepper-plugins/http-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/handler/MultiAuthHttpSinkHandler.java b/data-prepper-plugins/http-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/handler/MultiAuthHttpSinkHandler.java index e0db436915..2002610fc7 100644 --- a/data-prepper-plugins/http-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/handler/MultiAuthHttpSinkHandler.java +++ b/data-prepper-plugins/http-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/handler/MultiAuthHttpSinkHandler.java @@ -4,11 +4,16 @@ */ package org.opensearch.dataprepper.plugins.sink.handler; -import org.opensearch.dataprepper.plugins.sink.configuration.HttpSinkConfiguration; - -import java.util.Optional; - +/** + * An interface to handle multiple authentications + */ public interface MultiAuthHttpSinkHandler { - Optional authenticate(final HttpSinkConfiguration sinkConfiguration); + + /** + * This method can be used to implement multiple authentication based on configuration + * @param httpAuthOptionsBuilder HttpAuthOptions.Builder + * @return HttpAuthOptions + */ + HttpAuthOptions authenticate(final HttpAuthOptions.Builder httpAuthOptionsBuilder); } diff --git a/data-prepper-plugins/http-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/handler/SecuredAuthHttpSinkHandler.java b/data-prepper-plugins/http-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/handler/SecuredAuthHttpSinkHandler.java deleted file mode 100644 index 9fb58fe223..0000000000 --- a/data-prepper-plugins/http-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/handler/SecuredAuthHttpSinkHandler.java +++ /dev/null @@ -1,18 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ -package org.opensearch.dataprepper.plugins.sink.handler; - -import org.opensearch.dataprepper.plugins.sink.configuration.HttpSinkConfiguration; - -import java.util.Optional; - -public class SecuredAuthHttpSinkHandler implements MultiAuthHttpSinkHandler { - @Override - public Optional authenticate(HttpSinkConfiguration sinkConfiguration) { - // logic here to read the certs from ACM/S3/local - // SSL Sigv4 validation and verification and make connection - return null; - } -} diff --git a/data-prepper-plugins/http-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/service/HttpSinkService.java b/data-prepper-plugins/http-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/service/HttpSinkService.java index 8f97dbb5b0..c7a887c650 100644 --- a/data-prepper-plugins/http-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/service/HttpSinkService.java +++ b/data-prepper-plugins/http-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/service/HttpSinkService.java @@ -4,56 +4,248 @@ */ package org.opensearch.dataprepper.plugins.sink.service; -import org.opensearch.dataprepper.model.buffer.Buffer; +import org.apache.hc.client5.http.impl.classic.HttpClientBuilder; +import org.apache.hc.client5.http.io.HttpClientConnectionManager; +import org.apache.hc.client5.http.protocol.HttpClientContext; +import org.apache.hc.core5.http.HttpStatus; +import org.apache.hc.core5.http.io.support.ClassicRequestBuilder; +import org.opensearch.dataprepper.metrics.PluginMetrics; +import org.opensearch.dataprepper.model.configuration.PluginSetting; import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.event.EventHandle; import org.opensearch.dataprepper.model.record.Record; -import org.opensearch.dataprepper.model.types.ByteCount; +import org.opensearch.dataprepper.plugins.accumulator.Buffer; import org.opensearch.dataprepper.plugins.accumulator.BufferFactory; +import org.opensearch.dataprepper.plugins.sink.FailedHttpResponseInterceptor; +import org.opensearch.dataprepper.plugins.sink.HttpEndPointResponse; +import org.opensearch.dataprepper.plugins.sink.certificate.CertificateProviderFactory; +import org.opensearch.dataprepper.plugins.sink.certificate.HttpClientSSLConnectionManager; +import org.opensearch.dataprepper.plugins.sink.configuration.AuthTypeOptions; +import org.opensearch.dataprepper.plugins.sink.configuration.HTTPMethodOptions; import org.opensearch.dataprepper.plugins.sink.configuration.HttpSinkConfiguration; +import org.opensearch.dataprepper.plugins.sink.configuration.UrlConfigurationOption; +import org.opensearch.dataprepper.plugins.sink.handler.BasicAuthHttpSinkHandler; +import org.opensearch.dataprepper.plugins.sink.handler.BearerTokenAuthHttpSinkHandler; import org.opensearch.dataprepper.plugins.sink.handler.HttpAuthOptions; -import org.opensearch.dataprepper.model.codec.OutputCodec; +import org.opensearch.dataprepper.plugins.sink.handler.MultiAuthHttpSinkHandler; +import org.opensearch.dataprepper.plugins.sink.util.HttpSinkUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import java.io.IOException; +import java.util.ArrayList; import java.util.Collection; +import java.util.HashMap; +import java.util.LinkedList; import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +/** + * This service class contains logic for sending data to Http Endpoints + */ public class HttpSinkService { - private final HttpSinkConfiguration httpSinkConf; + private static final Logger LOG = LoggerFactory.getLogger(HttpSinkService.class); + + public static final String USERNAME = "username"; + + public static final String PASSWORD = "password"; + + public static final String TOKEN = "token"; + + public static final String BEARER = "Bearer "; + + private final Collection bufferedEventHandles; + + private final HttpSinkConfiguration httpSinkConfiguration; private final BufferFactory bufferFactory; - private final List httpAuthOptions; - private OutputCodec codec; + private final Map httpAuthOptions; + + private final PluginSetting pluginSetting; - public HttpSinkService(final OutputCodec codec, - final HttpSinkConfiguration httpSinkConf, + private final Lock reentrantLock; + + private final HttpClientBuilder httpClientBuilder; + + private CertificateProviderFactory certificateProviderFactory; + + private HttpClientConnectionManager httpClientConnectionManager; + + private Buffer currentBuffer; + + public HttpSinkService(final HttpSinkConfiguration httpSinkConfiguration, final BufferFactory bufferFactory, - final List httpAuthOptions){ - this.codec= codec; - this.httpSinkConf = httpSinkConf; + final PluginSetting pluginSetting, + final HttpClientBuilder httpClientBuilder, + final PluginMetrics pluginMetrics){ + this.httpSinkConfiguration = httpSinkConfiguration; this.bufferFactory = bufferFactory; - this.httpAuthOptions = httpAuthOptions; - } - - public void processRecords(Collection> records) { - records.forEach(record -> { - try{ - // logic to fetch the records in batch as per threshold limit - checkThresholdExceed(); - // apply the codec - // push to http end point - }catch(Exception e){ - // In case of any exception, need to write the exception in dlq - logFailureForDlqObjects(); - // In case of any exception, need to push the web hook url- logFailureForWebHook(); + this.pluginSetting = pluginSetting; + this.reentrantLock = new ReentrantLock(); + this.bufferedEventHandles = new LinkedList<>(); + this.httpClientBuilder = httpClientBuilder; + + if (httpSinkConfiguration.isSsl() || httpSinkConfiguration.useAcmCertForSSL()) { + this.certificateProviderFactory = new CertificateProviderFactory(httpSinkConfiguration); + httpSinkConfiguration.validateAndInitializeCertAndKeyFileInS3(); + this.httpClientConnectionManager = new HttpClientSSLConnectionManager() + .createHttpClientConnectionManager(httpSinkConfiguration, certificateProviderFactory); + } + this.httpAuthOptions = buildAuthHttpSinkObjectsByConfig(httpSinkConfiguration); + } + + /** + * This method process buffer records and send to Http End points based on configured codec + * @param records Collection of Event + */ + public void output(Collection> records) { + reentrantLock.lock(); + if (currentBuffer == null) { + this.currentBuffer = bufferFactory.getBuffer(); + } + try { + records.forEach(record -> { + final Event event = record.getData(); + try { + currentBuffer.writeEvent(event.toJsonString().getBytes()); + } catch (IOException e) { + throw new RuntimeException(e); + } + if (event.getEventHandle() != null) { + this.bufferedEventHandles.add(event.getEventHandle()); + } + final List failedHttpEndPointResponses = pushToEndPoint(getCurrentBufferData(currentBuffer)); + if (!failedHttpEndPointResponses.isEmpty()) { + //TODO send to DLQ and webhook + } else { + LOG.info("data pushed to all the end points successfully"); + } + currentBuffer = bufferFactory.getBuffer(); + releaseEventHandles(Boolean.TRUE); + + }); + }finally { + reentrantLock.unlock(); + } + } + + private byte[] getCurrentBufferData(final Buffer currentBuffer) { + try { + return currentBuffer.getSinkBufferData(); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + private void releaseEventHandles(final boolean result) { + for (EventHandle eventHandle : bufferedEventHandles) { + eventHandle.release(result); + } + bufferedEventHandles.clear(); + } + + /** + * * This method pushes bufferData to configured HttpEndPoints + * @param currentBufferData bufferData. + */ + private List pushToEndPoint(final byte[] currentBufferData) { + List httpEndPointResponses = new ArrayList<>(httpSinkConfiguration.getUrlConfigurationOptions().size()); + httpSinkConfiguration.getUrlConfigurationOptions().forEach( urlConfOption -> { + final ClassicRequestBuilder classicHttpRequestBuilder = + httpAuthOptions.get(urlConfOption.getUrl()).getClassicHttpRequestBuilder(); + classicHttpRequestBuilder.setEntity(new String(currentBufferData)); + try { + httpAuthOptions.get(urlConfOption.getUrl()).getHttpClientBuilder().build() + .execute(classicHttpRequestBuilder.build(), HttpClientContext.create()); + } catch (IOException e) { + LOG.info("No of Records failed to push endpoint {}",currentBuffer.getEventCount()); + LOG.error("Exception while pushing buffer data to end point. URL : {}, Exception : ", urlConfOption.getUrl(), e); + httpEndPointResponses.add(new HttpEndPointResponse(urlConfOption.getUrl(), HttpStatus.SC_INTERNAL_SERVER_ERROR, e.getMessage())); } - //TODO: implement end to end acknowledgement }); + LOG.info("No of Records successfully pushed to endpoint {}",currentBuffer.getEventCount()); + return httpEndPointResponses; + } + + /** + * * This method gets Auth Handler classes based on configuration + * @param authType AuthTypeOptions. + * @param authOptions HttpAuthOptions.Builder. + */ + private HttpAuthOptions getAuthHandlerByConfig(final AuthTypeOptions authType, + final HttpAuthOptions.Builder authOptions){ + MultiAuthHttpSinkHandler multiAuthHttpSinkHandler = null; + // TODO: AWS Sigv4 - check + switch(authType) { + case HTTP_BASIC: + String username = httpSinkConfiguration.getAuthentication().getPluginSettings().get(USERNAME).toString(); + String password = httpSinkConfiguration.getAuthentication().getPluginSettings().get(PASSWORD).toString(); + multiAuthHttpSinkHandler = new BasicAuthHttpSinkHandler(username,password,httpClientConnectionManager); + break; + case BEARER_TOKEN: + String token = httpSinkConfiguration.getAuthentication().getPluginSettings().get(TOKEN).toString(); + multiAuthHttpSinkHandler = new BearerTokenAuthHttpSinkHandler(BEARER + token,httpClientConnectionManager); + break; + case UNAUTHENTICATED: + default: + return authOptions.setHttpClientBuilder(httpClientBuilder + .setConnectionManager(httpClientConnectionManager) + .addResponseInterceptorLast(new FailedHttpResponseInterceptor(authOptions.getUrl()))).build(); + } + return multiAuthHttpSinkHandler.authenticate(authOptions); } - public static boolean checkThresholdExceed(final Buffer currentBuffer, - final int maxEvents, - final ByteCount maxBytes, - final long maxCollectionDuration) { - // logic for checking the threshold - return true; + /** + * * This method build HttpAuthOptions class based on configurations + * @param httpSinkConfiguration HttpSinkConfiguration. + */ + private Map buildAuthHttpSinkObjectsByConfig(final HttpSinkConfiguration httpSinkConfiguration){ + final List urlConfigurationOptions = httpSinkConfiguration.getUrlConfigurationOptions(); + final Map authMap = new HashMap<>(urlConfigurationOptions.size()); + urlConfigurationOptions.forEach( urlOption -> { + final HTTPMethodOptions httpMethod = Objects.nonNull(urlOption.getHttpMethod()) ? urlOption.getHttpMethod() : httpSinkConfiguration.getHttpMethod(); + final AuthTypeOptions authType = Objects.nonNull(urlOption.getAuthType()) ? urlOption.getAuthType() : httpSinkConfiguration.getAuthType(); + final String proxyUrlString = Objects.nonNull(urlOption.getProxy()) ? urlOption.getProxy() : httpSinkConfiguration.getProxy(); + final ClassicRequestBuilder classicRequestBuilder = buildRequestByHTTPMethodType(httpMethod).setUri(urlOption.getUrl()); + + if(Objects.nonNull(httpSinkConfiguration.getCustomHeaderOptions())){ + //TODO:add custom headers + } + if(Objects.nonNull(proxyUrlString)) { + httpClientBuilder.setProxy(HttpSinkUtil.getHttpHostByURL(HttpSinkUtil.getURLByUrlString(proxyUrlString))); + LOG.info("Sending data via proxy {}",proxyUrlString); + } + + final HttpAuthOptions.Builder authOptions = new HttpAuthOptions.Builder() + .setUrl(urlOption.getUrl()) + .setClassicHttpRequestBuilder(classicRequestBuilder) + .setHttpClientBuilder(httpClientBuilder); + + authMap.put(urlOption.getUrl(),getAuthHandlerByConfig(authType,authOptions)); + }); + return authMap; } -} + /** + * * builds ClassicRequestBuilder based on configured HttpMethod + * @param httpMethodOptions Http Method. + */ + private ClassicRequestBuilder buildRequestByHTTPMethodType(final HTTPMethodOptions httpMethodOptions) { + final ClassicRequestBuilder classicRequestBuilder; + switch(httpMethodOptions){ + case PUT: + classicRequestBuilder = ClassicRequestBuilder.put(); + break; + case POST: + default: + classicRequestBuilder = ClassicRequestBuilder.post(); + break; + } + return classicRequestBuilder; + } +} \ No newline at end of file diff --git a/data-prepper-plugins/http-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/util/HttpSinkUtil.java b/data-prepper-plugins/http-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/util/HttpSinkUtil.java new file mode 100644 index 0000000000..14bf21122a --- /dev/null +++ b/data-prepper-plugins/http-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/util/HttpSinkUtil.java @@ -0,0 +1,34 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ +package org.opensearch.dataprepper.plugins.sink.util; + +import org.apache.hc.core5.http.HttpHost; + +import java.net.MalformedURLException; +import java.net.URISyntaxException; +import java.net.URL; + +public class HttpSinkUtil { + + public static URL getURLByUrlString(final String url) { + try { + return new URL(url); + } catch (MalformedURLException e) { + throw new RuntimeException(e); + } + } + + public static HttpHost getHttpHostByURL(final URL url) { + final HttpHost targetHost; + try { + targetHost = new HttpHost(url.toURI().getScheme(), url.getHost(), url.getPort()); + } catch (URISyntaxException e) { + throw new RuntimeException(e); + } + return targetHost; + } + + +} diff --git a/data-prepper-plugins/http-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/configuration/HttpSinkConfigurationTest.java b/data-prepper-plugins/http-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/configuration/HttpSinkConfigurationTest.java index 640877690b..e023cf4d50 100644 --- a/data-prepper-plugins/http-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/configuration/HttpSinkConfigurationTest.java +++ b/data-prepper-plugins/http-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/configuration/HttpSinkConfigurationTest.java @@ -11,12 +11,15 @@ import org.hamcrest.CoreMatchers; import org.junit.jupiter.api.Test; import org.opensearch.dataprepper.model.types.ByteCount; +import org.opensearch.dataprepper.plugins.accumulator.BufferTypeOptions; import software.amazon.awssdk.regions.Region; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.instanceOf; import static org.junit.jupiter.api.Assertions.assertNull; +import static org.opensearch.dataprepper.plugins.sink.configuration.AuthTypeOptions.HTTP_BASIC; +import static org.opensearch.dataprepper.plugins.sink.configuration.AuthTypeOptions.UNAUTHENTICATED; public class HttpSinkConfigurationTest { @@ -39,7 +42,7 @@ public class HttpSinkConfigurationTest { " password: \"vip\"\n" + " bearer_token:\n" + " token: \"\"\n" + - " insecure: false\n" + + " ssl: false\n" + " dlq_file: \"/your/local/dlq-file\"\n" + " dlq:\n" + " ssl_certificate_file: \"/full/path/to/certfile.crt\"\n" + @@ -83,12 +86,12 @@ void default_proxy_test() { @Test void default_http_method_test() { - assertThat(new HttpSinkConfiguration().getHttpMethod(), CoreMatchers.equalTo("POST")); + assertThat(new HttpSinkConfiguration().getHttpMethod(), CoreMatchers.equalTo(HTTPMethodOptions.POST)); } @Test void default_auth_type_test() { - assertNull(new HttpSinkConfiguration().getAuthType()); + assertThat(new HttpSinkConfiguration().getAuthType(), equalTo(UNAUTHENTICATED)); } @Test @@ -102,8 +105,8 @@ void get_authentication_test() { } @Test - void default_insecure_test() { - assertThat(new HttpSinkConfiguration().isInsecure(), equalTo(false)); + void default_ssl_test() { + assertThat(new HttpSinkConfiguration().isSsl(), equalTo(false)); } @Test @@ -123,7 +126,7 @@ void get_ssl_key_file_test() { @Test void default_buffer_type_test() { - assertThat(new HttpSinkConfiguration().getBufferType(), equalTo("in_memory")); + assertThat(new HttpSinkConfiguration().getBufferType(), equalTo(BufferTypeOptions.INMEMORY)); } @Test @@ -150,9 +153,9 @@ void get_custom_header_options_test() { void http_sink_pipeline_test_with_provided_config_options() throws JsonProcessingException { final HttpSinkConfiguration httpSinkConfiguration = objectMapper.readValue(SINK_YAML, HttpSinkConfiguration.class); - assertThat(httpSinkConfiguration.getHttpMethod(),equalTo("POST")); - assertThat(httpSinkConfiguration.getAuthType(),equalTo("http_basic")); - assertThat(httpSinkConfiguration.getBufferType(),equalTo("in_memory")); + assertThat(httpSinkConfiguration.getHttpMethod(),equalTo(HTTPMethodOptions.POST)); + assertThat(httpSinkConfiguration.getAuthType(),equalTo(HTTP_BASIC)); + assertThat(httpSinkConfiguration.getBufferType(),equalTo(BufferTypeOptions.INMEMORY)); assertThat(httpSinkConfiguration.getMaxUploadRetries(),equalTo(5)); assertThat(httpSinkConfiguration.getProxy(),equalTo("test-proxy")); assertThat(httpSinkConfiguration.getSslCertificateFile(),equalTo("/full/path/to/certfile.crt")); @@ -162,9 +165,9 @@ void http_sink_pipeline_test_with_provided_config_options() throws JsonProcessin final UrlConfigurationOption urlConfigurationOption = httpSinkConfiguration.getUrlConfigurationOptions().get(0); assertThat(urlConfigurationOption.getUrl(),equalTo("https://httpbin.org/post")); - assertThat(urlConfigurationOption.getHttpMethod(),equalTo("POST")); + assertThat(urlConfigurationOption.getHttpMethod(),equalTo(HTTPMethodOptions.POST)); assertThat(urlConfigurationOption.getProxy(),equalTo("test")); - assertThat(urlConfigurationOption.getAuthType(),equalTo("http_basic")); + assertThat(urlConfigurationOption.getAuthType(),equalTo(HTTP_BASIC)); final CustomHeaderOptions customHeaderOptions = httpSinkConfiguration.getCustomHeaderOptions(); diff --git a/data-prepper-plugins/http-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/configuration/UrlConfigurationOptionTest.java b/data-prepper-plugins/http-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/configuration/UrlConfigurationOptionTest.java index b28008a0a3..aa27474572 100644 --- a/data-prepper-plugins/http-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/configuration/UrlConfigurationOptionTest.java +++ b/data-prepper-plugins/http-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/configuration/UrlConfigurationOptionTest.java @@ -29,7 +29,7 @@ void default_proxy_test() { @Test void default_http_method_test() { - assertThat(new UrlConfigurationOption().getHttpMethod(),equalTo("POST")); + assertNull(new UrlConfigurationOption().getAuthType()); } @Test