diff --git a/data-prepper-plugins/http-common/build.gradle b/data-prepper-plugins/http-common/build.gradle index c6ba00aab4..7351ddcb64 100644 --- a/data-prepper-plugins/http-common/build.gradle +++ b/data-prepper-plugins/http-common/build.gradle @@ -11,7 +11,7 @@ dependencies { implementation project(':data-prepper-plugins:blocking-buffer') implementation project(':data-prepper-plugins:armeria-common') implementation project(':data-prepper-plugins:common') - implementation project(':data-prepper-plugins:http-source-common') + implementation project(':data-prepper-plugins:http-source-common' ) implementation libs.opentelemetry.proto implementation libs.armeria.core implementation libs.armeria.grpc diff --git a/data-prepper-plugins/http-common/src/test/java/org/opensearch/dataprepper/plugins/server/CreateServerTest.java b/data-prepper-plugins/http-common/src/test/java/org/opensearch/dataprepper/plugins/server/CreateServerTest.java new file mode 100644 index 0000000000..eeea1d2cf9 --- /dev/null +++ b/data-prepper-plugins/http-common/src/test/java/org/opensearch/dataprepper/plugins/server/CreateServerTest.java @@ -0,0 +1,263 @@ +package org.opensearch.dataprepper.plugins.server; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; +import com.google.protobuf.Descriptors; +import com.google.protobuf.Empty; +import com.google.protobuf.Message; +import com.google.protobuf.MessageOrBuilder; +import com.google.protobuf.StringValue; +import com.linecorp.armeria.client.WebClient; +import com.linecorp.armeria.common.AggregatedHttpResponse; +import com.linecorp.armeria.common.HttpData; +import com.linecorp.armeria.common.HttpMethod; +import com.linecorp.armeria.common.HttpStatus; +import com.linecorp.armeria.common.MediaType; +import com.linecorp.armeria.common.Request; +import com.linecorp.armeria.common.RequestHeaders; +import com.linecorp.armeria.common.Response; +import com.linecorp.armeria.common.SessionProtocol; +import com.linecorp.armeria.server.Server; +import com.linecorp.armeria.server.grpc.GrpcService; +import io.grpc.BindableService; +import io.grpc.Metadata; +import io.grpc.MethodDescriptor; +import io.grpc.ServerCall; +import io.grpc.ServerCallHandler; +import io.grpc.ServerInterceptor; +import io.grpc.ServerInterceptors; +import io.grpc.ServerServiceDefinition; +import io.grpc.ServiceDescriptor; +import io.grpc.protobuf.ProtoFileDescriptorSupplier; +import io.grpc.protobuf.ProtoServiceDescriptorSupplier; +import io.grpc.protobuf.ProtoUtils; +import io.grpc.stub.ServerCalls; +import io.grpc.stub.StreamObserver; +import io.micrometer.core.instrument.Measurement; +import io.micrometer.core.instrument.Statistic; +import io.netty.util.AsciiString; +import io.opentelemetry.proto.collector.logs.v1.ExportLogsServiceRequest; +import io.opentelemetry.proto.collector.logs.v1.ExportLogsServiceResponse; +import io.opentelemetry.proto.collector.logs.v1.LogsServiceGrpc; +import io.opentelemetry.proto.collector.logs.v1.LogsServiceProto; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.InjectMocks; +import org.mockito.Mock; +import org.mockito.MockedStatic; +import org.mockito.junit.jupiter.MockitoExtension; +import org.opensearch.dataprepper.HttpRequestExceptionHandler; +import org.opensearch.dataprepper.armeria.authentication.ArmeriaHttpAuthenticationProvider; +import org.opensearch.dataprepper.armeria.authentication.GrpcAuthenticationProvider; +import org.opensearch.dataprepper.http.certificate.CertificateProviderFactory; +import org.opensearch.dataprepper.metrics.PluginMetrics; +import org.opensearch.dataprepper.model.CheckpointState; +import org.opensearch.dataprepper.model.buffer.Buffer; +import org.opensearch.dataprepper.model.configuration.PluginSetting; +import org.opensearch.dataprepper.model.log.Log; +import org.opensearch.dataprepper.model.record.Record; +import org.opensearch.dataprepper.plugins.buffer.blockingbuffer.BlockingBuffer; +import org.opensearch.dataprepper.plugins.certificate.CertificateProvider; +import org.opensearch.dataprepper.plugins.certificate.model.Certificate; +import org.opensearch.dataprepper.plugins.codec.CompressionOption; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.contains; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.not; +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mockStatic; +import static org.mockito.Mockito.when; + +@ExtendWith(MockitoExtension.class) +public class CreateServerTest { + ObjectMapper objectMapper; + private final String TEST_SSL_CERTIFICATE_FILE = getClass().getClassLoader().getResource("test_cert.crt").getFile(); + private final String TEST_SSL_KEY_FILE = getClass().getClassLoader().getResource("test_decrypted_key.key").getFile(); + + private static final RetryInfoConfig TEST_RETRY_INFO = new RetryInfoConfig(Duration.ofMillis(50), Duration.ofMillis(2000)); + private String TEST_PIPELINE_NAME = "test-pipeline"; + private String TEST_SOURCE_NAME = "test-source"; + + @Mock + private PluginMetrics pluginMetrics; + + @Mock + private CertificateProvider certificateProvider; + + private Logger LOG = LoggerFactory.getLogger(CreateServer.class); + + @Mock + private CertificateProviderFactory certificateProviderFactory; + + @Mock + private ArmeriaHttpAuthenticationProvider armeriaAuthenticationProvider; + + @Mock + private HttpRequestExceptionHandler httpRequestExceptionHandler; + + @Mock + private GrpcAuthenticationProvider authenticationProvider; + + @Mock + ServerInterceptor authenticationInterceptor; + + @Mock + private ServerServiceDefinition serviceDef; + + @Mock + private BindableService basicService; + + + @Mock + private Certificate certificate; + + private BlockingBuffer> getBuffer() { + final HashMap integerHashMap = new HashMap<>(); + integerHashMap.put("buffer_size", 1); + integerHashMap.put("batch_size", 1); + final PluginSetting pluginSetting = new PluginSetting("blocking_buffer", integerHashMap); + pluginSetting.setPipelineName(TEST_PIPELINE_NAME); + return new BlockingBuffer<>(pluginSetting); + } + +// @Test +// void createGrpcServerTest() throws JsonProcessingException { +// when(authenticationProvider.getAuthenticationInterceptor()).thenReturn(authenticationInterceptor); +// MockedStatic mockedStatic = mockStatic(ServerInterceptors.class); +// mockedStatic.when(() -> ServerInterceptors.intercept( +// any(ServerServiceDefinition.class), +// any(ServerInterceptor[].class))) +// .thenReturn(serviceDef); +// final Map metadata = createGrpcMetadata(21890, false, 10000, 10, 5, CompressionOption.NONE, null); +// final ServerConfiguration serverConfiguration = createServerConfig(metadata); +// final CreateServer createServer = new CreateServer(serverConfiguration, LOG, pluginMetrics, TEST_SOURCE_NAME, TEST_PIPELINE_NAME); +// createServer.createGRPCServer(authenticationProvider, basicService, certificateProvider, null); +// } + + @Test + void createHttpServerTest() throws IOException { + final Path certFilePath = new File(TEST_SSL_CERTIFICATE_FILE).toPath(); + final Path keyFilePath = new File(TEST_SSL_KEY_FILE).toPath(); + final String certAsString = Files.readString(certFilePath); + final String keyAsString = Files.readString(keyFilePath); + + when(certificate.getCertificate()).thenReturn(certAsString); + when(certificate.getPrivateKey()).thenReturn(keyAsString); + when(certificateProvider.getCertificate()).thenReturn(certificate); + when(certificateProviderFactory.getCertificateProvider()).thenReturn(certificateProvider); + final Map metadata = createHttpMetadata(2021, "/log/ingest", 10_000, 200, 500, 1024, true, CompressionOption.NONE); + final ServerConfiguration serverConfiguration = createServerConfig(metadata); + final CreateServer createServer = new CreateServer(serverConfiguration, LOG, pluginMetrics, TEST_SOURCE_NAME, TEST_PIPELINE_NAME); + Buffer> buffer = getBuffer(); + Server server = createServer.createHTTPServer(buffer, certificateProviderFactory, armeriaAuthenticationProvider, httpRequestExceptionHandler); + assertNotNull(server); + assertDoesNotThrow(() -> server.start()); + assertDoesNotThrow(() -> server.stop()); + } + + + + private Map createGrpcMetadata (Integer port, Boolean ssl, Integer reqeustTimeoutInMillis, Integer maxConnectionCount, Integer threadCount, CompressionOption compression, RetryInfoConfig retryInfo){ + final Map metadata = new HashMap<>(); + metadata.put("port", port); + metadata.put("ssl", ssl); + metadata.put("requestTimeoutInMillis", reqeustTimeoutInMillis); + metadata.put("maxConnectionCount", maxConnectionCount); + metadata.put("threadCount", threadCount); + metadata.put("compression", compression); + metadata.put("retryInfo", retryInfo); + return metadata; + } + + private Map createHttpMetadata (Integer port, String path, Integer requestTimeoutInMillis, Integer threadCount, Integer maxConnectionCount, Integer maxPendingRequests, Boolean hasHealthCheckService, CompressionOption compressionOption){ + final Map metadata = new HashMap<>(); + metadata.put("port", port); + metadata.put("path", path); + metadata.put("requestTimeoutInMillis", requestTimeoutInMillis); + metadata.put("threadCount", threadCount); + metadata.put("maxConnectionCount", maxConnectionCount); + metadata.put("maxPendingRequests", maxPendingRequests); + metadata.put("healthCheck", hasHealthCheckService); + metadata.put("compression", compressionOption); + return metadata; + } + + private ServerConfiguration createServerConfig(final Map metadata) throws JsonProcessingException { + objectMapper = new ObjectMapper(); + objectMapper.registerModule(new JavaTimeModule()); + String json = new ObjectMapper().writeValueAsString(metadata); + return objectMapper.readValue(json, ServerConfiguration.class); + } + + +// +// public class BasicService implements BindableService { +// @Override +// public io.grpc.ServerServiceDefinition bindService() { +// return io.grpc.ServerServiceDefinition.builder(getServiceDescriptor()) +// .addMethod(getMethodDescriptor(), new ServerCallHandler() { +// @Override +// public io.grpc.ServerCall.Listener startCall( +// io.grpc.ServerCall call, +// io.grpc.Metadata headers) { +// return new UnaryServerCallHandler( +// (request, responseObserver) -> { +// handleRequest(request, responseObserver); +// return null; +// }).startCall(call, headers); +// } +// }) +// .build(); +// } +// +// private io.grpc.ServiceDescriptor getServiceDescriptor() { +// return io.grpc.ServiceDescriptor.newBuilder("BasicService") +// .addMethod(getMethodDescriptor()) +// .build(); +// } +// +// private io.grpc.MethodDescriptor getMethodDescriptor() { +// return io.grpc.MethodDescriptor.newBuilder() +// .setType(io.grpc.MethodDescriptor.MethodType.UNARY) +// .setFullMethodName("BasicService/handleRequest") +// .setRequestMarshaller(io.grpc.protobuf.ProtoUtils.marshaller(Request.getDefaultInstance())) +// .setResponseMarshaller(io.grpc.protobuf.ProtoUtils.marshaller(Response.getDefaultInstance())) +// .build(); +// } +// +// private void handleRequest(Request request, StreamObserver responseObserver) { +// try { +// Response response = Response.newBuilder() +// .setMessage("Processed request: " + request.getMessage()) +// .build(); +// responseObserver.onNext(response); +// responseObserver.onCompleted(); +// } catch (Exception e) { +// responseObserver.onError(e); +// } +// } +// } +// + +} diff --git a/data-prepper-plugins/http-common/src/test/resources/test_cert.crt b/data-prepper-plugins/http-common/src/test/resources/test_cert.crt new file mode 100644 index 0000000000..26c78d1411 --- /dev/null +++ b/data-prepper-plugins/http-common/src/test/resources/test_cert.crt @@ -0,0 +1,14 @@ +-----BEGIN CERTIFICATE----- +MIICHTCCAYYCCQD4hqYeYDQZADANBgkqhkiG9w0BAQUFADBSMQswCQYDVQQGEwJV +UzELMAkGA1UECAwCVFgxDzANBgNVBAcMBkF1c3RpbjEPMA0GA1UECgwGQW1hem9u +MRQwEgYDVQQLDAtEYXRhcHJlcHBlcjAgFw0yMTA2MjUxOTIzMTBaGA8yMTIxMDYw +MTE5MjMxMFowUjELMAkGA1UEBhMCVVMxCzAJBgNVBAgMAlRYMQ8wDQYDVQQHDAZB +dXN0aW4xDzANBgNVBAoMBkFtYXpvbjEUMBIGA1UECwwLRGF0YXByZXBwZXIwgZ8w +DQYJKoZIhvcNAQEBBQADgY0AMIGJAoGBAKrb3YhdKbQ5PtLHall10iLZC9ZdDVrq +HOvqVSM8NHlL8f82gJ3l0n9k7hYc5eKisutaS9eDTmJ+Dnn8xn/qPSKTIq9Wh+OZ +O+e9YEEpI/G4F9KpGULgMyRg9sJK0GlZdEt9o5GJNJIJUkptJU5eiLuE0IV+jyJo +Nvm8OE6EJPqxAgMBAAEwDQYJKoZIhvcNAQEFBQADgYEAjgnX5n/Tt7eo9uakIGAb +uBhvYdR8JqKXqF9rjFJ/MIK7FdQSF/gCdjnvBhzLlZFK/Nb6MGKoSKm5Lcr75LgC +FyhIwp3WlqQksiMFnOypYVY71vqDgj6UKdMaOBgthsYhngj8lC+wsVzWqQvkJ2Qg +/GAIzJwiZfXiaevQHRk79qI= +-----END CERTIFICATE----- diff --git a/data-prepper-plugins/http-common/src/test/resources/test_decrypted_key.key b/data-prepper-plugins/http-common/src/test/resources/test_decrypted_key.key new file mode 100644 index 0000000000..479b877131 --- /dev/null +++ b/data-prepper-plugins/http-common/src/test/resources/test_decrypted_key.key @@ -0,0 +1,15 @@ +-----BEGIN RSA PRIVATE KEY----- +MIICXAIBAAKBgQCq292IXSm0OT7Sx2pZddIi2QvWXQ1a6hzr6lUjPDR5S/H/NoCd +5dJ/ZO4WHOXiorLrWkvXg05ifg55/MZ/6j0ikyKvVofjmTvnvWBBKSPxuBfSqRlC +4DMkYPbCStBpWXRLfaORiTSSCVJKbSVOXoi7hNCFfo8iaDb5vDhOhCT6sQIDAQAB +AoGANrrhFqpJDpr7vcb1ER0Fp/YArbT27zVo+EUC6puBb41dQlQyFOImcHpjLaAq +H1PgnjU5cBp2hGQ+vOK0rwrYc/HNl6vfh6N3NbDptMiuoBafRJA9JzYourAM09BU +zmXyr61Yn3KHzx1PRwWe37icX93oXP3P0qHb3dI1ZF4jG0ECQQDU5N/a7ogoz2zn +ZssD6FvUOUQDsdBWdXmhUvg+YdZrV44e4xk+FVzwEONoRktEYKz9MFXlsgNHr445 +KRguHWcJAkEAzXQkwOkN8WID1wrwoobUIMbZSGAZzofwkKXgTTnllnT1qOQXuRbS +aCMejFEymBBef4aXP6N4+va2FKW/MF34aQJAO2oMl1sOoOUSrZngepy0VAwPUUCk +thxe74jqQu6nGpn6zd/vQYZQw6bS8Fz90H1yic6dilcd1znFZWp0lxoZkQJBALeI +xoBycRsuFQIYasi1q3AwUtBd0Q/3zkZZeBtk2hzjFMUwJaUZpxKSNOrialD/ZnuD +jz+xWBTRKe0d98JMX+kCQCmsJEj/HYQAC1GamZ7JQWogRSRF2KTgTWRaDXDxy0d4 +yUQgwHB+HZLFcbi1JEK6eIixCsX8iifrrkteh+1npJ0= +-----END RSA PRIVATE KEY-----