Skip to content

Commit

Permalink
Addressed feedback and updated supplier interface
Browse files Browse the repository at this point in the history
Signed-off-by: Asif Sohail Mohammed <[email protected]>
  • Loading branch information
asifsmohammed committed Jan 16, 2024
1 parent be8469d commit e6f7964
Show file tree
Hide file tree
Showing 15 changed files with 123 additions and 91 deletions.
1 change: 1 addition & 0 deletions data-prepper-plugins/geoip-processor/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ dependencies {
implementation 'software.amazon.awssdk.crt:aws-crt:0.21.17'
implementation 'com.maxmind.geoip2:geoip2:4.0.1'
implementation 'com.maxmind.db:maxmind-db:3.0.0'
implementation 'org.hibernate.validator:hibernate-validator:8.0.1.Final'

implementation libs.commons.lang3
testImplementation project(':data-prepper-core')
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
import org.opensearch.dataprepper.plugins.processor.configuration.KeysConfig;
import org.opensearch.dataprepper.plugins.processor.databaseenrich.EnrichFailedException;
import org.opensearch.dataprepper.plugins.processor.extension.GeoIpConfigSupplier;
import org.opensearch.dataprepper.plugins.processor.extension.GeoIpServiceConfig;
import org.opensearch.dataprepper.plugins.processor.utils.IPValidationcheck;
import org.opensearch.dataprepper.logging.DataPrepperMarkers;
import org.slf4j.Logger;
Expand Down Expand Up @@ -44,7 +43,6 @@ public class GeoIPProcessor extends AbstractProcessor<Record<Event>, Record<Even
private final GeoIPProcessorConfig geoIPProcessorConfig;
private final String tempPath;
private final List<String> tagsOnSourceNotFoundFailure;
private final GeoIpServiceConfig geoIpServiceConfig;
private GeoIPProcessorService geoIPProcessorService;
private static final String TEMP_PATH_FOLDER = "GeoIP";

Expand All @@ -64,8 +62,8 @@ public GeoIPProcessor(PluginSetting pluginSetting,
tagsOnSourceNotFoundFailure = geoCodingProcessorConfig.getTagsOnSourceNotFoundFailure();
this.geoIpProcessingMatchCounter = pluginMetrics.counter(GEO_IP_PROCESSING_MATCH);
this.geoIpProcessingMismatchCounter = pluginMetrics.counter(GEO_IP_PROCESSING_MISMATCH);
// TODO: use this config and clean up MaxMind service config from pipeline.yaml
geoIpServiceConfig = geoIpConfigSupplier.getGeoIpServiceConfig();
// TODO: use this service and clean up MaxMind service config from pipeline.yaml
//geoIPProcessorService = geoIpConfigSupplier.getGeoIPProcessorService();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
import jakarta.validation.constraints.NotNull;
import org.opensearch.dataprepper.plugins.processor.configuration.KeysConfig;
import org.opensearch.dataprepper.plugins.processor.configuration.ServiceTypeOptions;
import org.opensearch.dataprepper.plugins.processor.configuration.AwsAuthenticationOptions;
import org.opensearch.dataprepper.plugins.processor.extension.AwsAuthenticationOptionsConfig;

import java.util.List;

Expand All @@ -22,7 +22,7 @@ public class GeoIPProcessorConfig {
@JsonProperty("aws")
@NotNull
@Valid
private AwsAuthenticationOptions awsAuthenticationOptions;
private AwsAuthenticationOptionsConfig awsAuthenticationOptionsConfig;

@JsonProperty("keys")
@NotNull
Expand All @@ -39,8 +39,8 @@ public class GeoIPProcessorConfig {
* Aws Authentication configuration Options
* @return AwsAuthenticationOptions
*/
public AwsAuthenticationOptions getAwsAuthenticationOptions() {
return awsAuthenticationOptions;
public AwsAuthenticationOptionsConfig getAwsAuthenticationOptions() {
return awsAuthenticationOptionsConfig;
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
* PDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.processor.configuration;
package org.opensearch.dataprepper.plugins.processor.extension;

import java.util.Map;
import java.util.UUID;
Expand All @@ -20,8 +20,7 @@
/**
* An implementation class AWS Authentication configuration
*/
public class AwsAuthenticationOptions {

public class AwsAuthenticationOptionsConfig {
@JsonProperty("region")
@Size(min = 1, message = "Region cannot be empty string")
private String awsRegion;
Expand Down Expand Up @@ -66,8 +65,10 @@ public AwsCredentialsProvider authenticateAwsConfiguration() {
configuration -> awsStsHeaderOverrides.forEach(configuration::putHeader));
}

awsCredentialsProvider = StsAssumeRoleCredentialsProvider.builder().stsClient(stsClient)
.refreshRequest(assumeRoleRequestBuilder.build()).build();
awsCredentialsProvider = StsAssumeRoleCredentialsProvider.builder()
.stsClient(stsClient)
.refreshRequest(assumeRoleRequestBuilder.build())
.build();

} else {
awsCredentialsProvider = DefaultCredentialsProvider.create();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@

package org.opensearch.dataprepper.plugins.processor.extension;

import org.opensearch.dataprepper.plugins.processor.GeoIPProcessorService;

public class DefaultGeoIpConfigSupplier implements GeoIpConfigSupplier {
private final GeoIpServiceConfig geoIpServiceConfig;

Expand All @@ -13,7 +15,8 @@ public DefaultGeoIpConfigSupplier(final GeoIpServiceConfig geoIpServiceConfig) {
}

@Override
public GeoIpServiceConfig getGeoIpServiceConfig() {
return this.geoIpServiceConfig;
public GeoIPProcessorService getGeoIPProcessorService() {
//TODO: use GeoIpServiceConfig and return GeoIPProcessorService
return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,18 @@

package org.opensearch.dataprepper.plugins.processor.extension;

import org.opensearch.dataprepper.plugins.processor.GeoIPProcessorService;

/**
* Interface for supplying {@link GeoIPProcessorService} to {@link GeoIpConfigExtension}
*
* @since 2.7
*/
public interface GeoIpConfigSupplier {
GeoIpServiceConfig getGeoIpServiceConfig();
/**
* Returns the {@link GeoIPProcessorService}
*
* @since 2.7
*/
GeoIPProcessorService getGeoIPProcessorService();
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,19 @@
package org.opensearch.dataprepper.plugins.processor.extension;

import com.fasterxml.jackson.annotation.JsonProperty;
import jakarta.validation.Valid;
import jakarta.validation.constraints.AssertTrue;
import jakarta.validation.constraints.Min;
import org.hibernate.validator.constraints.time.DurationMax;
import org.hibernate.validator.constraints.time.DurationMin;

import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

public class MaxMindConfig {
private static final String S3_PREFIX = "s3://";

//TODO: Add validations to database paths
//TODO: Make default path to be a public CDN endpoint
private static final List<String> DEFAULT_DATABASE_PATHS = new ArrayList<>();
Expand All @@ -22,21 +27,34 @@ public class MaxMindConfig {

@JsonProperty("database_paths")
private List<String> databasePaths = DEFAULT_DATABASE_PATHS;

@JsonProperty("database_refresh_interval")
@DurationMin(days = 1)
@DurationMax(days = 30)
private Duration databaseRefreshInterval = DEFAULT_DATABASE_REFRESH_INTERVAL;

@JsonProperty("cache_size")
@Min(1)
//TODO: Add a Max limit on cache size
private int cacheSize = DEFAULT_CACHE_SIZE;

//TODO: Add a destination path to store database files
@JsonProperty("aws")
@Valid
private AwsAuthenticationOptionsConfig awsAuthenticationOptionsConfig;

public MaxMindConfig() {
// This default constructor is used if maxmind is not configured
}

@AssertTrue(message = "database_refresh_interval should be between 1 and 30 days.")
public boolean isValidDatabaseRefreshInterval() {
return databaseRefreshInterval.toDays() >= 1 && databaseRefreshInterval.toDays() <= 30;
@AssertTrue(message = "aws should be configured if any path in database_paths is S3 bucket path.")
boolean isAwsAuthenticationOptionsRequired() {
for (final String databasePath : databasePaths) {
if (databasePath.startsWith(S3_PREFIX)) {
return awsAuthenticationOptionsConfig != null;
}
}
return true;
}

/**
Expand Down Expand Up @@ -68,4 +86,14 @@ public Duration getDatabaseRefreshInterval() {
public int getCacheSize() {
return cacheSize;
}

/**
* Gets the AWS authentication config used for reading from S3 bucket
*
* @return The AWS authentication options
* @since 2.7
*/
public AwsAuthenticationOptionsConfig getAwsAuthenticationOptionsConfig() {
return awsAuthenticationOptionsConfig;
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
* PDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.processor;
Expand All @@ -9,7 +9,6 @@
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.junit.jupiter.MockitoExtension;
import org.opensearch.dataprepper.plugins.processor.configuration.AwsAuthenticationOptions;
import org.opensearch.dataprepper.plugins.processor.configuration.KeysConfig;
import org.opensearch.dataprepper.plugins.processor.configuration.ServiceTypeOptions;
import org.opensearch.dataprepper.test.helper.ReflectivelySetField;
Expand All @@ -30,14 +29,6 @@ void setUp() {
geoIPProcessorConfig = new GeoIPProcessorConfig();
}

@Test
void getAwsAuthenticationOptionsTestPositive() throws NoSuchFieldException, IllegalAccessException {
AwsAuthenticationOptions awsAuthenticationOptions = new AwsAuthenticationOptions();
ReflectivelySetField.setField(GeoIPProcessorConfig.class,
geoIPProcessorConfig, "awsAuthenticationOptions", awsAuthenticationOptions);
assertThat(geoIPProcessorConfig.getAwsAuthenticationOptions(), equalTo(awsAuthenticationOptions));
}

@Test
void getAwsAuthenticationOptionsTestNegative() {
assertThat(new GeoIPProcessorConfig().getAwsAuthenticationOptions(), equalTo(null));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.opensearch.dataprepper.plugins.processor.GeoIPProcessorConfig;
import org.opensearch.dataprepper.plugins.processor.configuration.AwsAuthenticationOptions;
import org.opensearch.dataprepper.plugins.processor.extension.AwsAuthenticationOptionsConfig;
import org.opensearch.dataprepper.plugins.processor.configuration.DatabasePathURLConfig;
import org.opensearch.dataprepper.plugins.processor.databaseenrich.DownloadFailedException;
import org.opensearch.dataprepper.test.helper.ReflectivelySetField;
Expand All @@ -36,8 +36,8 @@ class S3DBServiceTest {

@BeforeEach
void setUp() {
AwsAuthenticationOptions awsAuthenticationOptions = mock(AwsAuthenticationOptions.class);
when(geoIPProcessorConfig.getAwsAuthenticationOptions()).thenReturn(awsAuthenticationOptions);
AwsAuthenticationOptionsConfig awsAuthenticationOptionsConfig = mock(AwsAuthenticationOptionsConfig.class);
when(geoIPProcessorConfig.getAwsAuthenticationOptions()).thenReturn(awsAuthenticationOptionsConfig);
when(geoIPProcessorConfig.getAwsAuthenticationOptions().getAwsRegion()).thenReturn(Region.of(S3_REGION));
AwsCredentialsProvider awsCredentialsProvider = mock(AwsCredentialsProvider.class);
when(geoIPProcessorConfig.getAwsAuthenticationOptions().authenticateAwsConfiguration()).thenReturn(awsCredentialsProvider);
Expand Down
Loading

0 comments on commit e6f7964

Please sign in to comment.