Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HttpSink functionality draft PR for #874. #2981

2 changes: 1 addition & 1 deletion data-prepper-plugins/common/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ jacocoTestCoverageVerification {
violationRules {
rule { //in addition to core projects rule
limit {
minimum = 0.90
minimum = 0.89
}
}
}
Expand Down
13 changes: 12 additions & 1 deletion data-prepper-plugins/http-sink/build.gradle
Original file line number Diff line number Diff line change
@@ -1,17 +1,28 @@

dependencies {
implementation project(':data-prepper-api')
implementation libs.armeria.core
implementation project(path: ':data-prepper-plugins:common')
implementation project(':data-prepper-plugins:aws-plugin-api')
implementation 'io.micrometer:micrometer-core'
implementation 'com.fasterxml.jackson.core:jackson-core'
implementation 'com.fasterxml.jackson.core:jackson-databind'
implementation 'org.apache.commons:commons-compress:1.21'
implementation 'joda-time:joda-time:2.11.1'
implementation project(':data-prepper-plugins:common')
implementation project(path: ':data-prepper-plugins:common')
implementation 'com.fasterxml.jackson.core:jackson-core'
implementation 'org.hibernate.validator:hibernate-validator:7.0.5.Final'
implementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-yaml'
implementation 'software.amazon.awssdk:s3'
implementation 'software.amazon.awssdk:apache-client'
implementation 'software.amazon.awssdk:sts'
implementation 'software.amazon.awssdk:acm'
implementation group: 'org.apache.httpcomponents.client5', name: 'httpclient5', version: '5.2'
implementation 'software.amazon.awssdk:auth'
implementation 'org.apache.commons:commons-lang3:3.12.0'
implementation project(':data-prepper-plugins:failures-common')
implementation group: 'org.apache.httpcomponents.client5', name: 'httpclient5', version: '5.2'
testImplementation project(':data-prepper-test-common')
}

test {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,242 @@
/*
* Copyright OpenSearch Contributors. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License"). You may not use this file except in compliance with
* the License. A copy of the License is located at
*
* http://aws.amazon.com/apache2.0
*
* or in the "license" file accompanying this file. This file is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
* CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
* and limitations under the License.
*/
package org.opensearch.dataprepper.plugins.sink;

import org.apache.hc.core5.http.HttpRequest;
import org.apache.hc.core5.http.ClassicHttpRequest;
import org.apache.hc.core5.http.EntityDetails;
import org.apache.hc.core5.http.HttpEntity;
import org.apache.hc.core5.http.Header;
import org.apache.hc.core5.http.NameValuePair;
import org.apache.hc.core5.http.ContentType;
import org.apache.hc.core5.http.HttpHost;
import org.apache.hc.core5.http.HttpRequestInterceptor;
import org.apache.hc.core5.http.io.entity.BasicHttpEntity;
import org.apache.hc.core5.http.message.BasicHeader;
import org.apache.hc.core5.http.protocol.HttpContext;
import org.apache.hc.core5.net.URIBuilder;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.auth.signer.AwsSignerExecutionAttribute;
import software.amazon.awssdk.core.interceptor.ExecutionAttributes;
import software.amazon.awssdk.core.signer.Signer;
import software.amazon.awssdk.http.SdkHttpFullRequest;
import software.amazon.awssdk.http.SdkHttpMethod;
import software.amazon.awssdk.regions.Region;

import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.TreeMap;

import static org.apache.http.protocol.HttpCoreContext.HTTP_TARGET_HOST;

/**
* An {@link HttpRequestInterceptor} that signs requests using any AWS {@link Signer}
* and {@link AwsCredentialsProvider}.
*/
public final class AwsRequestSigningApacheInterceptor implements HttpRequestInterceptor {

/**
* Constant to check content-length
*/
private static final String CONTENT_LENGTH = "content-length";
/**
* Constant to check Zero content length
*/
private static final String ZERO_CONTENT_LENGTH = "0";
/**
* Constant to check if host is the endpoint
*/
private static final String HOST = "host";

/**
* The service that we're connecting to.
*/
private final String service;

/**
* The particular signer implementation.
*/
private final Signer signer;

/**
* The source of AWS credentials for signing.
*/
private final AwsCredentialsProvider awsCredentialsProvider;

/**
* The region signing region.
*/
private final Region region;

/**
*
* @param service service that we're connecting to
* @param signer particular signer implementation
* @param awsCredentialsProvider source of AWS credentials for signing
* @param region signing region
*/
public AwsRequestSigningApacheInterceptor(final String service,
final Signer signer,
final AwsCredentialsProvider awsCredentialsProvider,
final Region region) {
this.service = Objects.requireNonNull(service);
this.signer = Objects.requireNonNull(signer);
this.awsCredentialsProvider = Objects.requireNonNull(awsCredentialsProvider);
this.region = Objects.requireNonNull(region);
}

/**
*
* @param service service that we're connecting to
* @param signer particular signer implementation
* @param awsCredentialsProvider source of AWS credentials for signing
* @param region signing region
*/
public AwsRequestSigningApacheInterceptor(final String service,
final Signer signer,
final AwsCredentialsProvider awsCredentialsProvider,
final String region) {
this(service, signer, awsCredentialsProvider, Region.of(region));
}

/**
* {@inheritDoc}
*/
@Override
public void process(final HttpRequest request, final EntityDetails entity, final HttpContext context)
throws IOException {
URIBuilder uriBuilder;
try {
uriBuilder = new URIBuilder(request.getUri());
} catch (URISyntaxException e) {
throw new IOException("Invalid URI", e);
}

// Copy Apache HttpRequest to AWS Request
SdkHttpFullRequest.Builder requestBuilder = SdkHttpFullRequest.builder()
.method(SdkHttpMethod.fromValue(request.getMethod()))
.uri(buildUri(context, uriBuilder));

if (request instanceof ClassicHttpRequest) {
ClassicHttpRequest classicHttpRequest =
(ClassicHttpRequest) request;
if (classicHttpRequest.getEntity() != null) {
InputStream content = classicHttpRequest.getEntity().getContent();
requestBuilder.contentStreamProvider(() -> content);
}
}
requestBuilder.rawQueryParameters(nvpToMapParams(uriBuilder.getQueryParams()));
requestBuilder.headers(headerArrayToMap(request.getHeaders()));

ExecutionAttributes attributes = new ExecutionAttributes();
attributes.putAttribute(AwsSignerExecutionAttribute.AWS_CREDENTIALS, awsCredentialsProvider.resolveCredentials());
attributes.putAttribute(AwsSignerExecutionAttribute.SERVICE_SIGNING_NAME, service);
attributes.putAttribute(AwsSignerExecutionAttribute.SIGNING_REGION, region);

// Sign it
SdkHttpFullRequest signedRequest = signer.sign(requestBuilder.build(), attributes);

// Now copy everything back
request.setHeaders(mapToHeaderArray(signedRequest.headers()));
if (request instanceof ClassicHttpRequest) {
ClassicHttpRequest classicHttpRequest =
(ClassicHttpRequest) request;
if (classicHttpRequest.getEntity() != null) {
HttpEntity basicHttpEntity = new BasicHttpEntity(signedRequest.contentStreamProvider()
.orElseThrow(() -> new IllegalStateException("There must be content"))
.newStream(), ContentType.APPLICATION_JSON);
classicHttpRequest.setEntity(basicHttpEntity);
}
}
}

private URI buildUri(final HttpContext context, URIBuilder uriBuilder) throws IOException {
try {
HttpHost host = (HttpHost) context.getAttribute(HTTP_TARGET_HOST);

if (host != null) {
uriBuilder.setHost(host.getHostName());
uriBuilder.setScheme(host.getSchemeName());
uriBuilder.setPort(host.getPort());
}

return uriBuilder.build();
} catch (URISyntaxException e) {
throw new IOException("Invalid URI", e);
}
}

/**
*
* @param params list of HTTP query params as NameValuePairs
* @return a multimap of HTTP query params
*/
private static Map<String, List<String>> nvpToMapParams(final List<NameValuePair> params) {
Map<String, List<String>> parameterMap = new TreeMap<>(String.CASE_INSENSITIVE_ORDER);
for (NameValuePair nvp : params) {
List<String> argsList =
parameterMap.computeIfAbsent(nvp.getName(), k -> new ArrayList<>());
argsList.add(nvp.getValue());
}
return parameterMap;
}

/**
* @param headers modelled Header objects
* @return a Map of header entries
*/
private static Map<String, List<String>> headerArrayToMap(final Header[] headers) {
Map<String, List<String>> headersMap = new TreeMap<>(String.CASE_INSENSITIVE_ORDER);
for (Header header : headers) {
if (!skipHeader(header)) {
headersMap.put(header.getName(), headersMap
.getOrDefault(header.getName(),
new LinkedList<>(Collections.singletonList(header.getValue()))));
}
}
return headersMap;
}

/**
* @param header header line to check
* @return true if the given header should be excluded when signing
*/
private static boolean skipHeader(final Header header) {
return (CONTENT_LENGTH.equalsIgnoreCase(header.getName())
&& ZERO_CONTENT_LENGTH.equals(header.getValue())) // Strip Content-Length: 0
|| HOST.equalsIgnoreCase(header.getName()); // Host comes from endpoint
}

/**
* @param mapHeaders Map of header entries
* @return modelled Header objects
*/
private static Header[] mapToHeaderArray(final Map<String, List<String>> mapHeaders) {
Header[] headers = new Header[mapHeaders.size()];
int i = 0;
for (Map.Entry<String, List<String>> headerEntry : mapHeaders.entrySet()) {
for (String value : headerEntry.getValue()) {
headers[i++] = new BasicHeader(headerEntry.getKey(), value);
}
}
return headers;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import org.apache.hc.client5.http.impl.classic.HttpClientBuilder;
import org.apache.hc.client5.http.impl.classic.HttpClients;
import org.apache.hc.core5.util.TimeValue;
import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier;
import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin;
import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor;
import org.opensearch.dataprepper.model.configuration.PipelineDescription;
Expand All @@ -25,29 +26,40 @@
import org.opensearch.dataprepper.plugins.accumulator.InMemoryBufferFactory;
import org.opensearch.dataprepper.plugins.accumulator.LocalFileBufferFactory;
import org.opensearch.dataprepper.plugins.sink.configuration.HttpSinkConfiguration;
import org.opensearch.dataprepper.plugins.sink.dlq.DlqPushHandler;
import org.opensearch.dataprepper.plugins.sink.service.HttpSinkService;
import org.opensearch.dataprepper.plugins.sink.service.WebhookService;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Collection;
import java.util.Objects;

@DataPrepperPlugin(name = "http", pluginType = Sink.class, pluginConfigurationType = HttpSinkConfiguration.class)
public class HTTPSink extends AbstractSink<Record<Event>> {

private static final Logger LOG = LoggerFactory.getLogger(HTTPSink.class);

private static final String BUCKET = "bucket";
private static final String KEY_PATH = "key_path_prefix";

private WebhookService webhookService;

private volatile boolean sinkInitialized;

private final HttpSinkService httpSinkService;

private final BufferFactory bufferFactory;

private final HttpSinkService httpSinkService;
private DlqPushHandler dlqPushHandler;

@DataPrepperPluginConstructor
public HTTPSink(final PluginSetting pluginSetting,
final HttpSinkConfiguration httpSinkConfiguration,
final PluginFactory pluginFactory,
final PipelineDescription pipelineDescription) {
final PipelineDescription pipelineDescription,
final AwsCredentialsSupplier awsCredentialsSupplier) {
super(pluginSetting);
final PluginModel codecConfiguration = httpSinkConfiguration.getCodec();
final PluginSetting codecPluginSettings = new PluginSetting(codecConfiguration.getPluginName(),
Expand All @@ -59,18 +71,38 @@ public HTTPSink(final PluginSetting pluginSetting,
} else {
this.bufferFactory = new InMemoryBufferFactory();
}

if(httpSinkConfiguration.getDlqFile() != null)
this.dlqPushHandler = new DlqPushHandler(httpSinkConfiguration.getDlqFile(), pluginFactory,
null, null, null, null);

else if(Objects.nonNull(httpSinkConfiguration.getDlq()))
this.dlqPushHandler = new DlqPushHandler(httpSinkConfiguration.getDlqFile(), pluginFactory,
httpSinkConfiguration.getDlq().getPluginSettings().get(BUCKET).toString(), httpSinkConfiguration.getAwsAuthenticationOptions()
.getAwsStsRoleArn(), httpSinkConfiguration.getAwsAuthenticationOptions().getAwsRegion().toString(),
httpSinkConfiguration.getDlq().getPluginSettings().get(KEY_PATH).toString());


final HttpRequestRetryStrategy httpRequestRetryStrategy = new DefaultHttpRequestRetryStrategy(httpSinkConfiguration.getMaxUploadRetries(),
TimeValue.of(httpSinkConfiguration.getHttpRetryInterval()));

final HttpClientBuilder httpClientBuilder = HttpClients.custom()
.setRetryStrategy(httpRequestRetryStrategy);

if(Objects.nonNull(httpSinkConfiguration.getWebhookURL()))
this.webhookService = new WebhookService(httpSinkConfiguration.getWebhookURL(),
httpClientBuilder,pluginMetrics,httpSinkConfiguration);

this.httpSinkService = new HttpSinkService(
httpSinkConfiguration,
bufferFactory,
dlqPushHandler,
codecPluginSettings,
webhookService,
httpClientBuilder,
pluginMetrics);
pluginMetrics,
awsCredentialsSupplier,
pluginSetting);
}

@Override
Expand Down
Loading
Loading