From faa5c179183c44824c2b7cdc37d38eb77182f146 Mon Sep 17 00:00:00 2001 From: Maxwell Brown Date: Fri, 24 Jan 2025 15:00:20 -0800 Subject: [PATCH] we want to support authentication: username: password: config Signed-off-by: Maxwell Brown --- .../sink/opensearch/OpenSearchSinkIT.java | 3 +- .../opensearch/ConnectionConfiguration.java | 44 +++++++-- .../opensearch/OpenSearchClientRefresher.java | 18 +++- .../opensearch/configuration/AuthConfig.java | 19 ++++ .../configuration/OpenSearchSinkConfig.java | 9 ++ .../ConnectionConfigurationTests.java | 28 ++++++ .../OpenSearchClientRefresherTest.java | 92 ++++++++++++++++++- 7 files changed, 194 insertions(+), 19 deletions(-) create mode 100644 data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/configuration/AuthConfig.java diff --git a/data-prepper-plugins/opensearch/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSinkIT.java b/data-prepper-plugins/opensearch/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSinkIT.java index b87c5f0c34..9df9095fd4 100644 --- a/data-prepper-plugins/opensearch/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSinkIT.java +++ b/data-prepper-plugins/opensearch/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSinkIT.java @@ -1670,8 +1670,7 @@ private Map initializeConfigurationMetadata(final String indexTy final String user = System.getProperty("tests.opensearch.user"); final String password = System.getProperty("tests.opensearch.password"); if (user != null) { - metadata.put(USERNAME, user); - metadata.put(PASSWORD, password); + metadata.put(AUTHENTICATION, Map.of(USERNAME, user, PASSWORD, password)); } final String distributionVersion = DeclaredOpenSearchVersion.OPENDISTRO_0_10.compareTo( OpenSearchIntegrationHelper.getVersion()) >= 0 ? diff --git a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/ConnectionConfiguration.java b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/ConnectionConfiguration.java index 4e3aa49c5a..d16499c45c 100644 --- a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/ConnectionConfiguration.java +++ b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/ConnectionConfiguration.java @@ -29,6 +29,7 @@ import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier; import org.opensearch.dataprepper.aws.api.AwsRequestSigningApache4Interceptor; import org.opensearch.dataprepper.plugins.sink.opensearch.bulk.PreSerializedJsonpMapper; +import org.opensearch.dataprepper.plugins.sink.opensearch.configuration.AuthConfig; import org.opensearch.dataprepper.plugins.sink.opensearch.configuration.AwsAuthenticationConfiguration; import org.opensearch.dataprepper.plugins.sink.opensearch.configuration.OpenSearchSinkConfig; import org.opensearch.dataprepper.plugins.sink.opensearch.configuration.ServerlessOptions; @@ -98,6 +99,7 @@ public class ConnectionConfiguration { private final String serverlessCollectionName; private final String serverlessVpceId; private final boolean requestCompressionEnabled; + private final AuthConfig authConfig; List getHosts() { return hosts; @@ -159,6 +161,10 @@ boolean isRequestCompressionEnabled() { return requestCompressionEnabled; } + public AuthConfig getAuthConfig() { + return authConfig; + } + private ConnectionConfiguration(final Builder builder) { this.hosts = builder.hosts; this.username = builder.username; @@ -178,6 +184,7 @@ private ConnectionConfiguration(final Builder builder) { this.serverlessCollectionName = builder.serverlessCollectionName; this.serverlessVpceId = builder.serverlessVpceId; this.requestCompressionEnabled = builder.requestCompressionEnabled; + this.authConfig = builder.authConfig; } public static ConnectionConfiguration readConnectionConfiguration(final OpenSearchSinkConfig openSearchSinkConfig){ @@ -185,11 +192,16 @@ public static ConnectionConfiguration readConnectionConfiguration(final OpenSear ConnectionConfiguration.Builder builder = new ConnectionConfiguration.Builder(hosts); final String username = openSearchSinkConfig.getUsername(); final String password = openSearchSinkConfig.getPassword(); - if (username != null) { - builder = builder.withUsername(username); - } - if (password != null) { - builder = builder.withPassword(password); + final AuthConfig authConfig = openSearchSinkConfig.getAuthConfig(); + if (authConfig != null) { + builder = builder.withAuthConfig(authConfig); + } else { + if (username != null) { + builder = builder.withUsername(username); + } + if (password != null) { + builder = builder.withPassword(password); + } } final Integer socketTimeout = openSearchSinkConfig.getSocketTimeout(); if (socketTimeout != null) { @@ -314,10 +326,18 @@ public AwsCredentialsOptions createAwsCredentialsOptions() { private void attachUserCredentials(final RestClientBuilder restClientBuilder) { final CredentialsProvider credentialsProvider = new BasicCredentialsProvider(); - if (username != null) { - LOG.info("Using the username provided in the config."); - credentialsProvider.setCredentials( - AuthScope.ANY, new UsernamePasswordCredentials(username, password)); + if (authConfig != null) { + if (authConfig.getUsername() != null) { + LOG.info("Using the authentication provided in the config."); + credentialsProvider.setCredentials( + AuthScope.ANY, new UsernamePasswordCredentials(authConfig.getUsername(), authConfig.getPassword())); + } + } else { + if (username != null) { + LOG.info("Using the username provided in the config."); + credentialsProvider.setCredentials( + AuthScope.ANY, new UsernamePasswordCredentials(username, password)); + } } restClientBuilder.setHttpClientConfigCallback( httpClientBuilder -> { @@ -481,6 +501,7 @@ public static class Builder { private String serverlessCollectionName; private String serverlessVpceId; private boolean requestCompressionEnabled; + private AuthConfig authConfig; private void validateStsRoleArn(final String awsStsRoleArn) { final Arn arn = getArn(awsStsRoleArn); @@ -605,6 +626,11 @@ public Builder withRequestCompressionEnabled(final boolean requestCompressionEna return this; } + public Builder withAuthConfig(final AuthConfig authConfig) { + this.authConfig = authConfig; + return this; + } + public ConnectionConfiguration build() { return new ConnectionConfiguration(this); } diff --git a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchClientRefresher.java b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchClientRefresher.java index 2dd987b8fd..bd3b268e2d 100644 --- a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchClientRefresher.java +++ b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchClientRefresher.java @@ -74,13 +74,23 @@ public void update(OpenSearchSinkConfig openSearchSinkConfig) { private boolean basicAuthChanged(final ConnectionConfiguration newConfig) { final String existingUsername; final String existingPassword; - existingUsername = currentConfig.getUsername(); - existingPassword = currentConfig.getPassword(); + if (currentConfig.getAuthConfig() != null) { + existingUsername = currentConfig.getAuthConfig().getUsername(); + existingPassword = currentConfig.getAuthConfig().getPassword(); + } else { + existingUsername = currentConfig.getUsername(); + existingPassword = currentConfig.getPassword(); + } final String newUsername; final String newPassword; - newUsername = newConfig.getUsername(); - newPassword = newConfig.getPassword(); + if (newConfig.getAuthConfig() != null) { + newUsername = newConfig.getAuthConfig().getUsername(); + newPassword = newConfig.getAuthConfig().getPassword(); + } else { + newUsername = newConfig.getUsername(); + newPassword = newConfig.getPassword(); + } return !Objects.equals(existingUsername, newUsername) || !Objects.equals(existingPassword, newPassword); diff --git a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/configuration/AuthConfig.java b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/configuration/AuthConfig.java new file mode 100644 index 0000000000..e41f99043c --- /dev/null +++ b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/configuration/AuthConfig.java @@ -0,0 +1,19 @@ +package org.opensearch.dataprepper.plugins.sink.opensearch.configuration; + +import com.fasterxml.jackson.annotation.JsonProperty; + +public class AuthConfig { + @JsonProperty("username") + private String username; + + @JsonProperty("password") + private String password; + + public String getUsername() { + return username; + } + + public String getPassword() { + return password; + } +} diff --git a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/configuration/OpenSearchSinkConfig.java b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/configuration/OpenSearchSinkConfig.java index d30c448db9..2bde73c184 100644 --- a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/configuration/OpenSearchSinkConfig.java +++ b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/configuration/OpenSearchSinkConfig.java @@ -32,6 +32,15 @@ public class OpenSearchSinkConfig { @JsonProperty("username") private String username = null; + @Getter + @JsonProperty("authentication") + private AuthConfig authConfig; + + @AssertTrue(message = "username and password should not be set when authentication is configured.") + public boolean isAuthConfigValid() { + return authConfig == null || (username == null && password == null); + } + @Getter @JsonProperty("password") private String password = null; diff --git a/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/ConnectionConfigurationTests.java b/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/ConnectionConfigurationTests.java index a2356ccb94..9a4cf44b3e 100644 --- a/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/ConnectionConfigurationTests.java +++ b/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/ConnectionConfigurationTests.java @@ -180,6 +180,34 @@ void testReadConnectionConfigurationWithDeprecatedBasicCredentialsAndNoCert() th assertFalse(connectionConfiguration.isAwsSigv4()); } + @Test + void testReadConnectionConfigurationWithBasicCredentialsAndNoCert() throws JsonProcessingException { + final Map configurationMetadata = generateConfigurationMetadata( + TEST_HOSTS, null, null, TEST_CONNECT_TIMEOUT, TEST_SOCKET_TIMEOUT, false, null, null, null, false); + configurationMetadata.put("authentication", Map.of("username", TEST_USERNAME, "password", TEST_PASSWORD)); + final OpenSearchSinkConfig openSearchSinkConfig = getOpenSearchSinkConfigByConfigMetadata(configurationMetadata); + final ConnectionConfiguration connectionConfiguration = + ConnectionConfiguration.readConnectionConfiguration(openSearchSinkConfig); + assertEquals(TEST_HOSTS, connectionConfiguration.getHosts()); + assertNull(connectionConfiguration.getUsername()); + assertNull(connectionConfiguration.getPassword()); + assertNotNull(connectionConfiguration.getAuthConfig()); + assertEquals(TEST_USERNAME, connectionConfiguration.getAuthConfig().getUsername()); + assertEquals(TEST_PASSWORD, connectionConfiguration.getAuthConfig().getPassword()); + assertEquals(TEST_CONNECT_TIMEOUT, connectionConfiguration.getConnectTimeout()); + assertEquals(TEST_SOCKET_TIMEOUT, connectionConfiguration.getSocketTimeout()); + assertFalse(connectionConfiguration.isAwsSigv4()); + } + + @Test + void testReadConnectionConfigurationWithBothDeprecatedBasicCredentialsAndAuthConfigShouldThrow() throws JsonProcessingException { + final Map configurationMetadata = generateConfigurationMetadata( + TEST_HOSTS, TEST_USERNAME, null, TEST_CONNECT_TIMEOUT, TEST_SOCKET_TIMEOUT, false, null, null, null, false); + configurationMetadata.put("authentication", Map.of("username", TEST_USERNAME, "password", TEST_PASSWORD)); + final OpenSearchSinkConfig openSearchSinkConfig = getOpenSearchSinkConfigByConfigMetadata(configurationMetadata); + assertFalse(openSearchSinkConfig.isAuthConfigValid()); + } + @Test void testCreateClientWithDeprecatedBasicCredentialsAndNoCert() throws IOException { final OpenSearchSinkConfig openSearchSinkConfig = generateOpenSearchSinkConfig( diff --git a/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchClientRefresherTest.java b/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchClientRefresherTest.java index e31c680e8d..0ceb793785 100644 --- a/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchClientRefresherTest.java +++ b/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchClientRefresherTest.java @@ -9,6 +9,7 @@ import org.mockito.junit.jupiter.MockitoExtension; import org.opensearch.client.opensearch.OpenSearchClient; import org.opensearch.dataprepper.metrics.PluginMetrics; +import org.opensearch.dataprepper.plugins.sink.opensearch.configuration.AuthConfig; import org.opensearch.dataprepper.plugins.sink.opensearch.configuration.OpenSearchSinkConfig; import java.util.function.Function; @@ -40,6 +41,9 @@ class OpenSearchClientRefresherTest { @Mock private OpenSearchClient openSearchClient; + @Mock + private AuthConfig authConfig; + @Mock private PluginMetrics pluginMetrics; @@ -65,7 +69,7 @@ void testGet() { } @Test - void testGetAfterUpdateWithBasicAuthUnchanged() { + void testGetAfterUpdateWithDeprecatedBasicAuthUnchanged() { final OpenSearchClientRefresher objectUnderTest = createObjectUnderTest(); assertThat(objectUnderTest.get(), equalTo(openSearchClient)); verify(clientFunction, times(1)).apply(any()); @@ -86,7 +90,31 @@ void testGetAfterUpdateWithBasicAuthUnchanged() { } @Test - void testGetAfterUpdateWithUsernameChanged() { + void testGetAfterUpdateWithBasicAuthUnchanged() { + final OpenSearchClientRefresher objectUnderTest = createObjectUnderTest(); + assertThat(objectUnderTest.get(), equalTo(openSearchClient)); + verify(clientFunction, times(1)).apply(any()); + when(connectionConfiguration.getAuthConfig()).thenReturn(authConfig); + when(authConfig.getUsername()).thenReturn(TEST_USERNAME); + when(authConfig.getPassword()).thenReturn(TEST_PASSWORD); + final OpenSearchSinkConfig newConfig = mock(OpenSearchSinkConfig.class); + final ConnectionConfiguration newConnectionConfiguration = mock(ConnectionConfiguration.class); + final AuthConfig newAuthConfig = mock(AuthConfig.class); + when(newConnectionConfiguration.getAuthConfig()).thenReturn(newAuthConfig); + when(newAuthConfig.getUsername()).thenReturn(TEST_USERNAME); + when(newAuthConfig.getPassword()).thenReturn(TEST_PASSWORD); + try (MockedStatic configurationMockedStatic = mockStatic( + ConnectionConfiguration.class)) { + configurationMockedStatic.when(() -> ConnectionConfiguration.readConnectionConfiguration(eq(newConfig))) + .thenReturn(newConnectionConfiguration); + objectUnderTest.update(newConfig); + } + assertThat(objectUnderTest.get(), equalTo(openSearchClient)); + verifyNoMoreInteractions(clientFunction); + } + + @Test + void testGetAfterUpdateWithDeprecatedUsernameChanged() { when(pluginMetrics.counter(CREDENTIALS_CHANGED)).thenReturn(credentialsChangeCounter); final OpenSearchClientRefresher objectUnderTest = createObjectUnderTest(); assertThat(objectUnderTest.get(), equalTo(openSearchClient)); @@ -109,9 +137,36 @@ void testGetAfterUpdateWithUsernameChanged() { verify(clientFunction, times(2)).apply(any()); } + @Test + void testGetAfterUpdateWithUsernameChanged() { + when(pluginMetrics.counter(CREDENTIALS_CHANGED)).thenReturn(credentialsChangeCounter); + final OpenSearchClientRefresher objectUnderTest = createObjectUnderTest(); + assertThat(objectUnderTest.get(), equalTo(openSearchClient)); + verify(clientFunction, times(1)).apply(any()); + assertThat(objectUnderTest.get(), equalTo(openSearchClient)); + when(connectionConfiguration.getAuthConfig()).thenReturn(authConfig); + when(authConfig.getUsername()).thenReturn(TEST_USERNAME); + when(authConfig.getPassword()).thenReturn(TEST_PASSWORD); + final OpenSearchSinkConfig newConfig = mock(OpenSearchSinkConfig.class); + final ConnectionConfiguration newConnectionConfiguration = mock(ConnectionConfiguration.class); + final AuthConfig newAuthConfig = mock(AuthConfig.class); + when(newConnectionConfiguration.getAuthConfig()).thenReturn(newAuthConfig); + when(newAuthConfig.getUsername()).thenReturn(TEST_USERNAME + "_changed"); + final OpenSearchClient newClient = mock(OpenSearchClient.class); + when(clientFunction.apply(eq(newConnectionConfiguration))).thenReturn(newClient); + try (MockedStatic configurationMockedStatic = mockStatic( + ConnectionConfiguration.class)) { + configurationMockedStatic.when(() -> ConnectionConfiguration.readConnectionConfiguration(eq(newConfig))) + .thenReturn(newConnectionConfiguration); + objectUnderTest.update(newConfig); + } + assertThat(objectUnderTest.get(), equalTo(newClient)); + verify(credentialsChangeCounter).increment(); + verify(clientFunction, times(2)).apply(any()); + } @Test - void testGetAfterUpdateWithPasswordChanged() { + void testGetAfterUpdateWithDeprecatedPasswordChanged() { when(pluginMetrics.counter(CREDENTIALS_CHANGED)).thenReturn(credentialsChangeCounter); final OpenSearchClientRefresher objectUnderTest = createObjectUnderTest(); assertThat(objectUnderTest.get(), equalTo(openSearchClient)); @@ -136,6 +191,35 @@ void testGetAfterUpdateWithPasswordChanged() { verify(clientFunction, times(2)).apply(any()); } + @Test + void testGetAfterUpdateWithPasswordChanged() { + when(pluginMetrics.counter(CREDENTIALS_CHANGED)).thenReturn(credentialsChangeCounter); + final OpenSearchClientRefresher objectUnderTest = createObjectUnderTest(); + assertThat(objectUnderTest.get(), equalTo(openSearchClient)); + verify(clientFunction, times(1)).apply(any()); + assertThat(objectUnderTest.get(), equalTo(openSearchClient)); + when(connectionConfiguration.getAuthConfig()).thenReturn(authConfig); + when(authConfig.getUsername()).thenReturn(TEST_USERNAME); + when(authConfig.getPassword()).thenReturn(TEST_PASSWORD); + final OpenSearchSinkConfig newConfig = mock(OpenSearchSinkConfig.class); + final ConnectionConfiguration newConnectionConfiguration = mock(ConnectionConfiguration.class); + final AuthConfig newAuthConfig = mock(AuthConfig.class); + when(newConnectionConfiguration.getAuthConfig()).thenReturn(newAuthConfig); + when(newAuthConfig.getUsername()).thenReturn(TEST_USERNAME); + when(newAuthConfig.getPassword()).thenReturn(TEST_PASSWORD + "_changed"); + final OpenSearchClient newClient = mock(OpenSearchClient.class); + when(clientFunction.apply(eq(newConnectionConfiguration))).thenReturn(newClient); + try (MockedStatic configurationMockedStatic = mockStatic( + ConnectionConfiguration.class)) { + configurationMockedStatic.when(() -> ConnectionConfiguration.readConnectionConfiguration(eq(newConfig))) + .thenReturn(newConnectionConfiguration); + objectUnderTest.update(newConfig); + } + assertThat(objectUnderTest.get(), equalTo(newClient)); + verify(credentialsChangeCounter).increment(); + verify(clientFunction, times(2)).apply(any()); + } + @Test void testGetAfterUpdateClientFailure() { when(pluginMetrics.counter(CREDENTIALS_CHANGED)).thenReturn(credentialsChangeCounter); @@ -163,4 +247,4 @@ void testGetAfterUpdateClientFailure() { verify(clientRefreshErrorsCounter).increment(); verify(clientFunction, times(2)).apply(any()); } -} +} \ No newline at end of file