Skip to content

Commit

Permalink
common http server creation
Browse files Browse the repository at this point in the history
Signed-off-by: Maxwell Brown <[email protected]>
  • Loading branch information
Galactus22625 committed Feb 4, 2025
1 parent a8e41ae commit 0977a07
Show file tree
Hide file tree
Showing 13 changed files with 119 additions and 82 deletions.
2 changes: 2 additions & 0 deletions data-prepper-plugins/http-common/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,10 @@ dependencies {
compileOnly 'org.projectlombok:lombok:1.18.30'
annotationProcessor 'org.projectlombok:lombok:1.18.30'
implementation project(':data-prepper-api')
implementation project(':data-prepper-plugins:blocking-buffer')
implementation project(':data-prepper-plugins:armeria-common')
implementation project(':data-prepper-plugins:common')
implementation project(':data-prepper-plugins:http-source-common')
implementation libs.opentelemetry.proto
implementation libs.armeria.core
implementation libs.armeria.grpc
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,23 @@
import com.linecorp.armeria.server.grpc.GrpcService;
import com.linecorp.armeria.server.grpc.GrpcServiceBuilder;
import com.linecorp.armeria.server.healthcheck.HealthCheckService;
import com.linecorp.armeria.server.throttling.ThrottlingService;
import io.grpc.BindableService;
import io.grpc.MethodDescriptor;
import io.grpc.ServerInterceptor;
import io.grpc.ServerInterceptors;
import io.grpc.protobuf.services.ProtoReflectionService;
import org.opensearch.dataprepper.GrpcRequestExceptionHandler;
import org.opensearch.dataprepper.HttpRequestExceptionHandler;
import org.opensearch.dataprepper.armeria.authentication.ArmeriaHttpAuthenticationProvider;
import org.opensearch.dataprepper.armeria.authentication.GrpcAuthenticationProvider;
import org.opensearch.dataprepper.http.LogThrottlingRejectHandler;
import org.opensearch.dataprepper.http.LogThrottlingStrategy;
import org.opensearch.dataprepper.http.certificate.CertificateProviderFactory;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.buffer.Buffer;
import org.opensearch.dataprepper.model.log.Log;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.plugins.certificate.CertificateProvider;
import org.opensearch.dataprepper.plugins.certificate.model.Certificate;
import org.opensearch.dataprepper.plugins.codec.CompressionOption;
Expand All @@ -34,6 +43,7 @@
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.function.Function;


Expand All @@ -58,7 +68,7 @@ public CreateServer(final ServerConfiguration serverConfiguration, Logger LOG, P
this.pipelineName = pipelineName;
}

public <K, V> Server createGRPCServerBuilder(final GrpcAuthenticationProvider authenticationProvider, BindableService grpcService, CertificateProvider certificateProvider, MethodDescriptor<K, V> methodDescriptor) {
public <K, V> Server createGRPCServer(final GrpcAuthenticationProvider authenticationProvider, BindableService grpcService, CertificateProvider certificateProvider, MethodDescriptor<K, V> methodDescriptor) {
final List<ServerInterceptor> serverInterceptors = getAuthenticationInterceptor(authenticationProvider);

final GrpcServiceBuilder grpcServiceBuilder = GrpcService
Expand Down Expand Up @@ -141,8 +151,65 @@ public <K, V> Server createGRPCServerBuilder(final GrpcAuthenticationProvider au
return sb.build();
}

public Server createHTTPServerBuilder() {
public Server createHTTPServer(Buffer<Record<Log>> buffer, final CertificateProviderFactory certificateProviderFactory, final ArmeriaHttpAuthenticationProvider authenticationProvider, final HttpRequestExceptionHandler httpRequestExceptionHandler) {
final ServerBuilder sb = Server.builder();

sb.disableServerHeader();

if (serverConfiguration.isSsl()) {
LOG.info("Creating http source with SSL/TLS enabled.");
final CertificateProvider certificateProvider = certificateProviderFactory.getCertificateProvider();
final Certificate certificate = certificateProvider.getCertificate();
// TODO: enable encrypted key with password
sb.https(serverConfiguration.getPort()).tls(
new ByteArrayInputStream(certificate.getCertificate().getBytes(StandardCharsets.UTF_8)),
new ByteArrayInputStream(certificate.getPrivateKey().getBytes(StandardCharsets.UTF_8)
)
);
} else {
LOG.warn("Creating http source without SSL/TLS. This is not secure.");
LOG.warn("In order to set up TLS for the http source, go here: https://github.com/opensearch-project/data-prepper/tree/main/data-prepper-plugins/http-source#ssl");
sb.http(serverConfiguration.getPort());
}

if(serverConfiguration.getAuthentication() != null) {
final Optional<Function<? super HttpService, ? extends HttpService>> optionalAuthDecorator = authenticationProvider.getAuthenticationDecorator();

if (serverConfiguration.isUnauthenticatedHealthCheck()) {
optionalAuthDecorator.ifPresent(authDecorator -> sb.decorator(REGEX_HEALTH, authDecorator));
} else {
optionalAuthDecorator.ifPresent(sb::decorator);
}
}

sb.maxNumConnections(serverConfiguration.getMaxConnectionCount());
sb.requestTimeout(Duration.ofMillis(serverConfiguration.getRequestTimeoutInMillis()));
if(serverConfiguration.getMaxRequestLength() != null) {
sb.maxRequestLength(serverConfiguration.getMaxRequestLength().getBytes());
}
final int threads = serverConfiguration.getThreadCount();
final ScheduledThreadPoolExecutor blockingTaskExecutor = new ScheduledThreadPoolExecutor(threads);
sb.blockingTaskExecutor(blockingTaskExecutor, true);
final int maxPendingRequests = serverConfiguration.getMaxPendingRequests();
final LogThrottlingStrategy logThrottlingStrategy = new LogThrottlingStrategy(
maxPendingRequests, blockingTaskExecutor.getQueue());
final LogThrottlingRejectHandler logThrottlingRejectHandler = new LogThrottlingRejectHandler(maxPendingRequests, pluginMetrics);

final String httpSourcePath = serverConfiguration.getPath().replace(PIPELINE_NAME_PLACEHOLDER, pipelineName);
sb.decorator(httpSourcePath, ThrottlingService.newDecorator(logThrottlingStrategy, logThrottlingRejectHandler));
final LogHTTPService logHTTPService = new LogHTTPService(serverConfiguration.getBufferTimeoutInMillis(), buffer, pluginMetrics);

if (CompressionOption.NONE.equals(serverConfiguration.getCompression())) {
sb.annotatedService(httpSourcePath, logHTTPService, httpRequestExceptionHandler);
} else {
sb.annotatedService(httpSourcePath, logHTTPService, DecodingService.newDecorator(), httpRequestExceptionHandler);
}

if (serverConfiguration.hasHealthCheck()) {
LOG.info("HTTP source health check is enabled");
sb.service(HTTP_HEALTH_CHECK_PATH, HealthCheckService.builder().longPolling(0).build());
}

return sb.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.source.loghttp;
package org.opensearch.dataprepper.plugins.server;

import com.linecorp.armeria.server.ServiceRequestContext;
import org.opensearch.dataprepper.metrics.PluginMetrics;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@ public class ServerConfiguration {
static final boolean DEFAULT_SSL = true;
static final boolean DEFAULT_USE_ACM_CERT_FOR_SSL = false;
static final int DEFAULT_THREAD_COUNT = 200;
static final int DEFAULT_MAX_PENDING_REQUESTS = 1024;
static final int DEFAULT_MAX_CONNECTION_COUNT = 500;
static final double BUFFER_TIMEOUT_FRACTION = 0.8;

@Setter
@Getter
Expand Down Expand Up @@ -78,6 +80,14 @@ public class ServerConfiguration {
@Setter
private int maxConnectionCount = DEFAULT_MAX_CONNECTION_COUNT;

@Getter
@Setter
private int maxPendingRequests = DEFAULT_MAX_PENDING_REQUESTS;

@Getter
@Setter
private int bufferTimeoutInMillis;

public boolean hasHealthCheck() {
return healthCheck;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.source.loghttp;
package org.opensearch.dataprepper.plugins.server;

import com.linecorp.armeria.server.ServiceRequestContext;
import org.junit.jupiter.api.Nested;
Expand Down
1 change: 1 addition & 0 deletions data-prepper-plugins/http-source/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ dependencies {
implementation project(':data-prepper-plugins:http-source-common')
implementation project(':data-prepper-plugins:common')
implementation project(':data-prepper-plugins:armeria-common')
implementation project(':data-prepper-plugins:http-common')
implementation libs.armeria.core
implementation libs.commons.io
implementation 'software.amazon.awssdk:acm'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.openjdk.jmh.annotations.Warmup;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.buffer.Buffer;
import org.opensearch.dataprepper.plugins.server.LogHTTPService;

import java.io.IOException;
import java.time.Duration;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package org.opensearch.dataprepper.plugins.source.loghttp;

import org.opensearch.dataprepper.http.HttpServerConfig;
import org.opensearch.dataprepper.plugins.server.ServerConfiguration;

public class ConvertConfiguration {

public static ServerConfiguration convertConfiguration(final HttpServerConfig sourceConfig) {
ServerConfiguration serverConfiguration = new ServerConfiguration();
serverConfiguration.setPath(sourceConfig.getPath());
serverConfiguration.setHealthCheck(sourceConfig.hasHealthCheckService());
serverConfiguration.setRequestTimeoutInMillis(sourceConfig.getRequestTimeoutInMillis());
serverConfiguration.setCompression(sourceConfig.getCompression());
serverConfiguration.setAuthentication(sourceConfig.getAuthentication());
serverConfiguration.setSsl(sourceConfig.isSsl());
serverConfiguration.setUnauthenticatedHealthCheck(sourceConfig.isUnauthenticatedHealthCheck());
serverConfiguration.setMaxRequestLength(sourceConfig.getMaxRequestLength());
serverConfiguration.setPort(sourceConfig.getPort());
serverConfiguration.setThreadCount(sourceConfig.getThreadCount());
serverConfiguration.setMaxPendingRequests(sourceConfig.getMaxPendingRequests());
serverConfiguration.setMaxConnectionCount(sourceConfig.getMaxConnectionCount());
serverConfiguration.setBufferTimeoutInMillis(sourceConfig.getBufferTimeoutInMillis());
return serverConfiguration;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,10 @@

package org.opensearch.dataprepper.plugins.source.loghttp;

import com.linecorp.armeria.server.HttpService;
import com.linecorp.armeria.server.Server;
import com.linecorp.armeria.server.ServerBuilder;
import com.linecorp.armeria.server.encoding.DecodingService;
import com.linecorp.armeria.server.healthcheck.HealthCheckService;
import com.linecorp.armeria.server.throttling.ThrottlingService;
import org.opensearch.dataprepper.HttpRequestExceptionHandler;
import org.opensearch.dataprepper.armeria.authentication.ArmeriaHttpAuthenticationProvider;
import org.opensearch.dataprepper.http.HttpServerConfig;
import org.opensearch.dataprepper.http.LogThrottlingRejectHandler;
import org.opensearch.dataprepper.http.LogThrottlingStrategy;
import org.opensearch.dataprepper.http.certificate.CertificateProviderFactory;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin;
Expand All @@ -30,20 +23,13 @@
import org.opensearch.dataprepper.model.plugin.PluginFactory;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.model.source.Source;
import org.opensearch.dataprepper.plugins.certificate.CertificateProvider;
import org.opensearch.dataprepper.plugins.certificate.model.Certificate;
import org.opensearch.dataprepper.plugins.codec.CompressionOption;
import org.opensearch.dataprepper.plugins.server.CreateServer;
import org.opensearch.dataprepper.plugins.server.ServerConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.ByteArrayInputStream;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.Collections;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.function.Function;

@DataPrepperPlugin(name = "http", pluginType = Source.class, pluginConfigurationType = HTTPSourceConfig.class)
public class HTTPSource implements Source<Record<Log>> {
Expand Down Expand Up @@ -96,65 +82,9 @@ public void start(final Buffer<Record<Log>> buffer) {
throw new IllegalStateException("Buffer provided is null");
}
if (server == null) {
final ServerBuilder sb = Server.builder();

sb.disableServerHeader();

if (sourceConfig.isSsl()) {
LOG.info("Creating http source with SSL/TLS enabled.");
final CertificateProvider certificateProvider = certificateProviderFactory.getCertificateProvider();
final Certificate certificate = certificateProvider.getCertificate();
// TODO: enable encrypted key with password
sb.https(sourceConfig.getPort()).tls(
new ByteArrayInputStream(certificate.getCertificate().getBytes(StandardCharsets.UTF_8)),
new ByteArrayInputStream(certificate.getPrivateKey().getBytes(StandardCharsets.UTF_8)
)
);
} else {
LOG.warn("Creating http source without SSL/TLS. This is not secure.");
LOG.warn("In order to set up TLS for the http source, go here: https://github.com/opensearch-project/data-prepper/tree/main/data-prepper-plugins/http-source#ssl");
sb.http(sourceConfig.getPort());
}

if(sourceConfig.getAuthentication() != null) {
final Optional<Function<? super HttpService, ? extends HttpService>> optionalAuthDecorator = authenticationProvider.getAuthenticationDecorator();

if (sourceConfig.isUnauthenticatedHealthCheck()) {
optionalAuthDecorator.ifPresent(authDecorator -> sb.decorator(REGEX_HEALTH, authDecorator));
} else {
optionalAuthDecorator.ifPresent(sb::decorator);
}
}

sb.maxNumConnections(sourceConfig.getMaxConnectionCount());
sb.requestTimeout(Duration.ofMillis(sourceConfig.getRequestTimeoutInMillis()));
if(sourceConfig.getMaxRequestLength() != null) {
sb.maxRequestLength(sourceConfig.getMaxRequestLength().getBytes());
}
final int threads = sourceConfig.getThreadCount();
final ScheduledThreadPoolExecutor blockingTaskExecutor = new ScheduledThreadPoolExecutor(threads);
sb.blockingTaskExecutor(blockingTaskExecutor, true);
final int maxPendingRequests = sourceConfig.getMaxPendingRequests();
final LogThrottlingStrategy logThrottlingStrategy = new LogThrottlingStrategy(
maxPendingRequests, blockingTaskExecutor.getQueue());
final LogThrottlingRejectHandler logThrottlingRejectHandler = new LogThrottlingRejectHandler(maxPendingRequests, pluginMetrics);

final String httpSourcePath = sourceConfig.getPath().replace(PIPELINE_NAME_PLACEHOLDER, pipelineName);
sb.decorator(httpSourcePath, ThrottlingService.newDecorator(logThrottlingStrategy, logThrottlingRejectHandler));
final LogHTTPService logHTTPService = new LogHTTPService(sourceConfig.getBufferTimeoutInMillis(), buffer, pluginMetrics);

if (CompressionOption.NONE.equals(sourceConfig.getCompression())) {
sb.annotatedService(httpSourcePath, logHTTPService, httpRequestExceptionHandler);
} else {
sb.annotatedService(httpSourcePath, logHTTPService, DecodingService.newDecorator(), httpRequestExceptionHandler);
}

if (sourceConfig.hasHealthCheckService()) {
LOG.info("HTTP source health check is enabled");
sb.service(HTTP_HEALTH_CHECK_PATH, HealthCheckService.builder().longPolling(0).build());
}

server = sb.build();
ServerConfiguration serverConfiguration = ConvertConfiguration.convertConfiguration(sourceConfig);
CreateServer createServer = new CreateServer(serverConfiguration, LOG, pluginMetrics, "http", pipelineName);
server = createServer.createHTTPServer(buffer, certificateProviderFactory, authenticationProvider, httpRequestExceptionHandler);
pluginMetrics.gauge(SERVER_CONNECTIONS, server, Server::numConnections);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import org.opensearch.dataprepper.plugins.certificate.CertificateProvider;
import org.opensearch.dataprepper.plugins.certificate.model.Certificate;
import org.opensearch.dataprepper.plugins.codec.CompressionOption;
import org.opensearch.dataprepper.plugins.server.LogHTTPService;

import java.io.ByteArrayOutputStream;
import java.io.File;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ public void start(Buffer<Record<Object>> buffer) {
certificateProvider = certificateProviderFactory.getCertificateProvider();
}
final MethodDescriptor<ExportLogsServiceRequest, ExportLogsServiceResponse> methodDescriptor = LogsServiceGrpc.getExportMethod();
server = createServer.createGRPCServerBuilder(authenticationProvider, oTelLogsGrpcService, certificateProvider, methodDescriptor);
server = createServer.createGRPCServer(authenticationProvider, oTelLogsGrpcService, certificateProvider, methodDescriptor);

pluginMetrics.gauge(SERVER_CONNECTIONS, server, Server::numConnections);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ public void start(Buffer<Record<? extends Metric>> buffer) {
certificateProvider = certificateProviderFactory.getCertificateProvider();
}
final MethodDescriptor<ExportMetricsServiceRequest, ExportMetricsServiceResponse> methodDescriptor = MetricsServiceGrpc.getExportMethod();
server = createServer.createGRPCServerBuilder(authenticationProvider, oTelMetricsGrpcService, certificateProvider, methodDescriptor);
server = createServer.createGRPCServer(authenticationProvider, oTelMetricsGrpcService, certificateProvider, methodDescriptor);

pluginMetrics.gauge(SERVER_CONNECTIONS, server, Server::numConnections);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ public void start(Buffer<Record<Object>> buffer) {
certificateProvider = certificateProviderFactory.getCertificateProvider();
}
final MethodDescriptor<ExportTraceServiceRequest, ExportTraceServiceResponse> methodDescriptor = TraceServiceGrpc.getExportMethod();
server = createServer.createGRPCServerBuilder(authenticationProvider, oTelTraceGrpcService, certificateProvider, methodDescriptor);
server = createServer.createGRPCServer(authenticationProvider, oTelTraceGrpcService, certificateProvider, methodDescriptor);

pluginMetrics.gauge(SERVER_CONNECTIONS, server, Server::numConnections);
}
Expand Down

0 comments on commit 0977a07

Please sign in to comment.