Skip to content

Commit

Permalink
Merge branch 'opensearch-project:main' into sqs-source-integration-tests
Browse files Browse the repository at this point in the history
  • Loading branch information
udaych20 committed Jun 21, 2023
2 parents 10c76bd + 7649059 commit e917b7a
Show file tree
Hide file tree
Showing 108 changed files with 4,560 additions and 990 deletions.
3 changes: 3 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,9 @@ allprojects {
repositories {
mavenCentral()
maven { url 'https://jitpack.io' }
maven {
url 'https://packages.confluent.io/maven/'
}
}

spotless {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import com.fasterxml.jackson.datatype.jdk8.Jdk8Module;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import org.apache.commons.lang3.StringUtils;
import org.opensearch.dataprepper.model.event.exceptions.EventKeyNotFoundException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -309,7 +310,7 @@ public String formatString(final String format) {
String name = format.substring(position + 2, endPosition);
Object val = this.get(name, Object.class);
if (val == null) {
return null;
throw new EventKeyNotFoundException(String.format("The key %s could not be found in the Event when formatting", name));
}
result += val.toString();
fromIndex = endPosition + 1;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.model.event.exceptions;

/**
 * Thrown when a key referenced in an {@code Event} format string (e.g. {@code ${key}})
 * cannot be found in the Event when the string is being formatted.
 */
public class EventKeyNotFoundException extends RuntimeException {
    // RuntimeException is Serializable; declare an explicit serialVersionUID so
    // serialized form is stable across recompiles.
    private static final long serialVersionUID = 1L;

    /**
     * @param message description of the missing key, including the key name
     */
    public EventKeyNotFoundException(final String message) {
        super(message);
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;
import org.junit.jupiter.params.provider.ValueSource;
import org.opensearch.dataprepper.model.event.exceptions.EventKeyNotFoundException;

import java.time.Instant;
import java.util.Arrays;
Expand Down Expand Up @@ -554,7 +555,7 @@ public void testBuild_withFormatStringWithValueNotFound() {
.withData(jsonString)
.getThis()
.build();
assertThat(event.formatString("test-${boo}-string"), is(equalTo(null)));
assertThrows(EventKeyNotFoundException.class, () -> event.formatString("test-${boo}-string"));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,15 @@
import org.opensearch.dataprepper.aws.api.AwsCredentialsOptions;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

class CredentialsCache {
private final Map<CredentialsIdentifier, AwsCredentialsProvider> credentialsProviderMap;

CredentialsCache() {
credentialsProviderMap = new HashMap<>();
credentialsProviderMap = new ConcurrentHashMap<>();
}

AwsCredentialsProvider getOrCreate(final AwsCredentialsOptions awsCredentialsOptions, final Supplier<AwsCredentialsProvider> providerSupplier) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,25 @@

import java.io.IOException;
import java.io.InputStream;
import java.io.PushbackInputStream;

public class GZipDecompressionEngine implements DecompressionEngine {
@Override
public InputStream createInputStream(final InputStream responseInputStream) throws IOException {
public InputStream createInputStream(final InputStream inputStream) throws IOException {
final PushbackInputStream pushbackStream = new PushbackInputStream(inputStream, 2);
final byte[] signature = new byte[2];
//read the signature
final int len = pushbackStream.read(signature);
//push back the signature to the stream
pushbackStream.unread(signature, 0, len);
//check if matches standard gzip magic number
if(!GzipCompressorInputStream.matches(signature, len)) {
throw new IOException("GZIP encoding specified but data did contain gzip magic header");
}

// We are using GzipCompressorInputStream here to decompress because GZIPInputStream doesn't decompress concatenated .gz files
// it stops after the first member and silently ignores the rest.
// It doesn't leave the read position to point to the beginning of the next member.
return new GzipCompressorInputStream(responseInputStream, true);
return new GzipCompressorInputStream(pushbackStream, true);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.util.zip.GZIPOutputStream;

import static org.hamcrest.CoreMatchers.instanceOf;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.Mockito.mock;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
Expand Down Expand Up @@ -55,4 +56,20 @@ void createInputStream_with_gzip_should_return_instance_of_GZIPInputStream() thr
assertThat(inputStream, instanceOf(GzipCompressorInputStream.class));
assertThat(inputStream.readAllBytes(), equalTo(testStringBytes));
}

@Test
void createInputStream_without_gzip_should_throw_exception() throws IOException {
    decompressionEngine = new GZipDecompressionEngine();

    // Plain UTF-8 bytes that do not start with the gzip magic header;
    // wrap them directly — no need to round-trip through a ByteArrayOutputStream.
    final String testString = UUID.randomUUID().toString();
    final byte[] notGzipBytes = testString.getBytes(StandardCharsets.UTF_8);
    final ByteArrayInputStream byteInStream = new ByteArrayInputStream(notGzipBytes);

    assertThrows(IOException.class, () -> decompressionEngine.createInputStream(byteInStream));
}
}
71 changes: 71 additions & 0 deletions data-prepper-plugins/geoip-processor/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/*
 * Copyright OpenSearch Contributors
 * SPDX-License-Identifier: Apache-2.0
 */

// Build script for the geoip-processor plugin: declares MaxMind GeoIP2 / AWS SDK /
// MapDB dependencies and wires up a separate integrationTest source set and task.
dependencies {
implementation project(':data-prepper-api')
implementation project(path: ':data-prepper-plugins:common')
implementation 'io.micrometer:micrometer-core'
implementation 'com.fasterxml.jackson.core:jackson-core'
implementation 'com.fasterxml.jackson.core:jackson-databind'
implementation 'org.apache.commons:commons-compress:1.21'

implementation 'org.mapdb:mapdb:3.0.8'
implementation 'commons-io:commons-io:2.12.0'
// NOTE(review): 'aws-sdk-java' is the full AWS SDK bundle; presumably only a few
// service modules (e.g. s3) are needed — TODO confirm and narrow the dependency.
implementation 'software.amazon.awssdk:aws-sdk-java:2.20.67'
implementation 'software.amazon.awssdk:s3-transfer-manager'
implementation 'software.amazon.awssdk.crt:aws-crt:0.21.17'
implementation 'com.maxmind.geoip2:geoip2:4.0.1'
implementation 'com.maxmind.db:maxmind-db:3.0.0'

implementation 'org.apache.commons:commons-lang3:3.12.0'
testImplementation project(':data-prepper-test-common')
}


test {
useJUnitPlatform()
}

// Fail the build unless line coverage is 100% (minimum = 1.0).
jacocoTestCoverageVerification {
dependsOn jacocoTestReport
violationRules {
rule {
limit {
minimum = 1.0
}
}
}
}

check.dependsOn jacocoTestCoverageVerification

// Dedicated source set for integration tests, compiled against both main and test output.
sourceSets {
integrationTest {
java {
compileClasspath += main.output + test.output
runtimeClasspath += main.output + test.output
srcDir file('src/integrationTest/java')
}
resources.srcDir file('src/integrationTest/resources')
}
}

configurations {
integrationTestImplementation.extendsFrom testImplementation
// NOTE(review): the 'testRuntime' configuration was removed in Gradle 7;
// this likely should extend 'testRuntimeOnly' — TODO confirm project Gradle version.
integrationTestRuntime.extendsFrom testRuntime
}

// Runs integration tests: only test classes whose names match '*IT'.
task integrationTest(type: Test) {
group = 'verification'
testClassesDirs = sourceSets.integrationTest.output.classesDirs

useJUnitPlatform()

classpath = sourceSets.integrationTest.runtimeClasspath

filter {
includeTestsMatching '*IT'
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.processor;

import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin;
import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor;
import org.opensearch.dataprepper.model.configuration.PluginSetting;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.plugin.PluginFactory;
import org.opensearch.dataprepper.model.processor.AbstractProcessor;
import org.opensearch.dataprepper.model.processor.Processor;
import org.opensearch.dataprepper.model.record.Record;
import java.net.MalformedURLException;
import java.util.Collection;

/**
 * Implementation class of geoIP-processor plugin. It is responsible for enrichment of
 * attributes for the public IPs. Supports both IPV4 and IPV6.
 */
@DataPrepperPlugin(name = "geoip", pluginType = Processor.class, pluginConfigurationType = GeoIPProcessorConfig.class)
public class GeoIPProcessor extends AbstractProcessor<Record<Event>, Record<Event>> {

    /**
     * GeoIPProcessor constructor for initialization of required attributes
     * @param pluginSetting pluginSetting
     * @param geoCodingProcessorConfig geoCodingProcessorConfig
     * @param pluginFactory pluginFactory
     * @throws MalformedURLException MalformedURLException
     */
    @DataPrepperPluginConstructor
    public GeoIPProcessor(final PluginSetting pluginSetting,
                          final GeoIPProcessorConfig geoCodingProcessorConfig,
                          final PluginFactory pluginFactory) throws MalformedURLException {
        super(pluginSetting);
        //TODO
    }

    /**
     * Get the enriched data from the maxmind database
     * @param records Input records
     * @return collection of record events
     */
    @Override
    public Collection<Record<Event>> doExecute(final Collection<Record<Event>> records) {
        //TODO : logic call the enrichment of data class methods
        // Pass records through unmodified until enrichment is implemented;
        // returning null would break downstream pipeline stages.
        return records;
    }

    @Override
    public void prepareForShutdown() {
        //TODO
    }

    @Override
    public boolean isReadyForShutdown() {
        //TODO
        // This stub holds no in-flight state, so report ready; returning false
        // would block pipeline shutdown indefinitely.
        return true;
    }

    @Override
    public void shutdown() {
        //TODO
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.processor;

import com.fasterxml.jackson.annotation.JsonProperty;
import jakarta.validation.Valid;
import jakarta.validation.constraints.NotNull;
import org.opensearch.dataprepper.plugins.processor.configuration.KeysConfig;
import org.opensearch.dataprepper.plugins.processor.configuration.ServiceTypeOptions;
import org.opensearch.dataprepper.plugins.processor.configuration.AwsAuthenticationOptions;

import java.util.List;

/**
 * Configuration model for the GeoIP Processor, deserialized from the pipeline
 * definition. Holds AWS credentials options, the source/target key mappings,
 * and the GeoIP service type to use.
 */
public class GeoIPProcessorConfig {

    // Bound from the "aws" section of the processor configuration.
    @JsonProperty("aws")
    @NotNull
    @Valid
    private AwsAuthenticationOptions awsOptions;

    // Bound from the "keys" section: source, target and attribute mappings.
    @JsonProperty("keys")
    @NotNull
    private List<KeysConfig> keys;

    // Bound from the "service_type" section.
    @JsonProperty("service_type")
    @NotNull
    private ServiceTypeOptions serviceTypeOptions;

    /**
     * Aws Authentication configuration Options
     * @return AwsAuthenticationOptions
     */
    public AwsAuthenticationOptions getAwsAuthenticationOptions() {
        return awsOptions;
    }

    /**
     * Lists of Source, target and attributes
     * @return List of KeysConfig
     */
    public List<KeysConfig> getKeysConfig() {
        return keys;
    }

    /**
     * Service type Options
     * @return ServiceTypeOptions
     */
    public ServiceTypeOptions getServiceType() {
        return serviceTypeOptions;
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.processor;

import org.opensearch.dataprepper.plugins.processor.databasedownload.DBSourceOptions;
import java.net.InetAddress;
import java.time.ZonedDateTime;
import java.util.List;
import java.util.Map;

/**
 * Implementation class of geoIP-processor plugin service class.
 * It is responsible for triggering download of the mmdb files and for
 * looking up enrichment data for a given IP address.
 */
public class GeoIPProcessorService {

    /**
     * GeoIPProcessorService constructor for initialization of required attributes
     * @param geoIPProcessorConfig geoIPProcessorConfig
     * @param tempPath temporary directory used while downloading database files
     */
    public GeoIPProcessorService(final GeoIPProcessorConfig geoIPProcessorConfig, final String tempPath) {
        //TODO
    }

    /**
     * Calls the download method based on the database path type.
     * @param dbSourceOptions source (URL or S3) from which the database files are downloaded
     */
    public void downloadThroughURLandS3(final DBSourceOptions dbSourceOptions) {
        //TODO
    }

    /**
     * Method to call enrichment of data based on license type.
     * @param inetAddress the IP address to enrich
     * @param attributes the attributes to look up for the address
     * @param pluginStartDateTime pluginStartDateTime
     * @return map of enriched attribute names to values; empty until enrichment is implemented
     */
    public Map<String, Object> getGeoData(final InetAddress inetAddress, final List<String> attributes, final ZonedDateTime pluginStartDateTime) {
        //TODO
        // Return an empty map rather than null so callers need not null-check.
        return Map.of();
    }
}
Loading

0 comments on commit e917b7a

Please sign in to comment.