diff --git a/data-prepper-plugins/opensearch/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSinkIT.java b/data-prepper-plugins/opensearch/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSinkIT.java
index b87c5f0c34..9df9095fd4 100644
--- a/data-prepper-plugins/opensearch/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSinkIT.java
+++ b/data-prepper-plugins/opensearch/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSinkIT.java
@@ -1670,8 +1670,7 @@ private Map<String, Object> initializeConfigurationMetadata(final String indexTy
         final String user = System.getProperty("tests.opensearch.user");
         final String password = System.getProperty("tests.opensearch.password");
         if (user != null) {
-            metadata.put(USERNAME, user);
-            metadata.put(PASSWORD, password);
+            metadata.put(AUTHENTICATION, Map.of(USERNAME, user, PASSWORD, password));
         }
         final String distributionVersion = DeclaredOpenSearchVersion.OPENDISTRO_0_10.compareTo(
                 OpenSearchIntegrationHelper.getVersion()) >= 0 ?
diff --git a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/ConnectionConfiguration.java b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/ConnectionConfiguration.java
index 4e3aa49c5a..d16499c45c 100644
--- a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/ConnectionConfiguration.java
+++ b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/ConnectionConfiguration.java
@@ -29,6 +29,7 @@
 import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier;
 import org.opensearch.dataprepper.aws.api.AwsRequestSigningApache4Interceptor;
 import org.opensearch.dataprepper.plugins.sink.opensearch.bulk.PreSerializedJsonpMapper;
+import org.opensearch.dataprepper.plugins.sink.opensearch.configuration.AuthConfig;
 import org.opensearch.dataprepper.plugins.sink.opensearch.configuration.AwsAuthenticationConfiguration;
 import org.opensearch.dataprepper.plugins.sink.opensearch.configuration.OpenSearchSinkConfig;
 import org.opensearch.dataprepper.plugins.sink.opensearch.configuration.ServerlessOptions;
@@ -98,6 +99,7 @@ public class ConnectionConfiguration {
   private final String serverlessCollectionName;
   private final String serverlessVpceId;
   private final boolean requestCompressionEnabled;
+  private final AuthConfig authConfig;
 
   List<String> getHosts() {
     return hosts;
@@ -159,6 +161,10 @@ boolean isRequestCompressionEnabled() {
     return requestCompressionEnabled;
   }
 
+  public AuthConfig getAuthConfig() {
+    return authConfig;
+  }
+
   private ConnectionConfiguration(final Builder builder) {
     this.hosts = builder.hosts;
     this.username = builder.username;
@@ -178,6 +184,7 @@ private ConnectionConfiguration(final Builder builder) {
     this.serverlessCollectionName = builder.serverlessCollectionName;
     this.serverlessVpceId = builder.serverlessVpceId;
     this.requestCompressionEnabled = builder.requestCompressionEnabled;
+    this.authConfig = builder.authConfig;
   }
 
   public static ConnectionConfiguration readConnectionConfiguration(final OpenSearchSinkConfig openSearchSinkConfig){
@@ -185,11 +192,16 @@ public static ConnectionConfiguration readConnectionConfiguration(final OpenSear
     ConnectionConfiguration.Builder builder = new ConnectionConfiguration.Builder(hosts);
     final String username = openSearchSinkConfig.getUsername();
     final String password = openSearchSinkConfig.getPassword();
-    if (username != null) {
-      builder = builder.withUsername(username);
-    }
-    if (password != null) {
-      builder = builder.withPassword(password);
+    final AuthConfig authConfig = openSearchSinkConfig.getAuthConfig();
+    if (authConfig != null) {
+      builder = builder.withAuthConfig(authConfig);
+    } else {
+      if (username != null) {
+        builder = builder.withUsername(username);
+      }
+      if (password != null) {
+        builder = builder.withPassword(password);
+      }
     }
     final Integer socketTimeout = openSearchSinkConfig.getSocketTimeout();
     if (socketTimeout != null) {
@@ -314,10 +326,18 @@ public AwsCredentialsOptions createAwsCredentialsOptions() {
 
   private void attachUserCredentials(final RestClientBuilder restClientBuilder) {
     final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
-    if (username != null) {
-      LOG.info("Using the username provided in the config.");
-      credentialsProvider.setCredentials(
-              AuthScope.ANY, new UsernamePasswordCredentials(username, password));
+    if (authConfig != null) {
+      if (authConfig.getUsername() != null) {
+        LOG.info("Using the authentication provided in the config.");
+        credentialsProvider.setCredentials(
+                AuthScope.ANY, new UsernamePasswordCredentials(authConfig.getUsername(), authConfig.getPassword()));
+      }
+    } else {
+      if (username != null) {
+        LOG.info("Using the username provided in the config.");
+        credentialsProvider.setCredentials(
+                AuthScope.ANY, new UsernamePasswordCredentials(username, password));
+      }
     }
     restClientBuilder.setHttpClientConfigCallback(
             httpClientBuilder -> {
@@ -481,6 +501,7 @@ public static class Builder {
     private String serverlessCollectionName;
     private String serverlessVpceId;
     private boolean requestCompressionEnabled;
+    private AuthConfig authConfig;
 
     private void validateStsRoleArn(final String awsStsRoleArn) {
       final Arn arn = getArn(awsStsRoleArn);
@@ -605,6 +626,11 @@ public Builder withRequestCompressionEnabled(final boolean requestCompressionEna
       return this;
     }
 
+    public Builder withAuthConfig(final AuthConfig authConfig) {
+      this.authConfig = authConfig;
+      return this;
+    }
+
     public ConnectionConfiguration build() {
       return new ConnectionConfiguration(this);
     }
diff --git a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchClientRefresher.java b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchClientRefresher.java
index 2dd987b8fd..bd3b268e2d 100644
--- a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchClientRefresher.java
+++ b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchClientRefresher.java
@@ -74,13 +74,23 @@ public void update(OpenSearchSinkConfig openSearchSinkConfig) {
     private boolean basicAuthChanged(final ConnectionConfiguration newConfig) {
         final String existingUsername;
         final String existingPassword;
-        existingUsername = currentConfig.getUsername();
-        existingPassword = currentConfig.getPassword();
+        if (currentConfig.getAuthConfig() != null) {
+            existingUsername = currentConfig.getAuthConfig().getUsername();
+            existingPassword = currentConfig.getAuthConfig().getPassword();
+        } else {
+            existingUsername = currentConfig.getUsername();
+            existingPassword = currentConfig.getPassword();
+        }
 
         final String newUsername;
         final String newPassword;
-        newUsername = newConfig.getUsername();
-        newPassword = newConfig.getPassword();
+        if (newConfig.getAuthConfig() != null) {
+            newUsername = newConfig.getAuthConfig().getUsername();
+            newPassword = newConfig.getAuthConfig().getPassword();
+        } else {
+            newUsername = newConfig.getUsername();
+            newPassword = newConfig.getPassword();
+        }
 
         return !Objects.equals(existingUsername, newUsername) ||
                 !Objects.equals(existingPassword, newPassword);
diff --git a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/configuration/AuthConfig.java b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/configuration/AuthConfig.java
new file mode 100644
index 0000000000..e41f99043c
--- /dev/null
+++ b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/configuration/AuthConfig.java
@@ -0,0 +1,19 @@
+package org.opensearch.dataprepper.plugins.sink.opensearch.configuration;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+public class AuthConfig {
+    @JsonProperty("username")
+    private String username;
+
+    @JsonProperty("password")
+    private String password;
+
+    public String getUsername() {
+        return username;
+    }
+
+    public String getPassword() {
+        return password;
+    }
+}
diff --git a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/configuration/OpenSearchSinkConfig.java b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/configuration/OpenSearchSinkConfig.java
index d30c448db9..2bde73c184 100644
--- a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/configuration/OpenSearchSinkConfig.java
+++ b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/configuration/OpenSearchSinkConfig.java
@@ -32,6 +32,15 @@ public class OpenSearchSinkConfig {
     @JsonProperty("username")
     private String username = null;
 
+    @Getter
+    @JsonProperty("authentication")
+    private AuthConfig authConfig;
+
+    @AssertTrue(message = "username and password should not be set when authentication is configured.")
+    public boolean isAuthConfigValid() {
+        return authConfig == null || (username == null && password == null);
+    }
+
     @Getter
     @JsonProperty("password")
     private String password = null;
diff --git a/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/ConnectionConfigurationTests.java b/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/ConnectionConfigurationTests.java
index a2356ccb94..9a4cf44b3e 100644
--- a/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/ConnectionConfigurationTests.java
+++ b/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/ConnectionConfigurationTests.java
@@ -180,6 +180,34 @@ void testReadConnectionConfigurationWithDeprecatedBasicCredentialsAndNoCert() th
         assertFalse(connectionConfiguration.isAwsSigv4());
     }
 
+    @Test
+    void testReadConnectionConfigurationWithBasicCredentialsAndNoCert() throws JsonProcessingException {
+        final Map<String, Object> configurationMetadata = generateConfigurationMetadata(
+                TEST_HOSTS, null, null, TEST_CONNECT_TIMEOUT, TEST_SOCKET_TIMEOUT, false, null, null, null, false);
+        configurationMetadata.put("authentication", Map.of("username", TEST_USERNAME, "password", TEST_PASSWORD));
+        final OpenSearchSinkConfig openSearchSinkConfig = getOpenSearchSinkConfigByConfigMetadata(configurationMetadata);
+        final ConnectionConfiguration connectionConfiguration =
+                ConnectionConfiguration.readConnectionConfiguration(openSearchSinkConfig);
+        assertEquals(TEST_HOSTS, connectionConfiguration.getHosts());
+        assertNull(connectionConfiguration.getUsername());
+        assertNull(connectionConfiguration.getPassword());
+        assertNotNull(connectionConfiguration.getAuthConfig());
+        assertEquals(TEST_USERNAME, connectionConfiguration.getAuthConfig().getUsername());
+        assertEquals(TEST_PASSWORD, connectionConfiguration.getAuthConfig().getPassword());
+        assertEquals(TEST_CONNECT_TIMEOUT, connectionConfiguration.getConnectTimeout());
+        assertEquals(TEST_SOCKET_TIMEOUT, connectionConfiguration.getSocketTimeout());
+        assertFalse(connectionConfiguration.isAwsSigv4());
+    }
+
+    @Test
+    void testReadConnectionConfigurationWithBothDeprecatedBasicCredentialsAndAuthConfigShouldThrow() throws JsonProcessingException {
+        final Map<String, Object> configurationMetadata = generateConfigurationMetadata(
+                TEST_HOSTS, TEST_USERNAME, null, TEST_CONNECT_TIMEOUT, TEST_SOCKET_TIMEOUT, false, null, null, null, false);
+        configurationMetadata.put("authentication", Map.of("username", TEST_USERNAME, "password", TEST_PASSWORD));
+        final OpenSearchSinkConfig openSearchSinkConfig = getOpenSearchSinkConfigByConfigMetadata(configurationMetadata);
+        assertFalse(openSearchSinkConfig.isAuthConfigValid());
+    }
+
     @Test
     void testCreateClientWithDeprecatedBasicCredentialsAndNoCert() throws IOException {
         final OpenSearchSinkConfig openSearchSinkConfig = generateOpenSearchSinkConfig(
diff --git a/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchClientRefresherTest.java b/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchClientRefresherTest.java
index e31c680e8d..0ceb793785 100644
--- a/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchClientRefresherTest.java
+++ b/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchClientRefresherTest.java
@@ -9,6 +9,7 @@
 import org.mockito.junit.jupiter.MockitoExtension;
 import org.opensearch.client.opensearch.OpenSearchClient;
 import org.opensearch.dataprepper.metrics.PluginMetrics;
+import org.opensearch.dataprepper.plugins.sink.opensearch.configuration.AuthConfig;
 import org.opensearch.dataprepper.plugins.sink.opensearch.configuration.OpenSearchSinkConfig;
 
 import java.util.function.Function;
@@ -40,6 +41,9 @@ class OpenSearchClientRefresherTest {
     @Mock
     private OpenSearchClient openSearchClient;
 
+    @Mock
+    private AuthConfig authConfig;
+
     @Mock
     private PluginMetrics pluginMetrics;
 
@@ -65,7 +69,7 @@ void testGet() {
     }
 
     @Test
-    void testGetAfterUpdateWithBasicAuthUnchanged() {
+    void testGetAfterUpdateWithDeprecatedBasicAuthUnchanged() {
         final OpenSearchClientRefresher objectUnderTest = createObjectUnderTest();
         assertThat(objectUnderTest.get(), equalTo(openSearchClient));
         verify(clientFunction, times(1)).apply(any());
@@ -86,7 +90,31 @@ void testGetAfterUpdateWithBasicAuthUnchanged() {
     }
 
     @Test
-    void testGetAfterUpdateWithUsernameChanged() {
+    void testGetAfterUpdateWithBasicAuthUnchanged() {
+        final OpenSearchClientRefresher objectUnderTest = createObjectUnderTest();
+        assertThat(objectUnderTest.get(), equalTo(openSearchClient));
+        verify(clientFunction, times(1)).apply(any());
+        when(connectionConfiguration.getAuthConfig()).thenReturn(authConfig);
+        when(authConfig.getUsername()).thenReturn(TEST_USERNAME);
+        when(authConfig.getPassword()).thenReturn(TEST_PASSWORD);
+        final OpenSearchSinkConfig newConfig = mock(OpenSearchSinkConfig.class);
+        final ConnectionConfiguration newConnectionConfiguration = mock(ConnectionConfiguration.class);
+        final AuthConfig newAuthConfig = mock(AuthConfig.class);
+        when(newConnectionConfiguration.getAuthConfig()).thenReturn(newAuthConfig);
+        when(newAuthConfig.getUsername()).thenReturn(TEST_USERNAME);
+        when(newAuthConfig.getPassword()).thenReturn(TEST_PASSWORD);
+        try (MockedStatic<ConnectionConfiguration> configurationMockedStatic = mockStatic(
+                ConnectionConfiguration.class)) {
+            configurationMockedStatic.when(() -> ConnectionConfiguration.readConnectionConfiguration(eq(newConfig)))
+                    .thenReturn(newConnectionConfiguration);
+            objectUnderTest.update(newConfig);
+        }
+        assertThat(objectUnderTest.get(), equalTo(openSearchClient));
+        verifyNoMoreInteractions(clientFunction);
+    }
+
+    @Test
+    void testGetAfterUpdateWithDeprecatedUsernameChanged() {
         when(pluginMetrics.counter(CREDENTIALS_CHANGED)).thenReturn(credentialsChangeCounter);
         final OpenSearchClientRefresher objectUnderTest = createObjectUnderTest();
         assertThat(objectUnderTest.get(), equalTo(openSearchClient));
@@ -109,9 +137,36 @@ void testGetAfterUpdateWithUsernameChanged() {
         verify(clientFunction, times(2)).apply(any());
     }
 
+    @Test
+    void testGetAfterUpdateWithUsernameChanged() {
+        when(pluginMetrics.counter(CREDENTIALS_CHANGED)).thenReturn(credentialsChangeCounter);
+        final OpenSearchClientRefresher objectUnderTest = createObjectUnderTest();
+        assertThat(objectUnderTest.get(), equalTo(openSearchClient));
+        verify(clientFunction, times(1)).apply(any());
+        assertThat(objectUnderTest.get(), equalTo(openSearchClient));
+        when(connectionConfiguration.getAuthConfig()).thenReturn(authConfig);
+        when(authConfig.getUsername()).thenReturn(TEST_USERNAME);
+        when(authConfig.getPassword()).thenReturn(TEST_PASSWORD);
+        final OpenSearchSinkConfig newConfig = mock(OpenSearchSinkConfig.class);
+        final ConnectionConfiguration newConnectionConfiguration = mock(ConnectionConfiguration.class);
+        final AuthConfig newAuthConfig = mock(AuthConfig.class);
+        when(newConnectionConfiguration.getAuthConfig()).thenReturn(newAuthConfig);
+        when(newAuthConfig.getUsername()).thenReturn(TEST_USERNAME + "_changed");
+        final OpenSearchClient newClient = mock(OpenSearchClient.class);
+        when(clientFunction.apply(eq(newConnectionConfiguration))).thenReturn(newClient);
+        try (MockedStatic<ConnectionConfiguration> configurationMockedStatic = mockStatic(
+                ConnectionConfiguration.class)) {
+            configurationMockedStatic.when(() -> ConnectionConfiguration.readConnectionConfiguration(eq(newConfig)))
+                    .thenReturn(newConnectionConfiguration);
+            objectUnderTest.update(newConfig);
+        }
+        assertThat(objectUnderTest.get(), equalTo(newClient));
+        verify(credentialsChangeCounter).increment();
+        verify(clientFunction, times(2)).apply(any());
+    }
 
     @Test
-    void testGetAfterUpdateWithPasswordChanged() {
+    void testGetAfterUpdateWithDeprecatedPasswordChanged() {
         when(pluginMetrics.counter(CREDENTIALS_CHANGED)).thenReturn(credentialsChangeCounter);
         final OpenSearchClientRefresher objectUnderTest = createObjectUnderTest();
         assertThat(objectUnderTest.get(), equalTo(openSearchClient));
@@ -136,6 +191,35 @@ void testGetAfterUpdateWithPasswordChanged() {
         verify(clientFunction, times(2)).apply(any());
     }
 
+    @Test
+    void testGetAfterUpdateWithPasswordChanged() {
+        when(pluginMetrics.counter(CREDENTIALS_CHANGED)).thenReturn(credentialsChangeCounter);
+        final OpenSearchClientRefresher objectUnderTest = createObjectUnderTest();
+        assertThat(objectUnderTest.get(), equalTo(openSearchClient));
+        verify(clientFunction, times(1)).apply(any());
+        assertThat(objectUnderTest.get(), equalTo(openSearchClient));
+        when(connectionConfiguration.getAuthConfig()).thenReturn(authConfig);
+        when(authConfig.getUsername()).thenReturn(TEST_USERNAME);
+        when(authConfig.getPassword()).thenReturn(TEST_PASSWORD);
+        final OpenSearchSinkConfig newConfig = mock(OpenSearchSinkConfig.class);
+        final ConnectionConfiguration newConnectionConfiguration = mock(ConnectionConfiguration.class);
+        final AuthConfig newAuthConfig = mock(AuthConfig.class);
+        when(newConnectionConfiguration.getAuthConfig()).thenReturn(newAuthConfig);
+        when(newAuthConfig.getUsername()).thenReturn(TEST_USERNAME);
+        when(newAuthConfig.getPassword()).thenReturn(TEST_PASSWORD + "_changed");
+        final OpenSearchClient newClient = mock(OpenSearchClient.class);
+        when(clientFunction.apply(eq(newConnectionConfiguration))).thenReturn(newClient);
+        try (MockedStatic<ConnectionConfiguration> configurationMockedStatic = mockStatic(
+                ConnectionConfiguration.class)) {
+            configurationMockedStatic.when(() -> ConnectionConfiguration.readConnectionConfiguration(eq(newConfig)))
+                    .thenReturn(newConnectionConfiguration);
+            objectUnderTest.update(newConfig);
+        }
+        assertThat(objectUnderTest.get(), equalTo(newClient));
+        verify(credentialsChangeCounter).increment();
+        verify(clientFunction, times(2)).apply(any());
+    }
+
     @Test
     void testGetAfterUpdateClientFailure() {
         when(pluginMetrics.counter(CREDENTIALS_CHANGED)).thenReturn(credentialsChangeCounter);
@@ -163,4 +247,4 @@ void testGetAfterUpdateClientFailure() {
         verify(clientRefreshErrorsCounter).increment();
         verify(clientFunction, times(2)).apply(any());
     }
-}
+}
\ No newline at end of file