Skip to content

Commit

Permalink
Merge branch 'opensearch-project:main' into transform-key
Browse files Browse the repository at this point in the history
  • Loading branch information
shenkw1 committed Jul 5, 2023
2 parents 204817b + 75fa735 commit 8bd192d
Show file tree
Hide file tree
Showing 109 changed files with 4,622 additions and 968 deletions.
2 changes: 2 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,8 @@ subprojects {
}
} else if (details.requested.group == 'log4j' && details.requested.name == 'log4j') {
details.useTarget group: 'org.apache.logging.log4j', name: 'log4j-1.2-api', version: '2.17.1'
} else if (details.requested.group == 'org.xerial.snappy' && details.requested.name == 'snappy-java') {
details.useTarget group: 'org.xerial.snappy', name: 'snappy-java', version: '1.1.10.1'
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion data-prepper-api/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ dependencies {
testImplementation testLibs.junit.vintage
testImplementation project(':data-prepper-test-common')
testImplementation 'org.skyscreamer:jsonassert:1.5.1'
testImplementation 'commons-io:commons-io:2.11.0'
testImplementation 'commons-io:commons-io:2.13.0'
}

jacocoTestCoverageVerification {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,21 @@

package org.opensearch.dataprepper.model.codec;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.log.JacksonLog;
import org.opensearch.dataprepper.model.sink.Sink;

import java.io.IOException;
import java.io.OutputStream;
import java.util.Map;

public interface OutputCodec {

static final ObjectMapper objectMapper = new ObjectMapper();

/**
* this method get called from {@link Sink} to do initial wrapping in {@link OutputStream}
* Implementors should do initial wrapping according to the implementation
Expand All @@ -26,11 +33,12 @@ public interface OutputCodec {
* this method get called from {@link Sink} to write event in {@link OutputStream}
* Implementors should do get data from event and write to the {@link OutputStream}
*
* @param event event Record event
* @param outputStream outputStream param to hold the event data
* @param event event Record event
* @param outputStream outputStream param to hold the event data
* @param tagsTargetKey to add tags to the record
* @throws IOException throws IOException when not able to write data to {@link OutputStream}
*/
void writeEvent(Event event, OutputStream outputStream) throws IOException;
void writeEvent(Event event, OutputStream outputStream, String tagsTargetKey) throws IOException;

/**
* this method get called from {@link Sink} to do final wrapping in {@link OutputStream}
Expand All @@ -47,4 +55,11 @@ public interface OutputCodec {
* @return String
*/
String getExtension();

default Event addTagsToEvent(Event event, String tagsTargetKey) throws JsonProcessingException {
String eventJsonString = event.jsonBuilder().includeTags(tagsTargetKey).toJsonString();
Map<String, Object> eventData = objectMapper.readValue(eventJsonString, new TypeReference<>() {
});
return JacksonLog.builder().withData(eventData).build();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
package org.opensearch.dataprepper.model.codec;

import com.fasterxml.jackson.core.JsonProcessingException;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.opensearch.dataprepper.model.event.DefaultEventMetadata;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.EventMetadata;
import org.opensearch.dataprepper.model.event.EventType;
import org.opensearch.dataprepper.model.event.JacksonEvent;
import org.opensearch.dataprepper.model.log.JacksonLog;

import java.io.IOException;
import java.io.OutputStream;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import java.util.UUID;

import static org.junit.Assert.assertNotEquals;

public class OutputCodecTest {

@BeforeEach
public void setUp() {
}

@Test
public void testWriteMetrics() throws JsonProcessingException {
OutputCodec outputCodec = new OutputCodec() {
@Override
public void start(OutputStream outputStream) throws IOException {
}

@Override
public void writeEvent(Event event, OutputStream outputStream, String tagsTargetKey) throws IOException {
}

@Override
public void complete(OutputStream outputStream) throws IOException {
}

@Override
public String getExtension() {
return null;
}
};

final Set<String> testTags = Set.of("tag1");
final EventMetadata defaultEventMetadata = DefaultEventMetadata.builder().
withEventType(EventType.LOG.toString()).
withTags(testTags).build();
Map<String, Object> json = generateJson();
final JacksonEvent event = JacksonLog.builder().withData(json).withEventMetadata(defaultEventMetadata).build();
Event tagsToEvent = outputCodec.addTagsToEvent(event, "Tag");
assertNotEquals(event.toJsonString(), tagsToEvent.toJsonString());
}

private static Map<String, Object> generateJson() {
final Map<String, Object> jsonObject = new LinkedHashMap<>();
for (int i = 0; i < 2; i++) {
jsonObject.put(UUID.randomUUID().toString(), UUID.randomUUID().toString());
}
jsonObject.put(UUID.randomUUID().toString(), Arrays.asList(UUID.randomUUID().toString(),
UUID.randomUUID().toString(), UUID.randomUUID().toString()));
return jsonObject;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ public interface ArmeriaHttpAuthenticationProvider {
* Gets an authentication decorator to an Armeria {@link ServerBuilder}.
*
* @since 2.0
* @return returns authentication decorator
*/
default Optional<Function<? super HttpService, ? extends HttpService>> getAuthenticationDecorator() {
return Optional.empty();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,17 @@ public interface GrpcAuthenticationProvider {
/**
* Returns a {@link ServerInterceptor} that does authentication
* @since 1.2
* @return returns authentication interceptor
*/
ServerInterceptor getAuthenticationInterceptor();

/**
* Allows implementors to provide an {@link HttpService} to either intercept the HTTP request prior to validation,
* or to perform validation on the HTTP request. This may be optional, in which case it is not used.
* @since 1.5
* @return returns http authentication service
*/
default Optional<Function<? super HttpService, ? extends HttpService>> getHttpAuthenticationService() {
return Optional.empty();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ public void start(final OutputStream outputStream) throws IOException {
}

@Override
public void writeEvent(final Event event, final OutputStream outputStream) throws IOException {
public void writeEvent(final Event event, final OutputStream outputStream,final String tagsTargetKey) throws IOException {
// TODO: write event data to the outputstream
}

Expand Down
38 changes: 38 additions & 0 deletions data-prepper-plugins/cloudwatch-logs/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
plugins {
id 'java'
id 'java-library'
}

repositories {
mavenCentral()
}

dependencies {
api project(':data-prepper-api')
implementation project(':data-prepper-plugins:aws-plugin-api')
implementation project(path: ':data-prepper-plugins:common')
testImplementation 'org.junit.jupiter:junit-jupiter'
implementation 'com.fasterxml.jackson.core:jackson-core'
implementation 'com.fasterxml.jackson.core:jackson-databind'
implementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-yaml'
implementation 'software.amazon.awssdk:cloudwatch'
implementation 'software.amazon.awssdk:cloudwatchlogs'
implementation 'org.apache.commons:commons-lang3:3.12.0'
testImplementation project(path: ':data-prepper-test-common')
testImplementation project(path: ':data-prepper-test-common')
}

jacocoTestCoverageVerification {
dependsOn jacocoTestReport
violationRules {
rule { //in addition to core projects rule
limit {
minimum = 0.90
}
}
}
}

test {
useJUnitPlatform()
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package org.opensearch.dataprepper.plugins.sink.config;

import com.fasterxml.jackson.annotation.JsonProperty;
import jakarta.validation.constraints.Size;
import software.amazon.awssdk.regions.Region;

import java.util.Map;

/**
* AwsConfig is based on the S3-Sink AwsAuthenticationOptions
* where the configuration allows the sink to fetch Aws credentials
* and resources.
*/
public class AwsConfig {
public static int DEFAULT_CONNECTION_ATTEMPTS = 5;

@JsonProperty("region")
@Size(min = 1, message = "Region cannot be empty string")
private String awsRegion;

@JsonProperty("sts_role_arn")
@Size(min = 20, max = 2048, message = "awsStsRoleArn length should be between 1 and 2048 characters")
private String awsStsRoleArn;

@JsonProperty("sts_header_overrides")
@Size(max = 5, message = "sts_header_overrides supports a maximum of 5 headers to override")
private Map<String, String> awsStsHeaderOverrides;

@JsonProperty("sts_external_id")
@Size(min = 2, max = 1224, message = "awsStsExternalId length should be between 2 and 1224 characters")
private String awsStsExternalId;

public Region getAwsRegion() {
return awsRegion != null ? Region.of(awsRegion) : null;
}

public String getAwsStsRoleArn() {
return awsStsRoleArn;
}

public String getAwsStsExternalId() {
return awsStsExternalId;
}

public Map<String, String> getAwsStsHeaderOverrides() {
return awsStsHeaderOverrides;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package org.opensearch.dataprepper.plugins.sink.config;

import com.fasterxml.jackson.annotation.JsonProperty;
import jakarta.validation.Valid;
import jakarta.validation.constraints.NotEmpty;
import jakarta.validation.constraints.NotNull;

public class CwlSinkConfig {
public static final String DEFAULT_BUFFER_TYPE = "in_memory";

@JsonProperty("aws")
@NotNull
@Valid
private AwsConfig awsConfig;

@JsonProperty("threshold")
@NotNull
private ThresholdConfig thresholdConfig;

@JsonProperty("buffer_type")
private String bufferType = DEFAULT_BUFFER_TYPE;

@JsonProperty("log_group")
@NotEmpty
@NotNull
private String logGroup;

@JsonProperty("log_stream")
@NotEmpty
@NotNull
private String logStream;

public AwsConfig getAwsConfig() {
return awsConfig;
}

public ThresholdConfig getThresholdConfig() {
return thresholdConfig;
}

public String getBufferType() {
return bufferType;
}

public String getLogGroup() {
return logGroup;
}

public String getLogStream() {
return logStream;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package org.opensearch.dataprepper.plugins.sink.config;

import com.fasterxml.jackson.annotation.JsonProperty;
import jakarta.validation.constraints.Size;

/**
* The threshold config holds the different configurations for
* buffer restrictions, retransmission restrictions and timeout
* restrictions.
*/
public class ThresholdConfig {
public static final int DEFAULT_BATCH_SIZE = 100;
public static final int DEFAULT_EVENT_SIZE = 50;
public static final int DEFAULT_SIZE_OF_REQUEST = 524288;
public static final int DEFAULT_RETRY_COUNT = 5;
public static final int DEFAULT_LOG_SEND_INTERVAL_TIME = 60;
public static final int DEFAULT_BACKOFF_TIME = 5000;

@JsonProperty("batch_size")
@Size(min = 1, max = 10000, message = "batch_size amount should be between 1 to 10000")
private int batchSize = DEFAULT_BATCH_SIZE;

@JsonProperty("max_event_size")
@Size(min = 1, max = 256, message = "max_event_size amount should be between 1 to 256 kilobytes")
private int maxEventSize = DEFAULT_EVENT_SIZE;

@JsonProperty("max_request_size")
@Size(min = 1, max = 1048576, message = "max_batch_request_size amount should be between 1 and 1048576 bytes")
private int maxRequestSize = DEFAULT_SIZE_OF_REQUEST;

@JsonProperty("retry_count")
@Size(min = 1, max = 15, message = "retry_count amount should be between 1 and 15")
private int retryCount = DEFAULT_RETRY_COUNT;

@JsonProperty("log_send_interval")
@Size(min = 5, max = 300, message = "log_send_interval amount should be between 5 and 300 seconds")
private int logSendInterval = DEFAULT_LOG_SEND_INTERVAL_TIME;

@JsonProperty("back_off_time")
@Size(min = 500, max = 1000, message = "back_off_time amount should be between 500 and 1000 milliseconds")
private int backOffTime = DEFAULT_BACKOFF_TIME;

public int getBatchSize() {
return batchSize;
}

public int getMaxEventSize() {
return maxEventSize;
}

public int getMaxRequestSize() {
return maxRequestSize;
}

public int getRetryCount() {
return retryCount;
}

public int getLogSendInterval() {
return logSendInterval;
}

public int getBackOffTime() {
return backOffTime;
}
}
Loading

0 comments on commit 8bd192d

Please sign in to comment.