Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update geoip config and use extensions #3975

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,8 @@ public void setUp() throws JsonProcessingException {
}

public GeoIPProcessorService createObjectUnderTest() {
return new GeoIPProcessorService(geoIPProcessorConfig, tempPath);
// TODO: pass in geoIpServiceConfig object
return new GeoIPProcessorService(null);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,14 @@
import org.opensearch.dataprepper.model.processor.AbstractProcessor;
import org.opensearch.dataprepper.model.processor.Processor;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.plugins.processor.configuration.KeysConfig;
import org.opensearch.dataprepper.plugins.processor.configuration.EntryConfig;
import org.opensearch.dataprepper.plugins.processor.databaseenrich.EnrichFailedException;
import org.opensearch.dataprepper.plugins.processor.extension.GeoIpConfigSupplier;
import org.opensearch.dataprepper.plugins.processor.utils.IPValidationcheck;
import org.opensearch.dataprepper.logging.DataPrepperMarkers;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.IOException;
import java.net.InetAddress;
import java.util.Collection;
Expand All @@ -41,29 +40,25 @@ public class GeoIPProcessor extends AbstractProcessor<Record<Event>, Record<Even
private final Counter geoIpProcessingMatchCounter;
private final Counter geoIpProcessingMismatchCounter;
private final GeoIPProcessorConfig geoIPProcessorConfig;
private final String tempPath;
private final List<String> tagsOnSourceNotFoundFailure;
private GeoIPProcessorService geoIPProcessorService;
private static final String TEMP_PATH_FOLDER = "GeoIP";

/**
* GeoIPProcessor constructor for initialization of required attributes
* @param pluginSetting pluginSetting
* @param geoCodingProcessorConfig geoCodingProcessorConfig
* @param geoIPProcessorConfig geoIPProcessorConfig
* @param geoIpConfigSupplier geoIpConfigSupplier
*/
@DataPrepperPluginConstructor
public GeoIPProcessor(PluginSetting pluginSetting,
final GeoIPProcessorConfig geoCodingProcessorConfig,
final GeoIPProcessorConfig geoIPProcessorConfig,
final GeoIpConfigSupplier geoIpConfigSupplier) {
super(pluginSetting);
this.geoIPProcessorConfig = geoCodingProcessorConfig;
this.tempPath = System.getProperty("java.io.tmpdir")+ File.separator + TEMP_PATH_FOLDER;
geoIPProcessorService = new GeoIPProcessorService(geoCodingProcessorConfig,tempPath);
tagsOnSourceNotFoundFailure = geoCodingProcessorConfig.getTagsOnSourceNotFoundFailure();
this.geoIPProcessorConfig = geoIPProcessorConfig;
this.geoIPProcessorService = geoIpConfigSupplier.getGeoIPProcessorService();
this.tagsOnSourceNotFoundFailure = geoIPProcessorConfig.getTagsOnFailure();
this.geoIpProcessingMatchCounter = pluginMetrics.counter(GEO_IP_PROCESSING_MATCH);
this.geoIpProcessingMismatchCounter = pluginMetrics.counter(GEO_IP_PROCESSING_MISMATCH);
// TODO: use this service and clean up MaxMind service config from pipeline.yaml
//geoIPProcessorService = geoIpConfigSupplier.getGeoIPProcessorService();
}

/**
Expand All @@ -78,17 +73,17 @@ public Collection<Record<Event>> doExecute(Collection<Record<Event>> records) {

for (final Record<Event> eventRecord : records) {
Event event = eventRecord.getData();
for (KeysConfig key : geoIPProcessorConfig.getKeysConfig()) {
String source = key.getKeyConfig().getSource();
List<String> attributes = key.getKeyConfig().getAttributes();
for (EntryConfig entry : geoIPProcessorConfig.getEntries()) {
String source = entry.getSource();
List<String> attributes = entry.getFields();
String ipAddress = event.get(source, String.class);

//Lookup from DB
if (ipAddress != null && (!(ipAddress.isEmpty()))) {
try {
if (IPValidationcheck.isPublicIpAddress(ipAddress)) {
geoData = geoIPProcessorService.getGeoData(InetAddress.getByName(ipAddress), attributes);
eventRecord.getData().put(key.getKeyConfig().getTarget(), geoData);
eventRecord.getData().put(entry.getTarget(), geoData);
geoIpProcessingMatchCounter.increment();
}
} catch (IOException | EnrichFailedException ex) {
Expand All @@ -107,16 +102,16 @@ public Collection<Record<Event>> doExecute(Collection<Record<Event>> records) {

@Override
public void prepareForShutdown() {
LOG.info("GeoIP plugin prepare For Shutdown");
}

@Override
public boolean isReadyForShutdown() {
return false;
return true;
}

@Override
public void shutdown() {
//TODO: delete mmdb files
LOG.info("GeoIP plugin Shutdown");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,8 @@
import com.fasterxml.jackson.annotation.JsonProperty;
import jakarta.validation.Valid;
import jakarta.validation.constraints.NotNull;
import org.opensearch.dataprepper.plugins.processor.configuration.KeysConfig;
import org.opensearch.dataprepper.plugins.processor.configuration.ServiceTypeOptions;
import org.opensearch.dataprepper.plugins.processor.extension.AwsAuthenticationOptionsConfig;
import jakarta.validation.constraints.Size;
import org.opensearch.dataprepper.plugins.processor.configuration.EntryConfig;

import java.util.List;

Expand All @@ -19,51 +18,30 @@
*/
public class GeoIPProcessorConfig {

@JsonProperty("aws")
@JsonProperty("entries")
@NotNull
@Size(min = 1)
@Valid
private AwsAuthenticationOptionsConfig awsAuthenticationOptionsConfig;
private List<EntryConfig> entries;

@JsonProperty("keys")
@NotNull
private List<KeysConfig> keysConfig;

@JsonProperty("tags_on_source_not_found")
private List<String> tagsOnSourceNotFoundFailure;
@JsonProperty("tags_on_failure")
private List<String> tagsOnFailure;

@JsonProperty("service_type")
@NotNull
private ServiceTypeOptions serviceType;

/**
* Aws Authentication configuration Options
* @return AwsAuthenticationOptions
*/
public AwsAuthenticationOptionsConfig getAwsAuthenticationOptions() {
return awsAuthenticationOptionsConfig;
}

/**
* Lists of Source, target and attributes
* @return List of KeysConfig
* Get List of entries
* @return List of EntryConfig
*/
public List<KeysConfig> getKeysConfig() {
return keysConfig;
public List<EntryConfig> getEntries() {
return entries;
}

/**
* Get the List of failure tags
* @return List of failure tags
*/
public List<String> getTagsOnSourceNotFoundFailure() {
return tagsOnSourceNotFoundFailure;
public List<String> getTagsOnFailure() {
return tagsOnFailure;
}

/**
* Service type Options
* @return ServiceTypeOptions
*/
public ServiceTypeOptions getServiceType() {
return serviceType;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@

package org.opensearch.dataprepper.plugins.processor;

import org.opensearch.dataprepper.plugins.processor.configuration.DatabasePathURLConfig;
import org.opensearch.dataprepper.plugins.processor.databasedownload.DBSourceOptions;
import org.opensearch.dataprepper.plugins.processor.databasedownload.LicenseTypeOptions;
import org.opensearch.dataprepper.plugins.processor.databasedownload.S3DBService;
Expand All @@ -16,6 +15,8 @@
import org.opensearch.dataprepper.plugins.processor.databaseenrich.GetGeoData;
import org.opensearch.dataprepper.plugins.processor.databaseenrich.GetGeoIP2Data;
import org.opensearch.dataprepper.plugins.processor.databaseenrich.GetGeoLite2Data;
import org.opensearch.dataprepper.plugins.processor.extension.GeoIpServiceConfig;
import org.opensearch.dataprepper.plugins.processor.extension.MaxMindConfig;
import org.opensearch.dataprepper.plugins.processor.utils.DbSourceIdentification;
import org.opensearch.dataprepper.plugins.processor.utils.LicenseTypeCheck;
import org.slf4j.Logger;
Expand All @@ -40,32 +41,35 @@ public class GeoIPProcessorService {
private static final Logger LOG = LoggerFactory.getLogger(GeoIPProcessorService.class);
public static final String DATABASE_1 = "first_database_path";
public static final String DATABASE_2 = "second_database_path";
private static final String TEMP_PATH_FOLDER = "GeoIP";
private GeoIPProcessorConfig geoIPProcessorConfig;
private LicenseTypeOptions licenseType;
private GetGeoData geoData;
private List<DatabasePathURLConfig> databasePath;
private List<String> databasePaths;
private final String tempPath;
private final ScheduledExecutorService scheduledExecutorService;
private final DBSourceOptions dbSourceOptions;
private final MaxMindConfig maxMindConfig;
public static volatile boolean downloadReady;
private boolean toggle;
private String flipDatabase;


/**
* GeoIPProcessorService constructor for initialization of required attributes
* @param geoIPProcessorConfig geoIPProcessorConfig
* @param tempPath tempPath
*
* @param geoIpServiceConfig geoIpServiceConfig
*/
public GeoIPProcessorService(GeoIPProcessorConfig geoIPProcessorConfig, String tempPath) {
public GeoIPProcessorService(final GeoIpServiceConfig geoIpServiceConfig) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is going to be provided by the extension in the end, right?

It would be good to move this into the .extension package. That could happen in a follow-on PR.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, it's provided by extension supplier. I will move it to right package.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

QUES: do we know if this will potentially be used in other pipeline plugins? Need to catch up on justification why we put it in extensions

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think it will be used by other plugins. But the extension will allow Data Prepper to run a single GeoIP "service" which any number of processors can share.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

By "number of processors" are we expecting future processor plugin to depend upon this? It would be good to have examples but not blocking this PR.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I mean any number of geoip processors.

pipeline-a:
  processor:
    - geoip:
    - geoip:
pipeline-b:
  processor:
    - geoip:

All of these will share the same underlying objects and resources from Maxmind.

this.toggle = false;
this.geoIPProcessorConfig = geoIPProcessorConfig;
this.tempPath = tempPath;
this.databasePath = geoIPProcessorConfig.getServiceType().getMaxMindService().getDatabasePath();
this.maxMindConfig = geoIpServiceConfig.getMaxMindConfig();
this.databasePaths = maxMindConfig.getDatabasePaths();
flipDatabase = DATABASE_1;

dbSourceOptions = DbSourceIdentification.getDatabasePathType(databasePath);
final Duration checkInterval = Objects.requireNonNull(geoIPProcessorConfig.getServiceType().getMaxMindService().getCacheRefreshSchedule());
this.tempPath = System.getProperty("java.io.tmpdir")+ File.separator + TEMP_PATH_FOLDER;

dbSourceOptions = DbSourceIdentification.getDatabasePathType(databasePaths);
final Duration checkInterval = Objects.requireNonNull(maxMindConfig.getDatabaseRefreshInterval());
scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
scheduledExecutorService
.scheduleAtFixedRate(this::downloadThroughURLandS3, 0L, checkInterval.toSeconds(), TimeUnit.SECONDS);
Expand All @@ -82,10 +86,10 @@ public GeoIPProcessorService(GeoIPProcessorConfig geoIPProcessorConfig, String t
String finalPath = tempPath + File.separator;
licenseType = LicenseTypeCheck.isGeoLite2OrEnterpriseLicense(finalPath.concat(flipDatabase));
if (licenseType.equals(LicenseTypeOptions.FREE)) {
geoData = new GetGeoLite2Data(finalPath.concat(flipDatabase), geoIPProcessorConfig.getServiceType().getMaxMindService().getCacheSize(), geoIPProcessorConfig);
geoData = new GetGeoLite2Data(finalPath.concat(flipDatabase), maxMindConfig.getCacheSize());
}
else if (licenseType.equals(LicenseTypeOptions.ENTERPRISE)) {
geoData = new GetGeoIP2Data(finalPath.concat(flipDatabase), geoIPProcessorConfig.getServiceType().getMaxMindService().getCacheSize(), geoIPProcessorConfig);
geoData = new GetGeoIP2Data(finalPath.concat(flipDatabase), maxMindConfig.getCacheSize());
}
}
downloadReady = false;
Expand All @@ -107,17 +111,17 @@ public synchronized void downloadThroughURLandS3() {
switch (dbSourceOptions) {
case URL:
dbSource = new HttpDBDownloadService(flipDatabase);
dbSource.initiateDownload(databasePath);
dbSource.initiateDownload(databasePaths);
downloadReady = true;
break;
case S3:
dbSource = new S3DBService(geoIPProcessorConfig, flipDatabase);
dbSource.initiateDownload(databasePath);
dbSource = new S3DBService(maxMindConfig.getAwsAuthenticationOptionsConfig(), flipDatabase);
dbSource.initiateDownload(databasePaths);
downloadReady = true;
break;
case PATH:
dbSource = new LocalDBDownloadService(tempPath, flipDatabase);
dbSource.initiateDownload(databasePath);
dbSource.initiateDownload(databasePaths);
downloadReady = true;
break;
}
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Copyright OpenSearch Contributors
* PDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.processor.configuration;

import com.fasterxml.jackson.annotation.JsonProperty;
import jakarta.validation.constraints.NotEmpty;

import java.util.List;

public class EntryConfig {
@JsonProperty("source")
@NotEmpty
private String source;

@JsonProperty("target")
private String target;

@JsonProperty("fields")
private List<String> fields;

public String getSource() {
return source;
}

public String getTarget() {
return target;
}

public List<String> getFields() {
return fields;
}
}

This file was deleted.

Loading
Loading