Skip to content

Commit

Permalink
Merge branch 'opensearch-project:main' into geoip-processor-integrati…
Browse files Browse the repository at this point in the history
…on-test
  • Loading branch information
venkataraopasyavula committed Jul 10, 2023
2 parents c2357f4 + 45b6e55 commit d4a3d31
Show file tree
Hide file tree
Showing 125 changed files with 5,926 additions and 1,005 deletions.
2 changes: 2 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,8 @@ subprojects {
}
} else if (details.requested.group == 'log4j' && details.requested.name == 'log4j') {
details.useTarget group: 'org.apache.logging.log4j', name: 'log4j-1.2-api', version: '2.17.1'
} else if (details.requested.group == 'org.xerial.snappy' && details.requested.name == 'snappy-java') {
details.useTarget group: 'org.xerial.snappy', name: 'snappy-java', version: '1.1.10.1'
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion data-prepper-api/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ dependencies {
testImplementation testLibs.junit.vintage
testImplementation project(':data-prepper-test-common')
testImplementation 'org.skyscreamer:jsonassert:1.5.1'
testImplementation 'commons-io:commons-io:2.11.0'
testImplementation 'commons-io:commons-io:2.13.0'
}

jacocoTestCoverageVerification {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,21 @@

package org.opensearch.dataprepper.model.codec;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.log.JacksonLog;
import org.opensearch.dataprepper.model.sink.Sink;

import java.io.IOException;
import java.io.OutputStream;
import java.util.Map;

public interface OutputCodec {

static final ObjectMapper objectMapper = new ObjectMapper();

/**
* this method get called from {@link Sink} to do initial wrapping in {@link OutputStream}
* Implementors should do initial wrapping according to the implementation
Expand All @@ -26,11 +33,12 @@ public interface OutputCodec {
* this method get called from {@link Sink} to write event in {@link OutputStream}
* Implementors should do get data from event and write to the {@link OutputStream}
*
* @param event event Record event
* @param outputStream outputStream param to hold the event data
* @param event event Record event
* @param outputStream outputStream param to hold the event data
* @param tagsTargetKey to add tags to the record
* @throws IOException throws IOException when not able to write data to {@link OutputStream}
*/
void writeEvent(Event event, OutputStream outputStream) throws IOException;
void writeEvent(Event event, OutputStream outputStream, String tagsTargetKey) throws IOException;

/**
* this method get called from {@link Sink} to do final wrapping in {@link OutputStream}
Expand All @@ -47,4 +55,11 @@ public interface OutputCodec {
* @return String
*/
String getExtension();

default Event addTagsToEvent(Event event, String tagsTargetKey) throws JsonProcessingException {
String eventJsonString = event.jsonBuilder().includeTags(tagsTargetKey).toJsonString();
Map<String, Object> eventData = objectMapper.readValue(eventJsonString, new TypeReference<>() {
});
return JacksonLog.builder().withData(eventData).build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -373,19 +373,9 @@ private String trimKey(final String key) {
}

private boolean isValidKey(final String key) {
char previous = ' ';
char next = ' ';
for (int i = 0; i < key.length(); i++) {
char c = key.charAt(i);

if (i < key.length() - 1) {
next = key.charAt(i + 1);
}

if ((i == 0 || i == key.length() - 1 || previous == '/' || next == '/') && (c == '_' || c == '.' || c == '-')) {
return false;
}

if (!(c >= 48 && c <= 57
|| c >= 65 && c <= 90
|| c >= 97 && c <= 122
Expand All @@ -397,7 +387,6 @@ private boolean isValidKey(final String key) {

return false;
}
previous = c;
}
return true;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
package org.opensearch.dataprepper.model.codec;

import com.fasterxml.jackson.core.JsonProcessingException;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.opensearch.dataprepper.model.event.DefaultEventMetadata;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.EventMetadata;
import org.opensearch.dataprepper.model.event.EventType;
import org.opensearch.dataprepper.model.event.JacksonEvent;
import org.opensearch.dataprepper.model.log.JacksonLog;

import java.io.IOException;
import java.io.OutputStream;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import java.util.UUID;

import static org.junit.Assert.assertNotEquals;

public class OutputCodecTest {

@BeforeEach
public void setUp() {
}

@Test
public void testWriteMetrics() throws JsonProcessingException {
OutputCodec outputCodec = new OutputCodec() {
@Override
public void start(OutputStream outputStream) throws IOException {
}

@Override
public void writeEvent(Event event, OutputStream outputStream, String tagsTargetKey) throws IOException {
}

@Override
public void complete(OutputStream outputStream) throws IOException {
}

@Override
public String getExtension() {
return null;
}
};

final Set<String> testTags = Set.of("tag1");
final EventMetadata defaultEventMetadata = DefaultEventMetadata.builder().
withEventType(EventType.LOG.toString()).
withTags(testTags).build();
Map<String, Object> json = generateJson();
final JacksonEvent event = JacksonLog.builder().withData(json).withEventMetadata(defaultEventMetadata).build();
Event tagsToEvent = outputCodec.addTagsToEvent(event, "Tag");
assertNotEquals(event.toJsonString(), tagsToEvent.toJsonString());
}

private static Map<String, Object> generateJson() {
final Map<String, Object> jsonObject = new LinkedHashMap<>();
for (int i = 0; i < 2; i++) {
jsonObject.put(UUID.randomUUID().toString(), UUID.randomUUID().toString());
}
jsonObject.put(UUID.randomUUID().toString(), Arrays.asList(UUID.randomUUID().toString(),
UUID.randomUUID().toString(), UUID.randomUUID().toString()));
return jsonObject;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -323,9 +323,8 @@ public void testIsValueAList_withNull() {
}

@ParameterizedTest
@ValueSource(strings = {"", "withSpecialChars*$%", "-withPrefixDash", "\\-withEscapeChars", "\\\\/withMultipleEscapeChars",
"withDashSuffix-", "withDashSuffix-/nestedKey", "withDashPrefix/-nestedKey", "_withUnderscorePrefix", "withUnderscoreSuffix_",
".withDotPrefix", "withDotSuffix.", "with,Comma", "with:Colon", "with[Bracket", "with|Brace"})
@ValueSource(strings = {"", "withSpecialChars*$%", "\\-withEscapeChars", "\\\\/withMultipleEscapeChars",
"with,Comma", "with:Colon", "with[Bracket", "with|Brace"})
void testKey_withInvalidKey_throwsIllegalArgumentException(final String invalidKey) {
assertThrowsForKeyCheck(IllegalArgumentException.class, invalidKey);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ public interface ArmeriaHttpAuthenticationProvider {
* Gets an authentication decorator to an Armeria {@link ServerBuilder}.
*
* @since 2.0
* @return returns authentication decorator
*/
default Optional<Function<? super HttpService, ? extends HttpService>> getAuthenticationDecorator() {
return Optional.empty();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,17 @@ public interface GrpcAuthenticationProvider {
/**
* Returns a {@link ServerInterceptor} that does authentication
* @since 1.2
* @return returns authentication interceptor
*/
ServerInterceptor getAuthenticationInterceptor();

/**
* Allows implementors to provide an {@link HttpService} to either intercept the HTTP request prior to validation,
* or to perform validation on the HTTP request. This may be optional, in which case it is not used.
* @since 1.5
* @return returns http authentication service
*/
default Optional<Function<? super HttpService, ? extends HttpService>> getHttpAuthenticationService() {
return Optional.empty();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ public void start(final OutputStream outputStream) throws IOException {
}

@Override
public void writeEvent(final Event event, final OutputStream outputStream) throws IOException {
public void writeEvent(final Event event, final OutputStream outputStream,final String tagsTargetKey) throws IOException {
// TODO: write event data to the outputstream
}

Expand Down
38 changes: 38 additions & 0 deletions data-prepper-plugins/cloudwatch-logs/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
plugins {
id 'java'
id 'java-library'
}

repositories {
mavenCentral()
}

dependencies {
api project(':data-prepper-api')
implementation project(':data-prepper-plugins:aws-plugin-api')
implementation project(path: ':data-prepper-plugins:common')
testImplementation 'org.junit.jupiter:junit-jupiter'
implementation 'com.fasterxml.jackson.core:jackson-core'
implementation 'com.fasterxml.jackson.core:jackson-databind'
implementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-yaml'
implementation 'software.amazon.awssdk:cloudwatch'
implementation 'software.amazon.awssdk:cloudwatchlogs'
implementation 'org.apache.commons:commons-lang3:3.12.0'
testImplementation project(path: ':data-prepper-test-common')
testImplementation project(path: ':data-prepper-test-common')
}

jacocoTestCoverageVerification {
dependsOn jacocoTestReport
violationRules {
rule { //in addition to core projects rule
limit {
minimum = 0.90
}
}
}
}

test {
useJUnitPlatform()
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package org.opensearch.dataprepper.plugins.sink.config;

import com.fasterxml.jackson.annotation.JsonProperty;
import jakarta.validation.constraints.Size;
import software.amazon.awssdk.regions.Region;

import java.util.Map;

/**
* AwsConfig is based on the S3-Sink AwsAuthenticationOptions
* where the configuration allows the sink to fetch Aws credentials
* and resources.
*/
public class AwsConfig {
public static int DEFAULT_CONNECTION_ATTEMPTS = 5;

@JsonProperty("region")
@Size(min = 1, message = "Region cannot be empty string")
private String awsRegion;

@JsonProperty("sts_role_arn")
@Size(min = 20, max = 2048, message = "awsStsRoleArn length should be between 1 and 2048 characters")
private String awsStsRoleArn;

@JsonProperty("sts_header_overrides")
@Size(max = 5, message = "sts_header_overrides supports a maximum of 5 headers to override")
private Map<String, String> awsStsHeaderOverrides;

@JsonProperty("sts_external_id")
@Size(min = 2, max = 1224, message = "awsStsExternalId length should be between 2 and 1224 characters")
private String awsStsExternalId;

public Region getAwsRegion() {
return awsRegion != null ? Region.of(awsRegion) : null;
}

public String getAwsStsRoleArn() {
return awsStsRoleArn;
}

public String getAwsStsExternalId() {
return awsStsExternalId;
}

public Map<String, String> getAwsStsHeaderOverrides() {
return awsStsHeaderOverrides;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package org.opensearch.dataprepper.plugins.sink.config;

import com.fasterxml.jackson.annotation.JsonProperty;
import jakarta.validation.Valid;
import jakarta.validation.constraints.NotEmpty;
import jakarta.validation.constraints.NotNull;

public class CwlSinkConfig {
public static final String DEFAULT_BUFFER_TYPE = "in_memory";

@JsonProperty("aws")
@NotNull
@Valid
private AwsConfig awsConfig;

@JsonProperty("threshold")
@NotNull
private ThresholdConfig thresholdConfig;

@JsonProperty("buffer_type")
private String bufferType = DEFAULT_BUFFER_TYPE;

@JsonProperty("log_group")
@NotEmpty
@NotNull
private String logGroup;

@JsonProperty("log_stream")
@NotEmpty
@NotNull
private String logStream;

public AwsConfig getAwsConfig() {
return awsConfig;
}

public ThresholdConfig getThresholdConfig() {
return thresholdConfig;
}

public String getBufferType() {
return bufferType;
}

public String getLogGroup() {
return logGroup;
}

public String getLogStream() {
return logStream;
}
}
Loading

0 comments on commit d4a3d31

Please sign in to comment.