From fe8a94be7955817e234329482f502dee9bcda9ec Mon Sep 17 00:00:00 2001 From: David Venable Date: Tue, 21 Jan 2025 12:10:30 -0800 Subject: [PATCH 1/3] Welcoming Srikanth Govindarajan (srikanthjg) to the Data Prepper maintainers. (#5337) Signed-off-by: David Venable --- .github/CODEOWNERS | 2 +- MAINTAINERS.md | 27 ++++++++++++++------------- 2 files changed, 15 insertions(+), 14 deletions(-) diff --git a/.github/CODEOWNERS b/.github/CODEOWNERS index e1e9d8f1eb..8c2effe15f 100644 --- a/.github/CODEOWNERS +++ b/.github/CODEOWNERS @@ -1,2 +1,2 @@ # This should match the owning team set up in https://github.com/orgs/opensearch-project/teams -* @sb2k16 @chenqi0805 @engechas @san81 @graytaylor0 @dinujoh @kkondaka @KarstenSchnitter @dlvenable @oeyh \ No newline at end of file +* @sb2k16 @chenqi0805 @engechas @san81 @srikanthjg @graytaylor0 @dinujoh @kkondaka @KarstenSchnitter @dlvenable @oeyh \ No newline at end of file diff --git a/MAINTAINERS.md b/MAINTAINERS.md index 51a68597ba..85b7e816cf 100644 --- a/MAINTAINERS.md +++ b/MAINTAINERS.md @@ -4,19 +4,20 @@ This document contains a list of maintainers in this repo. See [opensearch-proje ## Current Maintainers -| Maintainer | GitHub ID | Affiliation | -| -------------------- | --------------------------------------------------------- | ----------- | -| Souvik Bose | [sb2k16](https://github.com/sb2k16) | Amazon | -| Qi Chen | [chenqi0805](https://github.com/chenqi0805) | Amazon | -| Chase Engelbrecht | [engechas](https://github.com/engechas) | Amazon | -| Santhosh Gandhe | [san81](https://github.com/san81) | Amazon | -| Taylor Gray | [graytaylor0](https://github.com/graytaylor0) | Amazon | -| Dinu John | [dinujoh](https://github.com/dinujoh) | Amazon | -| Krishna Kondaka | [kkondaka](https://github.com/kkondaka) | Amazon | -| Asif Sohail Mohammed | [asifsmohammed](https://github.com/asifsmohammed) | Amazon | -| Karsten Schnitter | [KarstenSchnitter](https://github.com/KarstenSchnitter) | SAP | -| David Venable | [dlvenable](https://github.com/dlvenable) | Amazon | -| Hai Yan | [oeyh](https://github.com/oeyh) | Amazon | +| Maintainer | GitHub ID | Affiliation | +| ---------------------- | --------------------------------------------------------- | ----------- | +| Souvik Bose | [sb2k16](https://github.com/sb2k16) | Amazon | +| Qi Chen | [chenqi0805](https://github.com/chenqi0805) | Amazon | +| Chase Engelbrecht | [engechas](https://github.com/engechas) | Amazon | +| Santhosh Gandhe | [san81](https://github.com/san81) | Amazon | +| Srikanth Govindarajan | [srikanthjg](https://github.com/srikanthjg) | Amazon | +| Taylor Gray | [graytaylor0](https://github.com/graytaylor0) | Amazon | +| Dinu John | [dinujoh](https://github.com/dinujoh) | Amazon | +| Krishna Kondaka | [kkondaka](https://github.com/kkondaka) | Amazon | +| Asif Sohail Mohammed | [asifsmohammed](https://github.com/asifsmohammed) | Amazon | +| Karsten Schnitter | [KarstenSchnitter](https://github.com/KarstenSchnitter) | SAP | +| David Venable | [dlvenable](https://github.com/dlvenable) | Amazon | +| Hai Yan | [oeyh](https://github.com/oeyh) | Amazon | ## Emeritus From 01c9b4a8f13709553721bd4e4aa8265d86d45e42 Mon Sep 17 00:00:00 2001 From: Santhosh Gandhe <1909520+san81@users.noreply.github.com> Date: Tue, 21 Jan 2025 12:48:50 -0800 Subject: [PATCH 2/3] Refreshing AWS Secret values on demand (#5347) * refreshing the values on demand Signed-off-by: Santhosh Gandhe <1909520+san81@users.noreply.github.com> * refreshing only for the forbidden or unauthorized error casea and additional test cases to cover those scenarios Signed-off-by: Santhosh Gandhe <1909520+san81@users.noreply.github.com> --------- Signed-off-by: Santhosh Gandhe <1909520+san81@users.noreply.github.com> --- .../model/plugin/PluginConfigVariable.java | 5 ++++ .../plugin/VariableExpanderTest.java | 4 +++ .../plugins/aws/AwsPluginConfigVariable.java | 6 +++++ .../aws/AwsPluginConfigVariableTest.java | 9 +++++++ .../jira/rest/auth/JiraOauthConfig.java | 11 ++++++-- .../jira/rest/auth/JiraOauthConfigTest.java | 27 ++++++++++++++++++- .../utils/MockPluginConfigVariableImpl.java | 4 +++ 7 files changed, 63 insertions(+), 3 deletions(-) diff --git a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/plugin/PluginConfigVariable.java b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/plugin/PluginConfigVariable.java index 32c5bde678..ad53bfa2fd 100644 --- a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/plugin/PluginConfigVariable.java +++ b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/plugin/PluginConfigVariable.java @@ -31,6 +31,11 @@ public interface PluginConfigVariable { */ void setValue(Object updatedValue); + /** + * Refresh the secret value on demand + */ + void refresh(); + /** * Returns if the variable is updatable. * diff --git a/data-prepper-plugin-framework/src/test/java/org/opensearch/dataprepper/plugin/VariableExpanderTest.java b/data-prepper-plugin-framework/src/test/java/org/opensearch/dataprepper/plugin/VariableExpanderTest.java index c8d2559302..ed1f1b72e0 100644 --- a/data-prepper-plugin-framework/src/test/java/org/opensearch/dataprepper/plugin/VariableExpanderTest.java +++ b/data-prepper-plugin-framework/src/test/java/org/opensearch/dataprepper/plugin/VariableExpanderTest.java @@ -157,6 +157,10 @@ public void setValue(Object updatedValue) { this.secretValue = updatedValue.toString(); } + @Override + public void refresh() { + } + @Override public boolean isUpdatable() { return true; diff --git a/data-prepper-plugins/aws-plugin/src/main/java/org/opensearch/dataprepper/plugins/aws/AwsPluginConfigVariable.java b/data-prepper-plugins/aws-plugin/src/main/java/org/opensearch/dataprepper/plugins/aws/AwsPluginConfigVariable.java index d53528443e..4f5c494ebf 100644 --- a/data-prepper-plugins/aws-plugin/src/main/java/org/opensearch/dataprepper/plugins/aws/AwsPluginConfigVariable.java +++ b/data-prepper-plugins/aws-plugin/src/main/java/org/opensearch/dataprepper/plugins/aws/AwsPluginConfigVariable.java @@ -47,6 +47,12 @@ public void setValue(Object newValue) { this.secretValue = newValue; } + @Override + public void refresh() { + secretsSupplier.refresh(secretId); + } + + @Override public boolean isUpdatable() { return isUpdatable; diff --git a/data-prepper-plugins/aws-plugin/src/test/java/org/opensearch/dataprepper/plugins/aws/AwsPluginConfigVariableTest.java b/data-prepper-plugins/aws-plugin/src/test/java/org/opensearch/dataprepper/plugins/aws/AwsPluginConfigVariableTest.java index faad9f72d7..0bd6fbbe28 100644 --- a/data-prepper-plugins/aws-plugin/src/test/java/org/opensearch/dataprepper/plugins/aws/AwsPluginConfigVariableTest.java +++ b/data-prepper-plugins/aws-plugin/src/test/java/org/opensearch/dataprepper/plugins/aws/AwsPluginConfigVariableTest.java @@ -25,6 +25,8 @@ import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @ExtendWith(MockitoExtension.class) @@ -81,4 +83,11 @@ void testSetValueSuccess(final String input) { assertThat(objectUnderTest.getValue(), equalTo(input)); } + + @Test + void testRefreshSecretsWithKey() { + objectUnderTest.refresh(); + verify(secretsSupplier, times(1)).refresh(secretId); + } + } \ No newline at end of file diff --git a/data-prepper-plugins/saas-source-plugins/jira-source/src/main/java/org/opensearch/dataprepper/plugins/source/jira/rest/auth/JiraOauthConfig.java b/data-prepper-plugins/saas-source-plugins/jira-source/src/main/java/org/opensearch/dataprepper/plugins/source/jira/rest/auth/JiraOauthConfig.java index aaa9850a8c..9228fa82df 100644 --- a/data-prepper-plugins/saas-source-plugins/jira-source/src/main/java/org/opensearch/dataprepper/plugins/source/jira/rest/auth/JiraOauthConfig.java +++ b/data-prepper-plugins/saas-source-plugins/jira-source/src/main/java/org/opensearch/dataprepper/plugins/source/jira/rest/auth/JiraOauthConfig.java @@ -142,9 +142,16 @@ public void renewCredentials() { } catch (HttpClientErrorException ex) { this.expireTime = Instant.ofEpochMilli(0); this.expiresInSeconds = 0; + HttpStatus statusCode = ex.getStatusCode(); log.error("Failed to renew access token. Status code: {}, Error Message: {}", - ex.getRawStatusCode(), ex.getMessage()); - throw new RuntimeException("Failed to renew access token" + ex.getMessage(), ex); + statusCode, ex.getMessage()); + if (statusCode == HttpStatus.FORBIDDEN || statusCode == HttpStatus.UNAUTHORIZED) { + log.info("Trying to refresh the secrets"); + // Try refreshing the secrets and see if that helps + // Refreshing one of the secret refreshes the entire store so we are good to trigger refresh on just one + jiraSourceConfig.getAuthenticationConfig().getOauth2Config().getAccessToken().refresh(); + } + throw new RuntimeException("Failed to renew access token message:" + ex.getMessage(), ex); } } } diff --git a/data-prepper-plugins/saas-source-plugins/jira-source/src/test/java/org/opensearch/dataprepper/plugins/source/jira/rest/auth/JiraOauthConfigTest.java b/data-prepper-plugins/saas-source-plugins/jira-source/src/test/java/org/opensearch/dataprepper/plugins/source/jira/rest/auth/JiraOauthConfigTest.java index bafe995801..746dd41dff 100644 --- a/data-prepper-plugins/saas-source-plugins/jira-source/src/test/java/org/opensearch/dataprepper/plugins/source/jira/rest/auth/JiraOauthConfigTest.java +++ b/data-prepper-plugins/saas-source-plugins/jira-source/src/test/java/org/opensearch/dataprepper/plugins/source/jira/rest/auth/JiraOauthConfigTest.java @@ -15,8 +15,11 @@ import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; +import org.opensearch.dataprepper.model.plugin.PluginConfigVariable; import org.opensearch.dataprepper.plugins.source.jira.JiraSourceConfig; +import org.opensearch.dataprepper.plugins.source.jira.configuration.Oauth2Config; import org.opensearch.dataprepper.plugins.source.jira.exception.UnAuthorizedException; +import org.opensearch.dataprepper.test.helper.ReflectivelySetField; import org.springframework.http.HttpEntity; import org.springframework.http.HttpMethod; import org.springframework.http.HttpStatus; @@ -52,6 +55,9 @@ public class JiraOauthConfigTest { JiraSourceConfig jiraSourceConfig; + @Mock + PluginConfigVariable accessTokenVariable; + @BeforeEach void setUp() { jiraSourceConfig = createJiraConfigurationFromYaml("oauth2-auth-jira-pipeline.yaml"); @@ -89,12 +95,31 @@ void testRenewToken() throws InterruptedException { } @Test - void testFailedToRenewAccessToken() { + void testFailedToRenewAccessToken() throws NoSuchFieldException, IllegalAccessException { JiraOauthConfig jiraOauthConfig = new JiraOauthConfig(jiraSourceConfig); + Oauth2Config oauth2Config = jiraSourceConfig.getAuthenticationConfig().getOauth2Config(); + ReflectivelySetField.setField(Oauth2Config.class, oauth2Config, "accessToken", accessTokenVariable); when(restTemplateMock.postForEntity(any(String.class), any(HttpEntity.class), any(Class.class))) .thenThrow(HttpClientErrorException.class); jiraOauthConfig.restTemplate = restTemplateMock; assertThrows(RuntimeException.class, jiraOauthConfig::renewCredentials); + verify(oauth2Config.getAccessToken(), times(0)) + .refresh(); + } + + @Test + void testFailedToRenewAccessToken_with_unauthorized_and_trigger_secrets_refresh() + throws NoSuchFieldException, IllegalAccessException { + JiraOauthConfig jiraOauthConfig = new JiraOauthConfig(jiraSourceConfig); + Oauth2Config oauth2Config = jiraSourceConfig.getAuthenticationConfig().getOauth2Config(); + ReflectivelySetField.setField(Oauth2Config.class, oauth2Config, "accessToken", accessTokenVariable); + HttpClientErrorException unAuthorizedException = new HttpClientErrorException(HttpStatus.UNAUTHORIZED); + when(restTemplateMock.postForEntity(any(String.class), any(HttpEntity.class), any(Class.class))) + .thenThrow(unAuthorizedException); + jiraOauthConfig.restTemplate = restTemplateMock; + assertThrows(RuntimeException.class, jiraOauthConfig::renewCredentials); + verify(oauth2Config.getAccessToken(), times(1)) + .refresh(); } diff --git a/data-prepper-plugins/saas-source-plugins/jira-source/src/test/java/org/opensearch/dataprepper/plugins/source/jira/utils/MockPluginConfigVariableImpl.java b/data-prepper-plugins/saas-source-plugins/jira-source/src/test/java/org/opensearch/dataprepper/plugins/source/jira/utils/MockPluginConfigVariableImpl.java index aa8cfd87ab..dd7145d4ed 100644 --- a/data-prepper-plugins/saas-source-plugins/jira-source/src/test/java/org/opensearch/dataprepper/plugins/source/jira/utils/MockPluginConfigVariableImpl.java +++ b/data-prepper-plugins/saas-source-plugins/jira-source/src/test/java/org/opensearch/dataprepper/plugins/source/jira/utils/MockPluginConfigVariableImpl.java @@ -33,6 +33,10 @@ public void setValue(Object someValue) { this.defaultValue = someValue; } + @Override + public void refresh() { + } + @Override public boolean isUpdatable() { return true; From 9c61e0367d9f4e5d9033f5d3d78082078d4ff990 Mon Sep 17 00:00:00 2001 From: Rashmi Date: Wed, 22 Jan 2025 02:30:58 +0530 Subject: [PATCH 3/3] Fixed kinesis retrieval config argument passing to KCL scheduler (#5272) Signed-off-by: RashmiRam --- .../kinesis/source/KinesisService.java | 6 +++-- .../kinesis/source/KinesisServiceTest.java | 25 +++++++++++++++++++ 2 files changed, 29 insertions(+), 2 deletions(-) diff --git a/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/source/KinesisService.java b/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/source/KinesisService.java index 86cb7836e2..b0cafff724 100644 --- a/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/source/KinesisService.java +++ b/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/source/KinesisService.java @@ -41,6 +41,7 @@ import software.amazon.kinesis.exceptions.KinesisClientLibDependencyException; import software.amazon.kinesis.exceptions.ThrottlingException; import software.amazon.kinesis.processor.ShardRecordProcessorFactory; +import software.amazon.kinesis.retrieval.RetrievalConfig; import software.amazon.kinesis.retrieval.polling.PollingConfig; import java.time.Duration; @@ -185,9 +186,10 @@ public Scheduler createScheduler(final Buffer> buffer) { .tableName(tableName) .namespace(kclMetricsNamespaceName); + RetrievalConfig retrievalConfig = configsBuilder.retrievalConfig(); ConsumerStrategy consumerStrategy = kinesisSourceConfig.getConsumerStrategy(); if (consumerStrategy == ConsumerStrategy.POLLING) { - configsBuilder.retrievalConfig().retrievalSpecificConfig( + retrievalConfig = configsBuilder.retrievalConfig().retrievalSpecificConfig( new PollingConfig(kinesisClient) .maxRecords(kinesisSourceConfig.getPollingConfig().getMaxPollingRecords()) .idleTimeBetweenReadsInMillis( @@ -203,7 +205,7 @@ public Scheduler createScheduler(final Buffer> buffer) { configsBuilder.lifecycleConfig(), configsBuilder.metricsConfig(), configsBuilder.processorConfig(), - configsBuilder.retrievalConfig() + retrievalConfig ); } } diff --git a/data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/kinesis/source/KinesisServiceTest.java b/data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/kinesis/source/KinesisServiceTest.java index 95d01594ea..896faa4155 100644 --- a/data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/kinesis/source/KinesisServiceTest.java +++ b/data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/kinesis/source/KinesisServiceTest.java @@ -42,6 +42,7 @@ import software.amazon.kinesis.common.InitialPositionInStream; import software.amazon.kinesis.coordinator.Scheduler; import software.amazon.kinesis.metrics.MetricsLevel; +import software.amazon.kinesis.retrieval.polling.PollingConfig; import java.time.Duration; import java.time.Instant; @@ -276,6 +277,30 @@ void testCreateSchedulerWithPollingStrategy() { verify(workerIdentifierGenerator, times(1)).generate(); } + @Test + void testCreateSchedulerWithPollingStrategyAndPollingConfig() { + when(kinesisSourceConfig.getConsumerStrategy()).thenReturn(ConsumerStrategy.POLLING); + when(kinesisSourceConfig.getPollingConfig()).thenReturn(kinesisStreamPollingConfig); + KinesisService kinesisService = new KinesisService(kinesisSourceConfig, kinesisClientFactory, pluginMetrics, pluginFactory, + pipelineDescription, acknowledgementSetManager, kinesisLeaseConfigSupplier, workerIdentifierGenerator); + Scheduler schedulerObjectUnderTest = kinesisService.createScheduler(buffer); + + assertEquals(kinesisService.getApplicationName(), pipelineName); + assertNotNull(schedulerObjectUnderTest); + assertNotNull(schedulerObjectUnderTest.checkpointConfig()); + assertNotNull(schedulerObjectUnderTest.leaseManagementConfig()); + assertSame(schedulerObjectUnderTest.leaseManagementConfig().initialPositionInStream().getInitialPositionInStream(), InitialPositionInStream.TRIM_HORIZON); + assertNotNull(schedulerObjectUnderTest.lifecycleConfig()); + assertNotNull(schedulerObjectUnderTest.metricsConfig()); + assertSame(schedulerObjectUnderTest.metricsConfig().metricsLevel(), MetricsLevel.DETAILED); + assertNotNull(schedulerObjectUnderTest.processorConfig()); + assertNotNull(schedulerObjectUnderTest.retrievalConfig().retrievalSpecificConfig()); + assertEquals(((PollingConfig)schedulerObjectUnderTest.retrievalConfig().retrievalSpecificConfig()).maxRecords(), kinesisStreamPollingConfig.getMaxPollingRecords()); + assertEquals(((PollingConfig)schedulerObjectUnderTest.retrievalConfig().retrievalSpecificConfig()).idleTimeBetweenReadsInMillis(), kinesisStreamPollingConfig.getIdleTimeBetweenReads().toMillis()); + assertNotNull(schedulerObjectUnderTest.retrievalConfig()); + verify(workerIdentifierGenerator, times(1)).generate(); + } + @Test void testServiceStartNullBufferThrows() { KinesisService kinesisService = createObjectUnderTest();