Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support certificate content in Opensearch Source configuration to sup… #4184

Merged
merged 2 commits into from
Feb 26, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
public class TrustStoreProvider {
private static final Logger LOG = LoggerFactory.getLogger(TrustStoreProvider.class);

public TrustManager[] createTrustManager(final Path certificatePath) {
public static TrustManager[] createTrustManager(final Path certificatePath) {
LOG.info("Using the certificate path {} to create trust manager.", certificatePath.toString());
try {
final KeyStore keyStore = createKeyStore(certificatePath);
Expand All @@ -38,7 +38,7 @@ public TrustManager[] createTrustManager(final Path certificatePath) {
}
}

public TrustManager[] createTrustManager(final String certificateContent) {
public static TrustManager[] createTrustManager(final String certificateContent) {
LOG.info("Using the certificate content to create trust manager.");
try (InputStream certificateInputStream = new ByteArrayInputStream(certificateContent.getBytes())) {
final KeyStore keyStore = createKeyStore(certificateInputStream);
Expand All @@ -50,20 +50,20 @@ public TrustManager[] createTrustManager(final String certificateContent) {
}
}

public TrustManager[] createTrustAllManager() {
public static TrustManager[] createTrustAllManager() {
LOG.info("Using the trust all manager to create trust manager.");
return new TrustManager[]{
new X509TrustAllManager()
};
}

private KeyStore createKeyStore(final Path certificatePath) throws Exception {
private static KeyStore createKeyStore(final Path certificatePath) throws Exception {
try (InputStream certificateInputStream = Files.newInputStream(certificatePath)) {
return createKeyStore(certificateInputStream);
}
}

private KeyStore createKeyStore(final InputStream certificateInputStream) throws Exception {
private static KeyStore createKeyStore(final InputStream certificateInputStream) throws Exception {
final CertificateFactory factory = CertificateFactory.getInstance("X.509");
final Certificate trustedCa = factory.generateCertificate(certificateInputStream);
final KeyStore trustStore = KeyStore.getInstance("pkcs12");
Expand All @@ -72,7 +72,7 @@ private KeyStore createKeyStore(final InputStream certificateInputStream) throws
return trustStore;
}

public SSLContext createSSLContext(final Path certificatePath) {
public static SSLContext createSSLContext(final Path certificatePath) {
LOG.info("Using the certificate path to create SSL context.");
try (InputStream is = Files.newInputStream(certificatePath)) {
return createSSLContext(is);
Expand All @@ -81,7 +81,7 @@ public SSLContext createSSLContext(final Path certificatePath) {
}
}

public SSLContext createSSLContext(final String certificateContent) {
public static SSLContext createSSLContext(final String certificateContent) {
LOG.info("Using the certificate content to create SSL context.");
try (InputStream certificateInputStream = new ByteArrayInputStream(certificateContent.getBytes())) {
return createSSLContext(certificateInputStream);
Expand All @@ -90,14 +90,14 @@ public SSLContext createSSLContext(final String certificateContent) {
}
}

private SSLContext createSSLContext(final InputStream certificateInputStream) throws Exception {
private static SSLContext createSSLContext(final InputStream certificateInputStream) throws Exception {
KeyStore trustStore = createKeyStore(certificateInputStream);
SSLContextBuilder sslContextBuilder = SSLContexts.custom()
.loadTrustMaterial(trustStore, null);
return sslContextBuilder.build();
}

public SSLContext createSSLContextWithTrustAllStrategy() {
public static SSLContext createSSLContextWithTrustAllStrategy() {
LOG.info("Using the trust all strategy to create SSL context.");
try {
return SSLContexts.custom().loadTrustMaterial(null, new TrustStrategy() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,11 @@ public void configure(Map<String, ?> configs) {
}

private TrustManager[] getTrustManager() {
final TrustStoreProvider trustStoreProvider = new TrustStoreProvider();
final TrustManager[] trustManagers;
if (Objects.nonNull(certificateContent)) {
trustManagers = trustStoreProvider.createTrustManager(certificateContent);
trustManagers = TrustStoreProvider.createTrustManager(certificateContent);
} else {
trustManagers = trustStoreProvider.createTrustAllManager();
trustManagers = TrustStoreProvider.createTrustAllManager();
}
return trustManagers;
}
Expand Down
1 change: 1 addition & 0 deletions data-prepper-plugins/opensearch/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ dependencies {
implementation project(':data-prepper-plugins:buffer-common')
implementation project(':data-prepper-plugins:common')
implementation project(':data-prepper-plugins:failures-common')
implementation project(':data-prepper-plugins:http-common')
implementation libs.opensearch.client
implementation libs.opensearch.rhlc
implementation libs.opensearch.java
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ public class ConnectionConfiguration {
@JsonProperty("cert")
private Path certPath;

@JsonProperty("certificate_content")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we have a validation on mutual exclusivity?

@AssertTrue
public boolean certificateFileAndContentAreMutuallyExclusive() {
  if(certPath == null && certificateContent == null)
    return true;
  return certPath != null ^ certificateContent != null;
}

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

private String certificateContent;

@JsonProperty("socket_timeout")
private Duration socketTimeout;

Expand All @@ -27,6 +30,10 @@ public Path getCertPath() {
return certPath;
}

public String getCertificateContent() {
return certificateContent;
}

public Duration getSocketTimeout() {
return socketTimeout;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,9 @@
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.conn.ssl.NoopHostnameVerifier;
import org.apache.http.conn.ssl.TrustAllStrategy;
import org.apache.http.conn.ssl.TrustStrategy;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
import org.apache.http.message.BasicHeader;
import org.apache.http.ssl.SSLContextBuilder;
import org.apache.http.ssl.SSLContexts;
import org.opensearch.client.RestClient;
import org.opensearch.client.RestClientBuilder;
import org.opensearch.client.json.jackson.JacksonJsonpMapper;
Expand All @@ -35,23 +31,17 @@
import org.opensearch.dataprepper.aws.api.AwsRequestSigningApache4Interceptor;
import org.opensearch.dataprepper.plugins.source.opensearch.OpenSearchSourceConfiguration;
import org.opensearch.dataprepper.plugins.source.opensearch.configuration.ConnectionConfiguration;
import org.opensearch.dataprepper.plugins.truststore.TrustStoreProvider;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.auth.signer.Aws4Signer;
import software.amazon.awssdk.http.apache.ApacheHttpClient;
import software.amazon.awssdk.http.async.SdkAsyncHttpClient;
import software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient;

import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManager;
import javax.net.ssl.TrustManagerFactory;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.security.KeyStore;
import java.security.cert.Certificate;
import java.security.cert.CertificateFactory;
import java.util.List;
import java.util.Objects;

Expand Down Expand Up @@ -267,74 +257,41 @@ private void setConnectAndSocketTimeout(final org.elasticsearch.client.RestClien
});
}

private void attachSSLContext(final ApacheHttpClient.Builder apacheHttpClientBuilder, final OpenSearchSourceConfiguration openSearchSourceConfiguration) {
TrustManager[] trustManagers = createTrustManagers(openSearchSourceConfiguration.getConnectionConfiguration().getCertPath());
apacheHttpClientBuilder.tlsTrustManagersProvider(() -> trustManagers);
}

private void attachSSLContext(final NettyNioAsyncHttpClient.Builder asyncClientBuilder, final OpenSearchSourceConfiguration openSearchSourceConfiguration) {
TrustManager[] trustManagers = createTrustManagers(openSearchSourceConfiguration.getConnectionConfiguration().getCertPath());
TrustManager[] trustManagers = createTrustManagers(openSearchSourceConfiguration.getConnectionConfiguration());
asyncClientBuilder.tlsTrustManagersProvider(() -> trustManagers);
}

private void attachSSLContext(final HttpAsyncClientBuilder httpClientBuilder, final OpenSearchSourceConfiguration openSearchSourceConfiguration) {

final ConnectionConfiguration connectionConfiguration = openSearchSourceConfiguration.getConnectionConfiguration();
final SSLContext sslContext = Objects.nonNull(connectionConfiguration.getCertPath()) ? getCAStrategy(connectionConfiguration.getCertPath()) : getTrustAllStrategy();
final SSLContext sslContext = getCAStrategy(connectionConfiguration);
httpClientBuilder.setSSLContext(sslContext);

if (connectionConfiguration.isInsecure()) {
httpClientBuilder.setSSLHostnameVerifier(NoopHostnameVerifier.INSTANCE);
}
}

private static TrustManager[] createTrustManagers(final Path certPath) {
if (certPath != null) {
LOG.info("Using the cert provided in the config.");
try (InputStream certificateInputStream = Files.newInputStream(certPath)) {
final CertificateFactory factory = CertificateFactory.getInstance("X.509");
final Certificate trustedCa = factory.generateCertificate(certificateInputStream);
final KeyStore trustStore = KeyStore.getInstance("pkcs12");
trustStore.load(null, null);
trustStore.setCertificateEntry("ca", trustedCa);

final TrustManagerFactory trustManagerFactory = TrustManagerFactory.getInstance("X509");
trustManagerFactory.init(trustStore);
return trustManagerFactory.getTrustManagers();
} catch (Exception ex) {
throw new RuntimeException(ex.getMessage(), ex);
}
private TrustManager[] createTrustManagers(final ConnectionConfiguration connectionConfiguration) {
final Path certPath = connectionConfiguration.getCertPath();
if (Objects.nonNull(certPath)) {
return TrustStoreProvider.createTrustManager(certPath);
} else if (Objects.nonNull(connectionConfiguration.getCertificateContent())) {
return TrustStoreProvider.createTrustManager(connectionConfiguration.getCertificateContent());
} else {
return new TrustManager[] { new X509TrustAllManager() };
}
}

private SSLContext getCAStrategy(final Path certPath) {
LOG.info("Using the cert provided in the config.");
try {
CertificateFactory factory = CertificateFactory.getInstance("X.509");
Certificate trustedCa;
try (InputStream is = Files.newInputStream(certPath)) {
trustedCa = factory.generateCertificate(is);
}
KeyStore trustStore = KeyStore.getInstance("pkcs12");
trustStore.load(null, null);
trustStore.setCertificateEntry("ca", trustedCa);
SSLContextBuilder sslContextBuilder = SSLContexts.custom()
.loadTrustMaterial(trustStore, null);
return sslContextBuilder.build();
} catch (Exception ex) {
throw new RuntimeException(ex.getMessage(), ex);
return TrustStoreProvider.createTrustAllManager();
}
}

private SSLContext getTrustAllStrategy() {
LOG.info("Using the trust all strategy");
final TrustStrategy trustStrategy = new TrustAllStrategy();
try {
return SSLContexts.custom().loadTrustMaterial(null, trustStrategy).build();
} catch (Exception ex) {
throw new RuntimeException(ex.getMessage(), ex);
private SSLContext getCAStrategy(final ConnectionConfiguration connectionConfiguration) {
final Path certPath = connectionConfiguration.getCertPath();
if (Objects.nonNull(certPath)) {
return TrustStoreProvider.createSSLContext(certPath);
} else if (Objects.nonNull(connectionConfiguration.getCertificateContent())) {
return TrustStoreProvider.createSSLContext(connectionConfiguration.getCertificateContent());
} else {
return TrustStoreProvider.createSSLContextWithTrustAllStrategy();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,24 +11,32 @@
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.mockito.MockedStatic;
import org.mockito.junit.jupiter.MockitoExtension;
import org.opensearch.client.opensearch.OpenSearchClient;
import org.opensearch.dataprepper.aws.api.AwsCredentialsOptions;
import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier;
import org.opensearch.dataprepper.plugins.source.opensearch.OpenSearchSourceConfiguration;
import org.opensearch.dataprepper.plugins.source.opensearch.configuration.AwsAuthenticationConfiguration;
import org.opensearch.dataprepper.plugins.source.opensearch.configuration.ConnectionConfiguration;
import org.opensearch.dataprepper.plugins.truststore.TrustStoreProvider;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.http.async.SdkAsyncHttpClient;
import software.amazon.awssdk.regions.Region;

import javax.net.ssl.SSLContext;
import java.nio.file.Path;
import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.UUID;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.notNullValue;
import static org.mockito.Mockito.lenient;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.mockStatic;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoInteractions;
Expand All @@ -48,7 +56,7 @@ public class OpenSearchClientFactoryTest {

@BeforeEach
void setup() {
when(openSearchSourceConfiguration.getHosts()).thenReturn(List.of("http://localhost:9200"));
lenient().when(openSearchSourceConfiguration.getHosts()).thenReturn(List.of("http://localhost:9200"));
when(openSearchSourceConfiguration.getConnectionConfiguration()).thenReturn(connectionConfiguration);
}

Expand Down Expand Up @@ -209,4 +217,59 @@ void provideOpenSearchClient_with_aws_auth_and_serverless_flag_true() {
assertThat(awsCredentialsOptions.getStsHeaderOverrides(), equalTo(Collections.emptyMap()));
assertThat(awsCredentialsOptions.getStsRoleArn(), equalTo(stsRoleArn));
}

@Test
void provideOpenSearchClient_with_self_signed_certificate() {
try (MockedStatic<TrustStoreProvider> trustStoreProviderMockedStatic = mockStatic(TrustStoreProvider.class)) {
final Path path = mock(Path.class);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I recommend moving as much out of the try block as possible to make the mocking easier to understand. I believe you can move lines 225-230 out of here.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

final SSLContext sslContext = mock(SSLContext.class);
final String username = UUID.randomUUID().toString();
final String password = UUID.randomUUID().toString();
when(openSearchSourceConfiguration.getUsername()).thenReturn(username);
when(openSearchSourceConfiguration.getPassword()).thenReturn(password);
when(connectionConfiguration.getCertPath()).thenReturn(path);
trustStoreProviderMockedStatic.when(() -> TrustStoreProvider.createSSLContext(path))
.thenReturn(sslContext);
final OpenSearchClient openSearchClient = createObjectUnderTest().provideOpenSearchClient(openSearchSourceConfiguration);
assertThat(openSearchClient, notNullValue());
trustStoreProviderMockedStatic.verify(() -> TrustStoreProvider.createSSLContext(path));
}
}

@Test
void provideElasticSearchClient_with_self_signed_certificate() {
try (MockedStatic<TrustStoreProvider> trustStoreProviderMockedStatic = mockStatic(TrustStoreProvider.class)) {
final Path path = mock(Path.class);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As with the above, please move lines 243-248 to sit before the try block.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

final SSLContext sslContext = mock(SSLContext.class);
final String username = UUID.randomUUID().toString();
final String password = UUID.randomUUID().toString();
when(openSearchSourceConfiguration.getUsername()).thenReturn(username);
when(openSearchSourceConfiguration.getPassword()).thenReturn(password);
when(connectionConfiguration.getCertPath()).thenReturn(path);
trustStoreProviderMockedStatic.when(() -> TrustStoreProvider.createSSLContext(path))
.thenReturn(sslContext);
final ElasticsearchClient elasticsearchClient = createObjectUnderTest().provideElasticSearchClient(openSearchSourceConfiguration);
assertThat(elasticsearchClient, notNullValue());
trustStoreProviderMockedStatic.verify(() -> TrustStoreProvider.createSSLContext(path));
}
}


@Test
void createSdkAsyncHttpClient_with_self_signed_certificate() {
try (MockedStatic<TrustStoreProvider> trustStoreProviderMockedStatic = mockStatic(TrustStoreProvider.class)) {
final Path path = mock(Path.class);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As with the above, please move lines 261-269 to sit before the try block.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

final Duration duration = mock(Duration.class);
final String username = UUID.randomUUID().toString();
final String password = UUID.randomUUID().toString();
lenient().when(openSearchSourceConfiguration.getUsername()).thenReturn(username);
lenient().when(openSearchSourceConfiguration.getPassword()).thenReturn(password);
lenient().when(connectionConfiguration.getConnectTimeout()).thenReturn(duration);
lenient().when(openSearchSourceConfiguration.getConnectionConfiguration()).thenReturn(connectionConfiguration);
lenient().when(connectionConfiguration.getCertPath()).thenReturn(path);
final SdkAsyncHttpClient sdkAsyncHttpClient = createObjectUnderTest().createSdkAsyncHttpClient(openSearchSourceConfiguration);
assertThat(sdkAsyncHttpClient, notNullValue());
trustStoreProviderMockedStatic.verify(() -> TrustStoreProvider.createTrustManager(path));
}
}
}
Loading