Skip to content

Commit

Permalink
GitHub-issue#253 : Implemented GeoIP processor functionality (opensea…
Browse files Browse the repository at this point in the history
…rch-project#2925)

* GitHub-issue#253 : Implemented GeoIP processor functionality
Signed-off-by: venkataraopasyavula <[email protected]>

* GitHub-issue#253 : Implemented GeoIP processor functionality
Signed-off-by: venkataraopasyavula <[email protected]>

* GitHub-issue#253 : Implemented GeoIP processor functionality
Signed-off-by: venkataraopasyavula <[email protected]>

* GitHub-issue#253 : Implemented GeoIP processor functionality
Signed-off-by: venkataraopasyavula <[email protected]>

* GitHub-issue#253 : Implemented GeoIP processor functionality
Signed-off-by: venkataraopasyavula <[email protected]>

* GitHub-issue#253 : Implemented GeoIP processor functionality. Addressed Code review comments
Signed-off-by: venkataraopasyavula <[email protected]>

* GitHub-issue#253 : Implemented GeoIP processor functionality. Addressed Code review comments
Signed-off-by: venkataraopasyavula <[email protected]>

* GitHub-issue#253 : Fixed the test-case-failed issue.

Signed-off-by: Deepak Sahu <[email protected]>

* GitHub-issue#253 : Implemented GeoIP processor functionality. Addressed Code review comments
Signed-off-by: venkataraopasyavula <[email protected]>

* GitHub-issue#253 : Implemented GeoIP processor functionality. Addressed Code review comments
Signed-off-by: venkataraopasyavula <[email protected]>

* GitHub-issue#253 : Implemented GeoIP processor functionality. Addressed Code review comments
Signed-off-by: venkataraopasyavula <[email protected]>

* GitHub-issue#253 : Implemented GeoIP processor functionality. Addressed Code review comments
Signed-off-by: venkataraopasyavula <[email protected]>

* GitHub-issue#253 : Implemented GeoIP processor functionality. Addressed Code review comments
Signed-off-by: venkataraopasyavula <[email protected]>

---------

Signed-off-by: Deepak Sahu <[email protected]>
Co-authored-by: Deepak Sahu <[email protected]>
  • Loading branch information
venkataraopasyavula and deepaksahu562 authored Jul 12, 2023
1 parent decccb9 commit 38c6843
Show file tree
Hide file tree
Showing 39 changed files with 2,080 additions and 161 deletions.
108 changes: 108 additions & 0 deletions data-prepper-plugins/geoip-processor/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
# GeoIP Processor

This is the Data Prepper GeoIP processor plugin which can enrich Data Prepper events with location information using a provided IP address.
Additionally, this plugin should be able to use either a MaxMind GeoIP Lite2 database or the GeoIP2 Commercial Licensing database.
The Data Prepper author must provide information for configuring the commercial license.


## Usages

The GeoIP processor should be configured as part of Data Prepper pipeline yaml file.

## Configuration Options

```
pipeline:
...
processor:
- geoip:
aws:
region: us-east-1
sts_role_arn: arn:aws:iam::123456789012:role/Data-Prepper
keys:
- key:
source: "/peer/ip"
target: "target1"
- key:
source: "/peer/ip2"
target: "target2"
attributes: ["city_name","country_name"]
service_type:
maxmind:
database_path:
- url:
load_type: "in_memory"
cache_size: 4096
cache_refresh_schedule: P30D
```

## AWS Configuration

- `region` (Optional) : The AWS region to use for credentials. Defaults to [standard SDK behavior to determine the region](https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/region-selection.html).

- `sts_role_arn` (Optional) : The AWS STS role to assume for requests to S3. which will use the [standard SDK behavior for credentials](https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/credentials.html).

## Properties Configuration

- `keys` (Required) : List of properties like source, target and attributes can be specified where the location fields are written

- `source` (Required) : source IP for which enrichment will be done. Public IP can be either IPV4 or IPV6.

- `target` (Optional) : Property used to specify the key for the enriched fields.

- `attributes` (Optional) : Used to specify the properties which are included in the enrichment of data. By default all attributes are considered.

## Service type Configuration

- `database_path` (Required) : Used to provide either S3 path, maxmind URL or local file path where the .mmdb file is available.

- `url` (Required) : Provide URL for all three S3, maxmind URL or local file path.

- `load_type` (Required) : Load type used for better performance while enrich the data. There are two type load_type are present i.e "memory_map" or "cache".

- `cache_size` (Optional) : Used to mention the cache size. Default cache size is 2MB. Cache size applicable when load_type is cache.

- `cache_refresh_schedule` (Required) : Switch the DatabaseReader when ever Refresh schedule threshold is met.

- `tags_on_source_not_found` (Optional): A `List` of `String`s that specifies the tags to be set in the event the processor fails to parse or an unknown exception occurs while parsing. This tag may be used in conditional expressions in other parts of the configuration

## Sample JSON input:

"peer" : {
"ip" : "1.2.3.4"
"host" : "example.org"
}
"status" : "success"

## Sample JSON Output:

"peer" : {
"ip" : "1.2.3.4"
"host" : "example.org"
}
"location" : {
"status" : "success"
"country" : "United States"
"city_name" : "Seattle"
"latitude" : "47.64097"
"longitude" : "122.25894"
"zip_code" : "98115"
}




## Developer Guide

This plugin is compatible with Java 11. See below

- [CONTRIBUTING](https://github.com/opensearch-project/data-prepper/blob/main/CONTRIBUTING.md)
- [monitoring](https://github.com/opensearch-project/data-prepper/blob/main/docs/monitoring.md)

The integration tests for this plugin do not run as part of the Data Prepper build.

The following command runs the integration tests:

```
./gradlew :data-prepper-plugins:geoip-processor:integrationTest -Dtests.geoipprocessor.region=<your-aws-region> -Dtests.geoipprocessor.bucket=<your-bucket>
```
4 changes: 3 additions & 1 deletion data-prepper-plugins/geoip-processor/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ dependencies {
implementation 'com.maxmind.db:maxmind-db:3.0.0'

implementation 'org.apache.commons:commons-lang3:3.12.0'
testImplementation 'com.fasterxml.jackson.datatype:jackson-datatype-jsr310'
testImplementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-yaml'
testImplementation project(':data-prepper-test-common')
}

Expand Down Expand Up @@ -68,4 +70,4 @@ task integrationTest(type: Test) {
filter {
includeTestsMatching '*IT'
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,27 @@

package org.opensearch.dataprepper.plugins.processor;

import io.micrometer.core.instrument.Counter;
import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin;
import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor;
import org.opensearch.dataprepper.model.configuration.PluginSetting;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.plugin.PluginFactory;
import org.opensearch.dataprepper.model.processor.AbstractProcessor;
import org.opensearch.dataprepper.model.processor.Processor;
import org.opensearch.dataprepper.model.record.Record;
import java.net.MalformedURLException;
import org.opensearch.dataprepper.plugins.processor.configuration.KeysConfig;
import org.opensearch.dataprepper.plugins.processor.databaseenrich.EnrichFailedException;
import org.opensearch.dataprepper.plugins.processor.utils.IPValidationcheck;
import org.opensearch.dataprepper.logging.DataPrepperMarkers;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.IOException;
import java.net.InetAddress;
import java.util.Collection;
import java.util.List;
import java.util.Map;

/**
* Implementation class of geoIP-processor plugin. It is responsible for enrichment of
Expand All @@ -23,19 +34,32 @@
@DataPrepperPlugin(name = "geoip", pluginType = Processor.class, pluginConfigurationType = GeoIPProcessorConfig.class)
public class GeoIPProcessor extends AbstractProcessor<Record<Event>, Record<Event>> {

private static final Logger LOG = LoggerFactory.getLogger(GeoIPProcessor.class);
private static final String GEO_IP_PROCESSING_MATCH = "geoIpProcessingMatch";
private static final String GEO_IP_PROCESSING_MISMATCH = "geoIpProcessingMismatch";
private final Counter geoIpProcessingMatchCounter;
private final Counter geoIpProcessingMismatchCounter;
private final GeoIPProcessorConfig geoIPProcessorConfig;
private final String tempPath;
private final List<String> tagsOnSourceNotFoundFailure;
private GeoIPProcessorService geoIPProcessorService;
private static final String TEMP_PATH_FOLDER = "GeoIP";

/**
* GeoIPProcessor constructor for initialization of required attributes
* @param pluginSetting pluginSetting
* @param geoCodingProcessorConfig geoCodingProcessorConfig
* @param pluginFactory pluginFactory
* @throws MalformedURLException MalformedURLException
*/
@DataPrepperPluginConstructor
public GeoIPProcessor(PluginSetting pluginSetting,
final GeoIPProcessorConfig geoCodingProcessorConfig,
final PluginFactory pluginFactory) throws MalformedURLException {
final GeoIPProcessorConfig geoCodingProcessorConfig) {
super(pluginSetting);
//TODO
this.geoIPProcessorConfig = geoCodingProcessorConfig;
this.tempPath = System.getProperty("java.io.tmpdir")+ File.separator + TEMP_PATH_FOLDER;
geoIPProcessorService = new GeoIPProcessorService(geoCodingProcessorConfig,tempPath);
tagsOnSourceNotFoundFailure = geoCodingProcessorConfig.getTagsOnSourceNotFoundFailure();
this.geoIpProcessingMatchCounter = pluginMetrics.counter(GEO_IP_PROCESSING_MATCH);
this.geoIpProcessingMismatchCounter = pluginMetrics.counter(GEO_IP_PROCESSING_MISMATCH);
}

/**
Expand All @@ -46,23 +70,49 @@ public GeoIPProcessor(PluginSetting pluginSetting,
@Override
public Collection<Record<Event>> doExecute(Collection<Record<Event>> records) {

//TODO : logic call the enrichment of data class methods
return null;
Map<String, Object> geoData;

for (final Record<Event> eventRecord : records) {
Event event = eventRecord.getData();
for (KeysConfig key : geoIPProcessorConfig.getKeysConfig()) {
String source = key.getKeyConfig().getSource();
List<String> attributes = key.getKeyConfig().getAttributes();
String ipAddress = event.get(source, String.class);

//Lookup from DB
if (ipAddress != null && (!(ipAddress.isEmpty()))) {
try {
if (IPValidationcheck.isPublicIpAddress(ipAddress)) {
geoData = geoIPProcessorService.getGeoData(InetAddress.getByName(ipAddress), attributes);
eventRecord.getData().put(key.getKeyConfig().getTarget(), geoData);
geoIpProcessingMatchCounter.increment();
}
} catch (IOException | EnrichFailedException ex) {
geoIpProcessingMismatchCounter.increment();
event.getMetadata().addTags(tagsOnSourceNotFoundFailure);
LOG.error(DataPrepperMarkers.EVENT, "Failed to get Geo data for event: [{}] for the IP address [{}]", event, ipAddress, ex);
}
} else {
//No Enrichment.
event.getMetadata().addTags(tagsOnSourceNotFoundFailure);
}
}
}
return records;
}

@Override
public void prepareForShutdown() {
//TODO
LOG.info("GeoIP plugin prepare For Shutdown");
}

@Override
public boolean isReadyForShutdown() {
//TODO
return false;
}

@Override
public void shutdown() {
//TODO
LOG.info("GeoIP plugin Shutdown");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@ public class GeoIPProcessorConfig {
@NotNull
private List<KeysConfig> keysConfig;

@JsonProperty("tags_on_source_not_found")
private List<String> tagsOnSourceNotFoundFailure;

@JsonProperty("service_type")
@NotNull
private ServiceTypeOptions serviceType;
Expand All @@ -48,6 +51,14 @@ public List<KeysConfig> getKeysConfig() {
return keysConfig;
}

/**
* Get the List of failure tags
* @return List of failure tags
*/
public List<String> getTagsOnSourceNotFoundFailure() {
return tagsOnSourceNotFoundFailure;
}

/**
* Service type Options
* @return ServiceTypeOptions
Expand Down
Loading

0 comments on commit 38c6843

Please sign in to comment.