Skip to content

Commit

Permalink
we want to support authentication: username: password: config
Browse files Browse the repository at this point in the history
Signed-off-by: Maxwell Brown <[email protected]>
  • Loading branch information
Galactus22625 committed Jan 24, 2025
1 parent 122aaba commit faa5c17
Show file tree
Hide file tree
Showing 7 changed files with 194 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1670,8 +1670,7 @@ private Map<String, Object> initializeConfigurationMetadata(final String indexTy
final String user = System.getProperty("tests.opensearch.user");
final String password = System.getProperty("tests.opensearch.password");
if (user != null) {
metadata.put(USERNAME, user);
metadata.put(PASSWORD, password);
metadata.put(AUTHENTICATION, Map.of(USERNAME, user, PASSWORD, password));
}
final String distributionVersion = DeclaredOpenSearchVersion.OPENDISTRO_0_10.compareTo(
OpenSearchIntegrationHelper.getVersion()) >= 0 ?
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier;
import org.opensearch.dataprepper.aws.api.AwsRequestSigningApache4Interceptor;
import org.opensearch.dataprepper.plugins.sink.opensearch.bulk.PreSerializedJsonpMapper;
import org.opensearch.dataprepper.plugins.sink.opensearch.configuration.AuthConfig;
import org.opensearch.dataprepper.plugins.sink.opensearch.configuration.AwsAuthenticationConfiguration;
import org.opensearch.dataprepper.plugins.sink.opensearch.configuration.OpenSearchSinkConfig;
import org.opensearch.dataprepper.plugins.sink.opensearch.configuration.ServerlessOptions;
Expand Down Expand Up @@ -98,6 +99,7 @@ public class ConnectionConfiguration {
private final String serverlessCollectionName;
private final String serverlessVpceId;
private final boolean requestCompressionEnabled;
private final AuthConfig authConfig;

List<String> getHosts() {
return hosts;
Expand Down Expand Up @@ -159,6 +161,10 @@ boolean isRequestCompressionEnabled() {
return requestCompressionEnabled;
}

public AuthConfig getAuthConfig() {
return authConfig;
}

private ConnectionConfiguration(final Builder builder) {
this.hosts = builder.hosts;
this.username = builder.username;
Expand All @@ -178,18 +184,24 @@ private ConnectionConfiguration(final Builder builder) {
this.serverlessCollectionName = builder.serverlessCollectionName;
this.serverlessVpceId = builder.serverlessVpceId;
this.requestCompressionEnabled = builder.requestCompressionEnabled;
this.authConfig = builder.authConfig;
}

public static ConnectionConfiguration readConnectionConfiguration(final OpenSearchSinkConfig openSearchSinkConfig){
final List<String> hosts = openSearchSinkConfig.getHosts();
ConnectionConfiguration.Builder builder = new ConnectionConfiguration.Builder(hosts);
final String username = openSearchSinkConfig.getUsername();
final String password = openSearchSinkConfig.getPassword();
if (username != null) {
builder = builder.withUsername(username);
}
if (password != null) {
builder = builder.withPassword(password);
final AuthConfig authConfig = openSearchSinkConfig.getAuthConfig();
if (authConfig != null) {
builder = builder.withAuthConfig(authConfig);
} else {
if (username != null) {
builder = builder.withUsername(username);
}
if (password != null) {
builder = builder.withPassword(password);
}
}
final Integer socketTimeout = openSearchSinkConfig.getSocketTimeout();
if (socketTimeout != null) {
Expand Down Expand Up @@ -314,10 +326,18 @@ public AwsCredentialsOptions createAwsCredentialsOptions() {

private void attachUserCredentials(final RestClientBuilder restClientBuilder) {
final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
if (username != null) {
LOG.info("Using the username provided in the config.");
credentialsProvider.setCredentials(
AuthScope.ANY, new UsernamePasswordCredentials(username, password));
if (authConfig != null) {
if (authConfig.getUsername() != null) {
LOG.info("Using the authentication provided in the config.");
credentialsProvider.setCredentials(
AuthScope.ANY, new UsernamePasswordCredentials(authConfig.getUsername(), authConfig.getPassword()));
}
} else {
if (username != null) {
LOG.info("Using the username provided in the config.");
credentialsProvider.setCredentials(
AuthScope.ANY, new UsernamePasswordCredentials(username, password));
}
}
restClientBuilder.setHttpClientConfigCallback(
httpClientBuilder -> {
Expand Down Expand Up @@ -481,6 +501,7 @@ public static class Builder {
private String serverlessCollectionName;
private String serverlessVpceId;
private boolean requestCompressionEnabled;
private AuthConfig authConfig;

private void validateStsRoleArn(final String awsStsRoleArn) {
final Arn arn = getArn(awsStsRoleArn);
Expand Down Expand Up @@ -605,6 +626,11 @@ public Builder withRequestCompressionEnabled(final boolean requestCompressionEna
return this;
}

public Builder withAuthConfig(final AuthConfig authConfig) {
this.authConfig = authConfig;
return this;
}

public ConnectionConfiguration build() {
return new ConnectionConfiguration(this);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,13 +74,23 @@ public void update(OpenSearchSinkConfig openSearchSinkConfig) {
private boolean basicAuthChanged(final ConnectionConfiguration newConfig) {
final String existingUsername;
final String existingPassword;
existingUsername = currentConfig.getUsername();
existingPassword = currentConfig.getPassword();
if (currentConfig.getAuthConfig() != null) {
existingUsername = currentConfig.getAuthConfig().getUsername();
existingPassword = currentConfig.getAuthConfig().getPassword();
} else {
existingUsername = currentConfig.getUsername();
existingPassword = currentConfig.getPassword();
}

final String newUsername;
final String newPassword;
newUsername = newConfig.getUsername();
newPassword = newConfig.getPassword();
if (newConfig.getAuthConfig() != null) {
newUsername = newConfig.getAuthConfig().getUsername();
newPassword = newConfig.getAuthConfig().getPassword();
} else {
newUsername = newConfig.getUsername();
newPassword = newConfig.getPassword();
}

return !Objects.equals(existingUsername, newUsername) ||
!Objects.equals(existingPassword, newPassword);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package org.opensearch.dataprepper.plugins.sink.opensearch.configuration;

import com.fasterxml.jackson.annotation.JsonProperty;

public class AuthConfig {
@JsonProperty("username")
private String username;

@JsonProperty("password")
private String password;

public String getUsername() {
return username;
}

public String getPassword() {
return password;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,15 @@ public class OpenSearchSinkConfig {
@JsonProperty("username")
private String username = null;

@Getter
@JsonProperty("authentication")
private AuthConfig authConfig;

@AssertTrue(message = "username and password should not be set when authentication is configured.")
public boolean isAuthConfigValid() {
return authConfig == null || (username == null && password == null);
}

@Getter
@JsonProperty("password")
private String password = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,34 @@ void testReadConnectionConfigurationWithDeprecatedBasicCredentialsAndNoCert() th
assertFalse(connectionConfiguration.isAwsSigv4());
}

@Test
void testReadConnectionConfigurationWithBasicCredentialsAndNoCert() throws JsonProcessingException {
final Map<String, Object> configurationMetadata = generateConfigurationMetadata(
TEST_HOSTS, null, null, TEST_CONNECT_TIMEOUT, TEST_SOCKET_TIMEOUT, false, null, null, null, false);
configurationMetadata.put("authentication", Map.of("username", TEST_USERNAME, "password", TEST_PASSWORD));
final OpenSearchSinkConfig openSearchSinkConfig = getOpenSearchSinkConfigByConfigMetadata(configurationMetadata);
final ConnectionConfiguration connectionConfiguration =
ConnectionConfiguration.readConnectionConfiguration(openSearchSinkConfig);
assertEquals(TEST_HOSTS, connectionConfiguration.getHosts());
assertNull(connectionConfiguration.getUsername());
assertNull(connectionConfiguration.getPassword());
assertNotNull(connectionConfiguration.getAuthConfig());
assertEquals(TEST_USERNAME, connectionConfiguration.getAuthConfig().getUsername());
assertEquals(TEST_PASSWORD, connectionConfiguration.getAuthConfig().getPassword());
assertEquals(TEST_CONNECT_TIMEOUT, connectionConfiguration.getConnectTimeout());
assertEquals(TEST_SOCKET_TIMEOUT, connectionConfiguration.getSocketTimeout());
assertFalse(connectionConfiguration.isAwsSigv4());
}

@Test
void testReadConnectionConfigurationWithBothDeprecatedBasicCredentialsAndAuthConfigShouldThrow() throws JsonProcessingException {
final Map<String, Object> configurationMetadata = generateConfigurationMetadata(
TEST_HOSTS, TEST_USERNAME, null, TEST_CONNECT_TIMEOUT, TEST_SOCKET_TIMEOUT, false, null, null, null, false);
configurationMetadata.put("authentication", Map.of("username", TEST_USERNAME, "password", TEST_PASSWORD));
final OpenSearchSinkConfig openSearchSinkConfig = getOpenSearchSinkConfigByConfigMetadata(configurationMetadata);
assertFalse(openSearchSinkConfig.isAuthConfigValid());
}

@Test
void testCreateClientWithDeprecatedBasicCredentialsAndNoCert() throws IOException {
final OpenSearchSinkConfig openSearchSinkConfig = generateOpenSearchSinkConfig(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import org.mockito.junit.jupiter.MockitoExtension;
import org.opensearch.client.opensearch.OpenSearchClient;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.plugins.sink.opensearch.configuration.AuthConfig;
import org.opensearch.dataprepper.plugins.sink.opensearch.configuration.OpenSearchSinkConfig;

import java.util.function.Function;
Expand Down Expand Up @@ -40,6 +41,9 @@ class OpenSearchClientRefresherTest {
@Mock
private OpenSearchClient openSearchClient;

@Mock
private AuthConfig authConfig;

@Mock
private PluginMetrics pluginMetrics;

Expand All @@ -65,7 +69,7 @@ void testGet() {
}

@Test
void testGetAfterUpdateWithBasicAuthUnchanged() {
void testGetAfterUpdateWithDeprecatedBasicAuthUnchanged() {
final OpenSearchClientRefresher objectUnderTest = createObjectUnderTest();
assertThat(objectUnderTest.get(), equalTo(openSearchClient));
verify(clientFunction, times(1)).apply(any());
Expand All @@ -86,7 +90,31 @@ void testGetAfterUpdateWithBasicAuthUnchanged() {
}

@Test
void testGetAfterUpdateWithUsernameChanged() {
void testGetAfterUpdateWithBasicAuthUnchanged() {
final OpenSearchClientRefresher objectUnderTest = createObjectUnderTest();
assertThat(objectUnderTest.get(), equalTo(openSearchClient));
verify(clientFunction, times(1)).apply(any());
when(connectionConfiguration.getAuthConfig()).thenReturn(authConfig);
when(authConfig.getUsername()).thenReturn(TEST_USERNAME);
when(authConfig.getPassword()).thenReturn(TEST_PASSWORD);
final OpenSearchSinkConfig newConfig = mock(OpenSearchSinkConfig.class);
final ConnectionConfiguration newConnectionConfiguration = mock(ConnectionConfiguration.class);
final AuthConfig newAuthConfig = mock(AuthConfig.class);
when(newConnectionConfiguration.getAuthConfig()).thenReturn(newAuthConfig);
when(newAuthConfig.getUsername()).thenReturn(TEST_USERNAME);
when(newAuthConfig.getPassword()).thenReturn(TEST_PASSWORD);
try (MockedStatic<ConnectionConfiguration> configurationMockedStatic = mockStatic(
ConnectionConfiguration.class)) {
configurationMockedStatic.when(() -> ConnectionConfiguration.readConnectionConfiguration(eq(newConfig)))
.thenReturn(newConnectionConfiguration);
objectUnderTest.update(newConfig);
}
assertThat(objectUnderTest.get(), equalTo(openSearchClient));
verifyNoMoreInteractions(clientFunction);
}

@Test
void testGetAfterUpdateWithDeprecatedUsernameChanged() {
when(pluginMetrics.counter(CREDENTIALS_CHANGED)).thenReturn(credentialsChangeCounter);
final OpenSearchClientRefresher objectUnderTest = createObjectUnderTest();
assertThat(objectUnderTest.get(), equalTo(openSearchClient));
Expand All @@ -109,9 +137,36 @@ void testGetAfterUpdateWithUsernameChanged() {
verify(clientFunction, times(2)).apply(any());
}

@Test
void testGetAfterUpdateWithUsernameChanged() {
when(pluginMetrics.counter(CREDENTIALS_CHANGED)).thenReturn(credentialsChangeCounter);
final OpenSearchClientRefresher objectUnderTest = createObjectUnderTest();
assertThat(objectUnderTest.get(), equalTo(openSearchClient));
verify(clientFunction, times(1)).apply(any());
assertThat(objectUnderTest.get(), equalTo(openSearchClient));
when(connectionConfiguration.getAuthConfig()).thenReturn(authConfig);
when(authConfig.getUsername()).thenReturn(TEST_USERNAME);
when(authConfig.getPassword()).thenReturn(TEST_PASSWORD);
final OpenSearchSinkConfig newConfig = mock(OpenSearchSinkConfig.class);
final ConnectionConfiguration newConnectionConfiguration = mock(ConnectionConfiguration.class);
final AuthConfig newAuthConfig = mock(AuthConfig.class);
when(newConnectionConfiguration.getAuthConfig()).thenReturn(newAuthConfig);
when(newAuthConfig.getUsername()).thenReturn(TEST_USERNAME + "_changed");
final OpenSearchClient newClient = mock(OpenSearchClient.class);
when(clientFunction.apply(eq(newConnectionConfiguration))).thenReturn(newClient);
try (MockedStatic<ConnectionConfiguration> configurationMockedStatic = mockStatic(
ConnectionConfiguration.class)) {
configurationMockedStatic.when(() -> ConnectionConfiguration.readConnectionConfiguration(eq(newConfig)))
.thenReturn(newConnectionConfiguration);
objectUnderTest.update(newConfig);
}
assertThat(objectUnderTest.get(), equalTo(newClient));
verify(credentialsChangeCounter).increment();
verify(clientFunction, times(2)).apply(any());
}

@Test
void testGetAfterUpdateWithPasswordChanged() {
void testGetAfterUpdateWithDeprecatedPasswordChanged() {
when(pluginMetrics.counter(CREDENTIALS_CHANGED)).thenReturn(credentialsChangeCounter);
final OpenSearchClientRefresher objectUnderTest = createObjectUnderTest();
assertThat(objectUnderTest.get(), equalTo(openSearchClient));
Expand All @@ -136,6 +191,35 @@ void testGetAfterUpdateWithPasswordChanged() {
verify(clientFunction, times(2)).apply(any());
}

@Test
void testGetAfterUpdateWithPasswordChanged() {
when(pluginMetrics.counter(CREDENTIALS_CHANGED)).thenReturn(credentialsChangeCounter);
final OpenSearchClientRefresher objectUnderTest = createObjectUnderTest();
assertThat(objectUnderTest.get(), equalTo(openSearchClient));
verify(clientFunction, times(1)).apply(any());
assertThat(objectUnderTest.get(), equalTo(openSearchClient));
when(connectionConfiguration.getAuthConfig()).thenReturn(authConfig);
when(authConfig.getUsername()).thenReturn(TEST_USERNAME);
when(authConfig.getPassword()).thenReturn(TEST_PASSWORD);
final OpenSearchSinkConfig newConfig = mock(OpenSearchSinkConfig.class);
final ConnectionConfiguration newConnectionConfiguration = mock(ConnectionConfiguration.class);
final AuthConfig newAuthConfig = mock(AuthConfig.class);
when(newConnectionConfiguration.getAuthConfig()).thenReturn(newAuthConfig);
when(newAuthConfig.getUsername()).thenReturn(TEST_USERNAME);
when(newAuthConfig.getPassword()).thenReturn(TEST_PASSWORD + "_changed");
final OpenSearchClient newClient = mock(OpenSearchClient.class);
when(clientFunction.apply(eq(newConnectionConfiguration))).thenReturn(newClient);
try (MockedStatic<ConnectionConfiguration> configurationMockedStatic = mockStatic(
ConnectionConfiguration.class)) {
configurationMockedStatic.when(() -> ConnectionConfiguration.readConnectionConfiguration(eq(newConfig)))
.thenReturn(newConnectionConfiguration);
objectUnderTest.update(newConfig);
}
assertThat(objectUnderTest.get(), equalTo(newClient));
verify(credentialsChangeCounter).increment();
verify(clientFunction, times(2)).apply(any());
}

@Test
void testGetAfterUpdateClientFailure() {
when(pluginMetrics.counter(CREDENTIALS_CHANGED)).thenReturn(credentialsChangeCounter);
Expand Down Expand Up @@ -163,4 +247,4 @@ void testGetAfterUpdateClientFailure() {
verify(clientRefreshErrorsCounter).increment();
verify(clientFunction, times(2)).apply(any());
}
}
}

0 comments on commit faa5c17

Please sign in to comment.