Skip to content

Commit

Permalink
Merge branch 'opensearch-project:main' into jira-ack-handling
Browse files Browse the repository at this point in the history
  • Loading branch information
san81 authored Jan 21, 2025
2 parents bebc885 + 9c61e03 commit b4aa574
Show file tree
Hide file tree
Showing 11 changed files with 107 additions and 19 deletions.
2 changes: 1 addition & 1 deletion .github/CODEOWNERS
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
# This should match the owning team set up in https://github.com/orgs/opensearch-project/teams
* @sb2k16 @chenqi0805 @engechas @san81 @graytaylor0 @dinujoh @kkondaka @KarstenSchnitter @dlvenable @oeyh
* @sb2k16 @chenqi0805 @engechas @san81 @srikanthjg @graytaylor0 @dinujoh @kkondaka @KarstenSchnitter @dlvenable @oeyh
27 changes: 14 additions & 13 deletions MAINTAINERS.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,20 @@ This document contains a list of maintainers in this repo. See [opensearch-proje

## Current Maintainers

| Maintainer | GitHub ID | Affiliation |
| -------------------- | --------------------------------------------------------- | ----------- |
| Souvik Bose | [sb2k16](https://github.com/sb2k16) | Amazon |
| Qi Chen | [chenqi0805](https://github.com/chenqi0805) | Amazon |
| Chase Engelbrecht | [engechas](https://github.com/engechas) | Amazon |
| Santhosh Gandhe | [san81](https://github.com/san81) | Amazon |
| Taylor Gray | [graytaylor0](https://github.com/graytaylor0) | Amazon |
| Dinu John | [dinujoh](https://github.com/dinujoh) | Amazon |
| Krishna Kondaka | [kkondaka](https://github.com/kkondaka) | Amazon |
| Asif Sohail Mohammed | [asifsmohammed](https://github.com/asifsmohammed) | Amazon |
| Karsten Schnitter | [KarstenSchnitter](https://github.com/KarstenSchnitter) | SAP |
| David Venable | [dlvenable](https://github.com/dlvenable) | Amazon |
| Hai Yan | [oeyh](https://github.com/oeyh) | Amazon |
| Maintainer | GitHub ID | Affiliation |
| ---------------------- | --------------------------------------------------------- | ----------- |
| Souvik Bose | [sb2k16](https://github.com/sb2k16) | Amazon |
| Qi Chen | [chenqi0805](https://github.com/chenqi0805) | Amazon |
| Chase Engelbrecht | [engechas](https://github.com/engechas) | Amazon |
| Santhosh Gandhe | [san81](https://github.com/san81) | Amazon |
| Srikanth Govindarajan | [srikanthjg](https://github.com/srikanthjg) | Amazon |
| Taylor Gray | [graytaylor0](https://github.com/graytaylor0) | Amazon |
| Dinu John | [dinujoh](https://github.com/dinujoh) | Amazon |
| Krishna Kondaka | [kkondaka](https://github.com/kkondaka) | Amazon |
| Asif Sohail Mohammed | [asifsmohammed](https://github.com/asifsmohammed) | Amazon |
| Karsten Schnitter | [KarstenSchnitter](https://github.com/KarstenSchnitter) | SAP |
| David Venable | [dlvenable](https://github.com/dlvenable) | Amazon |
| Hai Yan | [oeyh](https://github.com/oeyh) | Amazon |


## Emeritus
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,11 @@ public interface PluginConfigVariable {
*/
void setValue(Object updatedValue);

/**
* Refresh the secret value on demand
*/
void refresh();

/**
* Returns if the variable is updatable.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,10 @@ public void setValue(Object updatedValue) {
this.secretValue = updatedValue.toString();
}

@Override
public void refresh() {
}

@Override
public boolean isUpdatable() {
return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,12 @@ public void setValue(Object newValue) {
this.secretValue = newValue;
}

@Override
public void refresh() {
secretsSupplier.refresh(secretId);
}


@Override
public boolean isUpdatable() {
return isUpdatable;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

@ExtendWith(MockitoExtension.class)
Expand Down Expand Up @@ -81,4 +83,11 @@ void testSetValueSuccess(final String input) {
assertThat(objectUnderTest.getValue(), equalTo(input));
}


@Test
void testRefreshSecretsWithKey() {
objectUnderTest.refresh();
verify(secretsSupplier, times(1)).refresh(secretId);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import software.amazon.kinesis.exceptions.KinesisClientLibDependencyException;
import software.amazon.kinesis.exceptions.ThrottlingException;
import software.amazon.kinesis.processor.ShardRecordProcessorFactory;
import software.amazon.kinesis.retrieval.RetrievalConfig;
import software.amazon.kinesis.retrieval.polling.PollingConfig;

import java.time.Duration;
Expand Down Expand Up @@ -185,9 +186,10 @@ public Scheduler createScheduler(final Buffer<Record<Event>> buffer) {
.tableName(tableName)
.namespace(kclMetricsNamespaceName);

RetrievalConfig retrievalConfig = configsBuilder.retrievalConfig();
ConsumerStrategy consumerStrategy = kinesisSourceConfig.getConsumerStrategy();
if (consumerStrategy == ConsumerStrategy.POLLING) {
configsBuilder.retrievalConfig().retrievalSpecificConfig(
retrievalConfig = configsBuilder.retrievalConfig().retrievalSpecificConfig(
new PollingConfig(kinesisClient)
.maxRecords(kinesisSourceConfig.getPollingConfig().getMaxPollingRecords())
.idleTimeBetweenReadsInMillis(
Expand All @@ -203,7 +205,7 @@ public Scheduler createScheduler(final Buffer<Record<Event>> buffer) {
configsBuilder.lifecycleConfig(),
configsBuilder.metricsConfig(),
configsBuilder.processorConfig(),
configsBuilder.retrievalConfig()
retrievalConfig
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import software.amazon.kinesis.common.InitialPositionInStream;
import software.amazon.kinesis.coordinator.Scheduler;
import software.amazon.kinesis.metrics.MetricsLevel;
import software.amazon.kinesis.retrieval.polling.PollingConfig;

import java.time.Duration;
import java.time.Instant;
Expand Down Expand Up @@ -276,6 +277,30 @@ void testCreateSchedulerWithPollingStrategy() {
verify(workerIdentifierGenerator, times(1)).generate();
}

@Test
void testCreateSchedulerWithPollingStrategyAndPollingConfig() {
when(kinesisSourceConfig.getConsumerStrategy()).thenReturn(ConsumerStrategy.POLLING);
when(kinesisSourceConfig.getPollingConfig()).thenReturn(kinesisStreamPollingConfig);
KinesisService kinesisService = new KinesisService(kinesisSourceConfig, kinesisClientFactory, pluginMetrics, pluginFactory,
pipelineDescription, acknowledgementSetManager, kinesisLeaseConfigSupplier, workerIdentifierGenerator);
Scheduler schedulerObjectUnderTest = kinesisService.createScheduler(buffer);

assertEquals(kinesisService.getApplicationName(), pipelineName);
assertNotNull(schedulerObjectUnderTest);
assertNotNull(schedulerObjectUnderTest.checkpointConfig());
assertNotNull(schedulerObjectUnderTest.leaseManagementConfig());
assertSame(schedulerObjectUnderTest.leaseManagementConfig().initialPositionInStream().getInitialPositionInStream(), InitialPositionInStream.TRIM_HORIZON);
assertNotNull(schedulerObjectUnderTest.lifecycleConfig());
assertNotNull(schedulerObjectUnderTest.metricsConfig());
assertSame(schedulerObjectUnderTest.metricsConfig().metricsLevel(), MetricsLevel.DETAILED);
assertNotNull(schedulerObjectUnderTest.processorConfig());
assertNotNull(schedulerObjectUnderTest.retrievalConfig().retrievalSpecificConfig());
assertEquals(((PollingConfig)schedulerObjectUnderTest.retrievalConfig().retrievalSpecificConfig()).maxRecords(), kinesisStreamPollingConfig.getMaxPollingRecords());
assertEquals(((PollingConfig)schedulerObjectUnderTest.retrievalConfig().retrievalSpecificConfig()).idleTimeBetweenReadsInMillis(), kinesisStreamPollingConfig.getIdleTimeBetweenReads().toMillis());
assertNotNull(schedulerObjectUnderTest.retrievalConfig());
verify(workerIdentifierGenerator, times(1)).generate();
}

@Test
void testServiceStartNullBufferThrows() {
KinesisService kinesisService = createObjectUnderTest();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,9 +142,16 @@ public void renewCredentials() {
} catch (HttpClientErrorException ex) {
this.expireTime = Instant.ofEpochMilli(0);
this.expiresInSeconds = 0;
HttpStatus statusCode = ex.getStatusCode();
log.error("Failed to renew access token. Status code: {}, Error Message: {}",
ex.getRawStatusCode(), ex.getMessage());
throw new RuntimeException("Failed to renew access token" + ex.getMessage(), ex);
statusCode, ex.getMessage());
if (statusCode == HttpStatus.FORBIDDEN || statusCode == HttpStatus.UNAUTHORIZED) {
log.info("Trying to refresh the secrets");
// Try refreshing the secrets and see if that helps
// Refreshing one of the secret refreshes the entire store so we are good to trigger refresh on just one
jiraSourceConfig.getAuthenticationConfig().getOauth2Config().getAccessToken().refresh();
}
throw new RuntimeException("Failed to renew access token message:" + ex.getMessage(), ex);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,11 @@
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.opensearch.dataprepper.model.plugin.PluginConfigVariable;
import org.opensearch.dataprepper.plugins.source.jira.JiraSourceConfig;
import org.opensearch.dataprepper.plugins.source.jira.configuration.Oauth2Config;
import org.opensearch.dataprepper.plugins.source.jira.exception.UnAuthorizedException;
import org.opensearch.dataprepper.test.helper.ReflectivelySetField;
import org.springframework.http.HttpEntity;
import org.springframework.http.HttpMethod;
import org.springframework.http.HttpStatus;
Expand Down Expand Up @@ -52,6 +55,9 @@ public class JiraOauthConfigTest {

JiraSourceConfig jiraSourceConfig;

@Mock
PluginConfigVariable accessTokenVariable;

@BeforeEach
void setUp() {
jiraSourceConfig = createJiraConfigurationFromYaml("oauth2-auth-jira-pipeline.yaml");
Expand Down Expand Up @@ -89,12 +95,31 @@ void testRenewToken() throws InterruptedException {
}

@Test
void testFailedToRenewAccessToken() {
void testFailedToRenewAccessToken() throws NoSuchFieldException, IllegalAccessException {
JiraOauthConfig jiraOauthConfig = new JiraOauthConfig(jiraSourceConfig);
Oauth2Config oauth2Config = jiraSourceConfig.getAuthenticationConfig().getOauth2Config();
ReflectivelySetField.setField(Oauth2Config.class, oauth2Config, "accessToken", accessTokenVariable);
when(restTemplateMock.postForEntity(any(String.class), any(HttpEntity.class), any(Class.class)))
.thenThrow(HttpClientErrorException.class);
jiraOauthConfig.restTemplate = restTemplateMock;
assertThrows(RuntimeException.class, jiraOauthConfig::renewCredentials);
verify(oauth2Config.getAccessToken(), times(0))
.refresh();
}

@Test
void testFailedToRenewAccessToken_with_unauthorized_and_trigger_secrets_refresh()
throws NoSuchFieldException, IllegalAccessException {
JiraOauthConfig jiraOauthConfig = new JiraOauthConfig(jiraSourceConfig);
Oauth2Config oauth2Config = jiraSourceConfig.getAuthenticationConfig().getOauth2Config();
ReflectivelySetField.setField(Oauth2Config.class, oauth2Config, "accessToken", accessTokenVariable);
HttpClientErrorException unAuthorizedException = new HttpClientErrorException(HttpStatus.UNAUTHORIZED);
when(restTemplateMock.postForEntity(any(String.class), any(HttpEntity.class), any(Class.class)))
.thenThrow(unAuthorizedException);
jiraOauthConfig.restTemplate = restTemplateMock;
assertThrows(RuntimeException.class, jiraOauthConfig::renewCredentials);
verify(oauth2Config.getAccessToken(), times(1))
.refresh();
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,10 @@ public void setValue(Object someValue) {
this.defaultValue = someValue;
}

@Override
public void refresh() {
}

@Override
public boolean isUpdatable() {
return true;
Expand Down

0 comments on commit b4aa574

Please sign in to comment.