Skip to content

Commit

Permalink
Added when condition and fixed building reader on each event (opensea…
Browse files Browse the repository at this point in the history
…rch-project#4002)

* Added when condition and fixed building reader on each event

Signed-off-by: Asif Sohail Mohammed <[email protected]>
  • Loading branch information
asifsmohammed authored Jan 29, 2024
1 parent 2be8166 commit 84dcca0
Show file tree
Hide file tree
Showing 40 changed files with 415 additions and 198 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.junit.jupiter.MockitoExtension;
import org.opensearch.dataprepper.plugins.processor.utils.IPValidationcheck;
import org.opensearch.dataprepper.plugins.processor.extension.GeoIPProcessorService;
import org.opensearch.dataprepper.plugins.processor.utils.IPValidationCheck;

import java.io.File;
import java.net.InetAddress;
Expand Down Expand Up @@ -89,7 +90,7 @@ void verify_enrichment_of_data_from_maxmind_url() throws UnknownHostException {
Map<String, Object> geoData = new HashMap<>();
this.geoIPProcessorService = createObjectUnderTest();
String ipAddress = geoIPInputJson.getPeer().getIp();
if (IPValidationcheck.isPublicIpAddress(ipAddress)) {
if (IPValidationCheck.isPublicIpAddress(ipAddress)) {
InetAddress inetAddress = InetAddress.getByName(ipAddress);
//All attributes are considered by default with the null value
geoData = geoIPProcessorService.getGeoData(inetAddress, null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,19 @@
package org.opensearch.dataprepper.plugins.processor;

import io.micrometer.core.instrument.Counter;
import org.opensearch.dataprepper.expression.ExpressionEvaluator;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin;
import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor;
import org.opensearch.dataprepper.model.configuration.PluginSetting;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.processor.AbstractProcessor;
import org.opensearch.dataprepper.model.processor.Processor;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.plugins.processor.configuration.EntryConfig;
import org.opensearch.dataprepper.plugins.processor.databaseenrich.EnrichFailedException;
import org.opensearch.dataprepper.plugins.processor.extension.GeoIPProcessorService;
import org.opensearch.dataprepper.plugins.processor.extension.GeoIpConfigSupplier;
import org.opensearch.dataprepper.plugins.processor.utils.IPValidationcheck;
import org.opensearch.dataprepper.plugins.processor.utils.IPValidationCheck;
import org.opensearch.dataprepper.logging.DataPrepperMarkers;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -35,28 +37,34 @@
public class GeoIPProcessor extends AbstractProcessor<Record<Event>, Record<Event>> {

private static final Logger LOG = LoggerFactory.getLogger(GeoIPProcessor.class);
private static final String GEO_IP_PROCESSING_MATCH = "geoIpProcessingMatch";
private static final String GEO_IP_PROCESSING_MISMATCH = "geoIpProcessingMismatch";
//TODO: rename metrics
static final String GEO_IP_PROCESSING_MATCH = "geoIpProcessingMatch";
static final String GEO_IP_PROCESSING_MISMATCH = "geoIpProcessingMismatch";
private final Counter geoIpProcessingMatchCounter;
private final Counter geoIpProcessingMismatchCounter;
private final GeoIPProcessorConfig geoIPProcessorConfig;
private final List<String> tagsOnSourceNotFoundFailure;
private GeoIPProcessorService geoIPProcessorService;
private final List<String> tagsOnFailure;
private final GeoIPProcessorService geoIPProcessorService;
private final String whenCondition;
private final ExpressionEvaluator expressionEvaluator;

/**
* GeoIPProcessor constructor for initialization of required attributes
* @param pluginSetting pluginSetting
* @param pluginMetrics pluginMetrics
* @param geoIPProcessorConfig geoIPProcessorConfig
* @param geoIpConfigSupplier geoIpConfigSupplier
*/
@DataPrepperPluginConstructor
public GeoIPProcessor(PluginSetting pluginSetting,
public GeoIPProcessor(final PluginMetrics pluginMetrics,
final GeoIPProcessorConfig geoIPProcessorConfig,
final GeoIpConfigSupplier geoIpConfigSupplier) {
super(pluginSetting);
final GeoIpConfigSupplier geoIpConfigSupplier,
final ExpressionEvaluator expressionEvaluator) {
super(pluginMetrics);
this.geoIPProcessorConfig = geoIPProcessorConfig;
this.geoIPProcessorService = geoIpConfigSupplier.getGeoIPProcessorService();
this.tagsOnSourceNotFoundFailure = geoIPProcessorConfig.getTagsOnFailure();
this.tagsOnFailure = geoIPProcessorConfig.getTagsOnFailure();
this.whenCondition = geoIPProcessorConfig.getWhenCondition();
this.expressionEvaluator = expressionEvaluator;
this.geoIpProcessingMatchCounter = pluginMetrics.counter(GEO_IP_PROCESSING_MATCH);
this.geoIpProcessingMismatchCounter = pluginMetrics.counter(GEO_IP_PROCESSING_MISMATCH);
}
Expand All @@ -67,33 +75,35 @@ public GeoIPProcessor(PluginSetting pluginSetting,
* @return collection of record events
*/
@Override
public Collection<Record<Event>> doExecute(Collection<Record<Event>> records) {

public Collection<Record<Event>> doExecute(final Collection<Record<Event>> records) {
Map<String, Object> geoData;

for (final Record<Event> eventRecord : records) {
Event event = eventRecord.getData();
final Event event = eventRecord.getData();
if (whenCondition != null && !expressionEvaluator.evaluateConditional(whenCondition, event)) {
continue;
}
for (EntryConfig entry : geoIPProcessorConfig.getEntries()) {
String source = entry.getSource();
List<String> attributes = entry.getFields();
String ipAddress = event.get(source, String.class);
final String source = entry.getSource();
final List<String> attributes = entry.getFields();
final String ipAddress = event.get(source, String.class);

//Lookup from DB
if (ipAddress != null && (!(ipAddress.isEmpty()))) {
if (ipAddress != null && !ipAddress.isEmpty()) {
try {
if (IPValidationcheck.isPublicIpAddress(ipAddress)) {
if (IPValidationCheck.isPublicIpAddress(ipAddress)) {
geoData = geoIPProcessorService.getGeoData(InetAddress.getByName(ipAddress), attributes);
eventRecord.getData().put(entry.getTarget(), geoData);
geoIpProcessingMatchCounter.increment();
}
} catch (IOException | EnrichFailedException ex) {
} catch (final IOException | EnrichFailedException ex) {
geoIpProcessingMismatchCounter.increment();
event.getMetadata().addTags(tagsOnSourceNotFoundFailure);
LOG.error(DataPrepperMarkers.EVENT, "Failed to get Geo data for event: [{}] for the IP address [{}]", event, ipAddress, ex);
event.getMetadata().addTags(tagsOnFailure);
LOG.error(DataPrepperMarkers.EVENT, "Failed to get Geo data for event: [{}] for the IP address [{}]", event, ipAddress, ex);
}
} else {
//No Enrichment.
event.getMetadata().addTags(tagsOnSourceNotFoundFailure);
event.getMetadata().addTags(tagsOnFailure);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@ public class GeoIPProcessorConfig {
@JsonProperty("tags_on_failure")
private List<String> tagsOnFailure;

@JsonProperty("geoip_when")
private String whenCondition;


/**
* Get List of entries
Expand All @@ -44,4 +47,11 @@ public List<String> getTagsOnFailure() {
return tagsOnFailure;
}

/**
* Get when condition
* @return String When condition
*/
public String getWhenCondition() {
return whenCondition;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,13 @@
import java.util.List;

public class EntryConfig {
static final String DEFAULT_TARGET = "geolocation";
@JsonProperty("source")
@NotEmpty
private String source;

@JsonProperty("target")
private String target;
private String target = DEFAULT_TARGET;

@JsonProperty("fields")
private List<String> fields;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,17 @@ public interface GetGeoData {
public final String GeoLite2AsnDB = "GeoLite2-ASN.mmdb";
public final String GeoIP2EnterpriseDB = "GeoIP2-Enterprise.mmdb";

public void switchDatabaseReader();
public void closeReader();
public Map<String, Object> getGeoData(InetAddress inetAddress, List<String> attributes, String tempDestDir);
void switchDatabaseReader();
void closeReader();
Map<String, Object> getGeoData(InetAddress inetAddress, List<String> attributes, String tempDestDir);

/**
* Enrich attributes
* @param geoData geoData
* @param attributeName attributeName
* @param attributeValue attributeValue
*/
public default void enrichData(Map<String, Object> geoData,String attributeName, String attributeValue) {
default void enrichData(final Map<String, Object> geoData, final String attributeName, final String attributeValue) {
if (attributeValue != null) {
geoData.put(attributeName, attributeValue);
}
Expand All @@ -42,7 +42,7 @@ public default void enrichData(Map<String, Object> geoData,String attributeName,
* @param countryIso countryIso
* @param subdivisionIso subdivisionIso
*/
public default void enrichRegionIsoCode(Map<String, Object> geoData, String countryIso, String subdivisionIso) {
default void enrichRegionIsoCode(final Map<String, Object> geoData, final String countryIso, final String subdivisionIso) {
if (countryIso != null && subdivisionIso != null) {
enrichData(geoData, "region_iso_code", countryIso + "-" + subdivisionIso);
}
Expand All @@ -54,7 +54,7 @@ public default void enrichRegionIsoCode(Map<String, Object> geoData, String coun
* @param latitude latitude
* @param longitude longitude
*/
public default void enrichLocationData(Map<String, Object> geoData, Double latitude, Double longitude) {
default void enrichLocationData(final Map<String, Object> geoData, final Double latitude, final Double longitude) {
if (latitude != null && longitude != null) {
Map<String, Object> locationObject = new HashMap<>();
locationObject.put("lat", latitude);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@
import com.maxmind.geoip2.record.Subdivision;
import com.maxmind.geoip2.record.Location;
import com.maxmind.geoip2.record.Postal;
import org.opensearch.dataprepper.plugins.processor.GeoIPProcessorService;
import org.opensearch.dataprepper.plugins.processor.databasedownload.DatabaseReaderCreate;
import org.opensearch.dataprepper.plugins.processor.databasedownload.DBSource;
import org.opensearch.dataprepper.plugins.processor.extension.GeoIPProcessorService;
import org.opensearch.dataprepper.plugins.processor.extension.databasedownload.DatabaseReaderCreate;
import org.opensearch.dataprepper.plugins.processor.extension.databasedownload.DBSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -44,7 +44,7 @@ public class GetGeoIP2Data implements GetGeoData {
public static final String TIMEZONE = "timezone";
public static final String LOCATION = "location";
public static final String POSTAL = "postal";
private DatabaseReader.Builder readerEnterprise;
private DatabaseReader readerEnterprise;
private Country country;
private Continent continent;
private City city;
Expand All @@ -70,15 +70,19 @@ public GetGeoIP2Data(final String dbPath, final int cacheSize) {
* Initialise all the DatabaseReader
*/
public void initDatabaseReader() {
readerEnterprise = DatabaseReaderCreate.createLoader(Path.of(dbPath + File.separator + tempDestDir + File.separator + GeoIP2EnterpriseDB), cacheSize);
try {
readerEnterprise = DatabaseReaderCreate.createLoader(Path.of(dbPath + File.separator + tempDestDir + File.separator + GeoIP2EnterpriseDB), cacheSize);
} catch (final IOException ex) {
LOG.error("Exception while creating GeoIP2 DatabaseReader: {0}", ex);
}
}

/**
* Switch all the DatabaseReader
*/
@Override
public void switchDatabaseReader() {
LOG.info("Switch DatabaseReader");
LOG.info("Switching GeoIP2 DatabaseReader");
closeReader();
System.gc();
File file = new File(dbPath);
Expand All @@ -100,7 +104,7 @@ public Map<String, Object> getGeoData(InetAddress inetAddress, List<String> attr
switchDatabaseReader();
}
try {
EnterpriseResponse enterpriseResponse = readerEnterprise.build().enterprise(inetAddress);
EnterpriseResponse enterpriseResponse = readerEnterprise.enterprise(inetAddress);
country = enterpriseResponse.getCountry();
subdivision = enterpriseResponse.getMostSpecificSubdivision();
city = enterpriseResponse.getCity();
Expand Down Expand Up @@ -179,7 +183,7 @@ public Map<String, Object> getGeoData(InetAddress inetAddress, List<String> attr
public void closeReader() {
try {
if (readerEnterprise != null)
readerEnterprise.build().close();
readerEnterprise.close();
} catch (IOException ex) {
LOG.info("Close Enterprise DatabaseReader Exception : {0}", ex);
}
Expand Down
Loading

0 comments on commit 84dcca0

Please sign in to comment.