diff --git a/data-prepper-plugins/http-common/build.gradle b/data-prepper-plugins/http-common/build.gradle index db50ebf241..c6ba00aab4 100644 --- a/data-prepper-plugins/http-common/build.gradle +++ b/data-prepper-plugins/http-common/build.gradle @@ -8,8 +8,10 @@ dependencies { compileOnly 'org.projectlombok:lombok:1.18.30' annotationProcessor 'org.projectlombok:lombok:1.18.30' implementation project(':data-prepper-api') + implementation project(':data-prepper-plugins:blocking-buffer') implementation project(':data-prepper-plugins:armeria-common') implementation project(':data-prepper-plugins:common') + implementation project(':data-prepper-plugins:http-source-common') implementation libs.opentelemetry.proto implementation libs.armeria.core implementation libs.armeria.grpc diff --git a/data-prepper-plugins/http-common/src/main/java/org/opensearch/dataprepper/plugins/server/CreateServer.java b/data-prepper-plugins/http-common/src/main/java/org/opensearch/dataprepper/plugins/server/CreateServer.java index 28565e5153..6502977a60 100644 --- a/data-prepper-plugins/http-common/src/main/java/org/opensearch/dataprepper/plugins/server/CreateServer.java +++ b/data-prepper-plugins/http-common/src/main/java/org/opensearch/dataprepper/plugins/server/CreateServer.java @@ -15,14 +15,23 @@ import com.linecorp.armeria.server.grpc.GrpcService; import com.linecorp.armeria.server.grpc.GrpcServiceBuilder; import com.linecorp.armeria.server.healthcheck.HealthCheckService; +import com.linecorp.armeria.server.throttling.ThrottlingService; import io.grpc.BindableService; import io.grpc.MethodDescriptor; import io.grpc.ServerInterceptor; import io.grpc.ServerInterceptors; import io.grpc.protobuf.services.ProtoReflectionService; import org.opensearch.dataprepper.GrpcRequestExceptionHandler; +import org.opensearch.dataprepper.HttpRequestExceptionHandler; +import org.opensearch.dataprepper.armeria.authentication.ArmeriaHttpAuthenticationProvider; import org.opensearch.dataprepper.armeria.authentication.GrpcAuthenticationProvider; +import org.opensearch.dataprepper.http.LogThrottlingRejectHandler; +import org.opensearch.dataprepper.http.LogThrottlingStrategy; +import org.opensearch.dataprepper.http.certificate.CertificateProviderFactory; import org.opensearch.dataprepper.metrics.PluginMetrics; +import org.opensearch.dataprepper.model.buffer.Buffer; +import org.opensearch.dataprepper.model.log.Log; +import org.opensearch.dataprepper.model.record.Record; import org.opensearch.dataprepper.plugins.certificate.CertificateProvider; import org.opensearch.dataprepper.plugins.certificate.model.Certificate; import org.opensearch.dataprepper.plugins.codec.CompressionOption; @@ -34,6 +43,7 @@ import java.util.Collections; import java.util.List; import java.util.Optional; +import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.function.Function; @@ -58,7 +68,7 @@ public CreateServer(final ServerConfiguration serverConfiguration, Logger LOG, P this.pipelineName = pipelineName; } - public Server createGRPCServerBuilder(final GrpcAuthenticationProvider authenticationProvider, BindableService grpcService, CertificateProvider certificateProvider, MethodDescriptor methodDescriptor) { + public Server createGRPCServer(final GrpcAuthenticationProvider authenticationProvider, BindableService grpcService, CertificateProvider certificateProvider, MethodDescriptor methodDescriptor) { final List serverInterceptors = getAuthenticationInterceptor(authenticationProvider); final GrpcServiceBuilder grpcServiceBuilder = GrpcService @@ -141,8 +151,65 @@ public Server createGRPCServerBuilder(final GrpcAuthenticationProvider au return sb.build(); } - public Server createHTTPServerBuilder() { + public Server createHTTPServer(Buffer> buffer, final CertificateProviderFactory certificateProviderFactory, final ArmeriaHttpAuthenticationProvider authenticationProvider, final HttpRequestExceptionHandler httpRequestExceptionHandler) { final ServerBuilder sb = Server.builder(); + + sb.disableServerHeader(); + + if (serverConfiguration.isSsl()) { + LOG.info("Creating http source with SSL/TLS enabled."); + final CertificateProvider certificateProvider = certificateProviderFactory.getCertificateProvider(); + final Certificate certificate = certificateProvider.getCertificate(); + // TODO: enable encrypted key with password + sb.https(serverConfiguration.getPort()).tls( + new ByteArrayInputStream(certificate.getCertificate().getBytes(StandardCharsets.UTF_8)), + new ByteArrayInputStream(certificate.getPrivateKey().getBytes(StandardCharsets.UTF_8) + ) + ); + } else { + LOG.warn("Creating http source without SSL/TLS. This is not secure."); + LOG.warn("In order to set up TLS for the http source, go here: https://github.com/opensearch-project/data-prepper/tree/main/data-prepper-plugins/http-source#ssl"); + sb.http(serverConfiguration.getPort()); + } + + if(serverConfiguration.getAuthentication() != null) { + final Optional> optionalAuthDecorator = authenticationProvider.getAuthenticationDecorator(); + + if (serverConfiguration.isUnauthenticatedHealthCheck()) { + optionalAuthDecorator.ifPresent(authDecorator -> sb.decorator(REGEX_HEALTH, authDecorator)); + } else { + optionalAuthDecorator.ifPresent(sb::decorator); + } + } + + sb.maxNumConnections(serverConfiguration.getMaxConnectionCount()); + sb.requestTimeout(Duration.ofMillis(serverConfiguration.getRequestTimeoutInMillis())); + if(serverConfiguration.getMaxRequestLength() != null) { + sb.maxRequestLength(serverConfiguration.getMaxRequestLength().getBytes()); + } + final int threads = serverConfiguration.getThreadCount(); + final ScheduledThreadPoolExecutor blockingTaskExecutor = new ScheduledThreadPoolExecutor(threads); + sb.blockingTaskExecutor(blockingTaskExecutor, true); + final int maxPendingRequests = serverConfiguration.getMaxPendingRequests(); + final LogThrottlingStrategy logThrottlingStrategy = new LogThrottlingStrategy( + maxPendingRequests, blockingTaskExecutor.getQueue()); + final LogThrottlingRejectHandler logThrottlingRejectHandler = new LogThrottlingRejectHandler(maxPendingRequests, pluginMetrics); + + final String httpSourcePath = serverConfiguration.getPath().replace(PIPELINE_NAME_PLACEHOLDER, pipelineName); + sb.decorator(httpSourcePath, ThrottlingService.newDecorator(logThrottlingStrategy, logThrottlingRejectHandler)); + final LogHTTPService logHTTPService = new LogHTTPService(serverConfiguration.getBufferTimeoutInMillis(), buffer, pluginMetrics); + + if (CompressionOption.NONE.equals(serverConfiguration.getCompression())) { + sb.annotatedService(httpSourcePath, logHTTPService, httpRequestExceptionHandler); + } else { + sb.annotatedService(httpSourcePath, logHTTPService, DecodingService.newDecorator(), httpRequestExceptionHandler); + } + + if (serverConfiguration.hasHealthCheck()) { + LOG.info("HTTP source health check is enabled"); + sb.service(HTTP_HEALTH_CHECK_PATH, HealthCheckService.builder().longPolling(0).build()); + } + return sb.build(); } diff --git a/data-prepper-plugins/http-source/src/main/java/org/opensearch/dataprepper/plugins/source/loghttp/LogHTTPService.java b/data-prepper-plugins/http-common/src/main/java/org/opensearch/dataprepper/plugins/server/LogHTTPService.java similarity index 99% rename from data-prepper-plugins/http-source/src/main/java/org/opensearch/dataprepper/plugins/source/loghttp/LogHTTPService.java rename to data-prepper-plugins/http-common/src/main/java/org/opensearch/dataprepper/plugins/server/LogHTTPService.java index c2bd344fc5..683b9dfa01 100644 --- a/data-prepper-plugins/http-source/src/main/java/org/opensearch/dataprepper/plugins/source/loghttp/LogHTTPService.java +++ b/data-prepper-plugins/http-common/src/main/java/org/opensearch/dataprepper/plugins/server/LogHTTPService.java @@ -3,7 +3,7 @@ * SPDX-License-Identifier: Apache-2.0 */ -package org.opensearch.dataprepper.plugins.source.loghttp; +package org.opensearch.dataprepper.plugins.server; import com.linecorp.armeria.server.ServiceRequestContext; import org.opensearch.dataprepper.metrics.PluginMetrics; diff --git a/data-prepper-plugins/http-common/src/main/java/org/opensearch/dataprepper/plugins/server/ServerConfiguration.java b/data-prepper-plugins/http-common/src/main/java/org/opensearch/dataprepper/plugins/server/ServerConfiguration.java index 8069abb849..ffb9788059 100644 --- a/data-prepper-plugins/http-common/src/main/java/org/opensearch/dataprepper/plugins/server/ServerConfiguration.java +++ b/data-prepper-plugins/http-common/src/main/java/org/opensearch/dataprepper/plugins/server/ServerConfiguration.java @@ -19,7 +19,9 @@ public class ServerConfiguration { static final boolean DEFAULT_SSL = true; static final boolean DEFAULT_USE_ACM_CERT_FOR_SSL = false; static final int DEFAULT_THREAD_COUNT = 200; + static final int DEFAULT_MAX_PENDING_REQUESTS = 1024; static final int DEFAULT_MAX_CONNECTION_COUNT = 500; + static final double BUFFER_TIMEOUT_FRACTION = 0.8; @Setter @Getter @@ -78,6 +80,14 @@ public class ServerConfiguration { @Setter private int maxConnectionCount = DEFAULT_MAX_CONNECTION_COUNT; + @Getter + @Setter + private int maxPendingRequests = DEFAULT_MAX_PENDING_REQUESTS; + + @Getter + @Setter + private int bufferTimeoutInMillis; + public boolean hasHealthCheck() { return healthCheck; } diff --git a/data-prepper-plugins/http-source/src/test/java/org/opensearch/dataprepper/plugins/source/loghttp/LogHTTPServiceTest.java b/data-prepper-plugins/http-common/src/test/java/org/opensearch/dataprepper/plugins/server/LogHTTPServiceTest.java similarity index 99% rename from data-prepper-plugins/http-source/src/test/java/org/opensearch/dataprepper/plugins/source/loghttp/LogHTTPServiceTest.java rename to data-prepper-plugins/http-common/src/test/java/org/opensearch/dataprepper/plugins/server/LogHTTPServiceTest.java index 5e2ffe2c7d..4d28352c73 100644 --- a/data-prepper-plugins/http-source/src/test/java/org/opensearch/dataprepper/plugins/source/loghttp/LogHTTPServiceTest.java +++ b/data-prepper-plugins/http-common/src/test/java/org/opensearch/dataprepper/plugins/server/LogHTTPServiceTest.java @@ -3,7 +3,7 @@ * SPDX-License-Identifier: Apache-2.0 */ -package org.opensearch.dataprepper.plugins.source.loghttp; +package org.opensearch.dataprepper.plugins.server; import com.linecorp.armeria.server.ServiceRequestContext; import org.junit.jupiter.api.Nested; diff --git a/data-prepper-plugins/http-source/build.gradle b/data-prepper-plugins/http-source/build.gradle index fcfb4ad67f..5c6173b0ab 100644 --- a/data-prepper-plugins/http-source/build.gradle +++ b/data-prepper-plugins/http-source/build.gradle @@ -14,6 +14,7 @@ dependencies { implementation project(':data-prepper-plugins:http-source-common') implementation project(':data-prepper-plugins:common') implementation project(':data-prepper-plugins:armeria-common') + implementation project(':data-prepper-plugins:http-common') implementation libs.armeria.core implementation libs.commons.io implementation 'software.amazon.awssdk:acm' diff --git a/data-prepper-plugins/http-source/src/jmh/java/org/opensearch/dataprepper/plugins/source/loghttp/LogHTTPServiceMeasure.java b/data-prepper-plugins/http-source/src/jmh/java/org/opensearch/dataprepper/plugins/source/loghttp/LogHTTPServiceMeasure.java index 40d5b7c3e2..28de37ea71 100644 --- a/data-prepper-plugins/http-source/src/jmh/java/org/opensearch/dataprepper/plugins/source/loghttp/LogHTTPServiceMeasure.java +++ b/data-prepper-plugins/http-source/src/jmh/java/org/opensearch/dataprepper/plugins/source/loghttp/LogHTTPServiceMeasure.java @@ -21,6 +21,7 @@ import org.openjdk.jmh.annotations.Warmup; import org.opensearch.dataprepper.metrics.PluginMetrics; import org.opensearch.dataprepper.model.buffer.Buffer; +import org.opensearch.dataprepper.plugins.server.LogHTTPService; import java.io.IOException; import java.time.Duration; diff --git a/data-prepper-plugins/http-source/src/main/java/org/opensearch/dataprepper/plugins/source/loghttp/ConvertConfiguration.java b/data-prepper-plugins/http-source/src/main/java/org/opensearch/dataprepper/plugins/source/loghttp/ConvertConfiguration.java new file mode 100644 index 0000000000..a985b3e6e1 --- /dev/null +++ b/data-prepper-plugins/http-source/src/main/java/org/opensearch/dataprepper/plugins/source/loghttp/ConvertConfiguration.java @@ -0,0 +1,25 @@ +package org.opensearch.dataprepper.plugins.source.loghttp; + +import org.opensearch.dataprepper.http.HttpServerConfig; +import org.opensearch.dataprepper.plugins.server.ServerConfiguration; + +public class ConvertConfiguration { + + public static ServerConfiguration convertConfiguration(final HttpServerConfig sourceConfig) { + ServerConfiguration serverConfiguration = new ServerConfiguration(); + serverConfiguration.setPath(sourceConfig.getPath()); + serverConfiguration.setHealthCheck(sourceConfig.hasHealthCheckService()); + serverConfiguration.setRequestTimeoutInMillis(sourceConfig.getRequestTimeoutInMillis()); + serverConfiguration.setCompression(sourceConfig.getCompression()); + serverConfiguration.setAuthentication(sourceConfig.getAuthentication()); + serverConfiguration.setSsl(sourceConfig.isSsl()); + serverConfiguration.setUnauthenticatedHealthCheck(sourceConfig.isUnauthenticatedHealthCheck()); + serverConfiguration.setMaxRequestLength(sourceConfig.getMaxRequestLength()); + serverConfiguration.setPort(sourceConfig.getPort()); + serverConfiguration.setThreadCount(sourceConfig.getThreadCount()); + serverConfiguration.setMaxPendingRequests(sourceConfig.getMaxPendingRequests()); + serverConfiguration.setMaxConnectionCount(sourceConfig.getMaxConnectionCount()); + serverConfiguration.setBufferTimeoutInMillis(sourceConfig.getBufferTimeoutInMillis()); + return serverConfiguration; + } +} diff --git a/data-prepper-plugins/http-source/src/main/java/org/opensearch/dataprepper/plugins/source/loghttp/HTTPSource.java b/data-prepper-plugins/http-source/src/main/java/org/opensearch/dataprepper/plugins/source/loghttp/HTTPSource.java index 1906a3e40f..2e2b115b2c 100644 --- a/data-prepper-plugins/http-source/src/main/java/org/opensearch/dataprepper/plugins/source/loghttp/HTTPSource.java +++ b/data-prepper-plugins/http-source/src/main/java/org/opensearch/dataprepper/plugins/source/loghttp/HTTPSource.java @@ -5,17 +5,10 @@ package org.opensearch.dataprepper.plugins.source.loghttp; -import com.linecorp.armeria.server.HttpService; import com.linecorp.armeria.server.Server; -import com.linecorp.armeria.server.ServerBuilder; -import com.linecorp.armeria.server.encoding.DecodingService; -import com.linecorp.armeria.server.healthcheck.HealthCheckService; -import com.linecorp.armeria.server.throttling.ThrottlingService; import org.opensearch.dataprepper.HttpRequestExceptionHandler; import org.opensearch.dataprepper.armeria.authentication.ArmeriaHttpAuthenticationProvider; import org.opensearch.dataprepper.http.HttpServerConfig; -import org.opensearch.dataprepper.http.LogThrottlingRejectHandler; -import org.opensearch.dataprepper.http.LogThrottlingStrategy; import org.opensearch.dataprepper.http.certificate.CertificateProviderFactory; import org.opensearch.dataprepper.metrics.PluginMetrics; import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin; @@ -30,20 +23,13 @@ import org.opensearch.dataprepper.model.plugin.PluginFactory; import org.opensearch.dataprepper.model.record.Record; import org.opensearch.dataprepper.model.source.Source; -import org.opensearch.dataprepper.plugins.certificate.CertificateProvider; -import org.opensearch.dataprepper.plugins.certificate.model.Certificate; -import org.opensearch.dataprepper.plugins.codec.CompressionOption; +import org.opensearch.dataprepper.plugins.server.CreateServer; +import org.opensearch.dataprepper.plugins.server.ServerConfiguration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.ByteArrayInputStream; -import java.nio.charset.StandardCharsets; -import java.time.Duration; import java.util.Collections; -import java.util.Optional; import java.util.concurrent.ExecutionException; -import java.util.concurrent.ScheduledThreadPoolExecutor; -import java.util.function.Function; @DataPrepperPlugin(name = "http", pluginType = Source.class, pluginConfigurationType = HTTPSourceConfig.class) public class HTTPSource implements Source> { @@ -96,65 +82,9 @@ public void start(final Buffer> buffer) { throw new IllegalStateException("Buffer provided is null"); } if (server == null) { - final ServerBuilder sb = Server.builder(); - - sb.disableServerHeader(); - - if (sourceConfig.isSsl()) { - LOG.info("Creating http source with SSL/TLS enabled."); - final CertificateProvider certificateProvider = certificateProviderFactory.getCertificateProvider(); - final Certificate certificate = certificateProvider.getCertificate(); - // TODO: enable encrypted key with password - sb.https(sourceConfig.getPort()).tls( - new ByteArrayInputStream(certificate.getCertificate().getBytes(StandardCharsets.UTF_8)), - new ByteArrayInputStream(certificate.getPrivateKey().getBytes(StandardCharsets.UTF_8) - ) - ); - } else { - LOG.warn("Creating http source without SSL/TLS. This is not secure."); - LOG.warn("In order to set up TLS for the http source, go here: https://github.com/opensearch-project/data-prepper/tree/main/data-prepper-plugins/http-source#ssl"); - sb.http(sourceConfig.getPort()); - } - - if(sourceConfig.getAuthentication() != null) { - final Optional> optionalAuthDecorator = authenticationProvider.getAuthenticationDecorator(); - - if (sourceConfig.isUnauthenticatedHealthCheck()) { - optionalAuthDecorator.ifPresent(authDecorator -> sb.decorator(REGEX_HEALTH, authDecorator)); - } else { - optionalAuthDecorator.ifPresent(sb::decorator); - } - } - - sb.maxNumConnections(sourceConfig.getMaxConnectionCount()); - sb.requestTimeout(Duration.ofMillis(sourceConfig.getRequestTimeoutInMillis())); - if(sourceConfig.getMaxRequestLength() != null) { - sb.maxRequestLength(sourceConfig.getMaxRequestLength().getBytes()); - } - final int threads = sourceConfig.getThreadCount(); - final ScheduledThreadPoolExecutor blockingTaskExecutor = new ScheduledThreadPoolExecutor(threads); - sb.blockingTaskExecutor(blockingTaskExecutor, true); - final int maxPendingRequests = sourceConfig.getMaxPendingRequests(); - final LogThrottlingStrategy logThrottlingStrategy = new LogThrottlingStrategy( - maxPendingRequests, blockingTaskExecutor.getQueue()); - final LogThrottlingRejectHandler logThrottlingRejectHandler = new LogThrottlingRejectHandler(maxPendingRequests, pluginMetrics); - - final String httpSourcePath = sourceConfig.getPath().replace(PIPELINE_NAME_PLACEHOLDER, pipelineName); - sb.decorator(httpSourcePath, ThrottlingService.newDecorator(logThrottlingStrategy, logThrottlingRejectHandler)); - final LogHTTPService logHTTPService = new LogHTTPService(sourceConfig.getBufferTimeoutInMillis(), buffer, pluginMetrics); - - if (CompressionOption.NONE.equals(sourceConfig.getCompression())) { - sb.annotatedService(httpSourcePath, logHTTPService, httpRequestExceptionHandler); - } else { - sb.annotatedService(httpSourcePath, logHTTPService, DecodingService.newDecorator(), httpRequestExceptionHandler); - } - - if (sourceConfig.hasHealthCheckService()) { - LOG.info("HTTP source health check is enabled"); - sb.service(HTTP_HEALTH_CHECK_PATH, HealthCheckService.builder().longPolling(0).build()); - } - - server = sb.build(); + ServerConfiguration serverConfiguration = ConvertConfiguration.convertConfiguration(sourceConfig); + CreateServer createServer = new CreateServer(serverConfiguration, LOG, pluginMetrics, "http", pipelineName); + server = createServer.createHTTPServer(buffer, certificateProviderFactory, authenticationProvider, httpRequestExceptionHandler); pluginMetrics.gauge(SERVER_CONNECTIONS, server, Server::numConnections); } diff --git a/data-prepper-plugins/http-source/src/test/java/org/opensearch/dataprepper/plugins/source/loghttp/HTTPSourceTest.java b/data-prepper-plugins/http-source/src/test/java/org/opensearch/dataprepper/plugins/source/loghttp/HTTPSourceTest.java index f51666db59..961baba32b 100644 --- a/data-prepper-plugins/http-source/src/test/java/org/opensearch/dataprepper/plugins/source/loghttp/HTTPSourceTest.java +++ b/data-prepper-plugins/http-source/src/test/java/org/opensearch/dataprepper/plugins/source/loghttp/HTTPSourceTest.java @@ -53,6 +53,7 @@ import org.opensearch.dataprepper.plugins.certificate.CertificateProvider; import org.opensearch.dataprepper.plugins.certificate.model.Certificate; import org.opensearch.dataprepper.plugins.codec.CompressionOption; +import org.opensearch.dataprepper.plugins.server.LogHTTPService; import java.io.ByteArrayOutputStream; import java.io.File; diff --git a/data-prepper-plugins/otel-logs-source/src/main/java/org/opensearch/dataprepper/plugins/source/otellogs/OTelLogsSource.java b/data-prepper-plugins/otel-logs-source/src/main/java/org/opensearch/dataprepper/plugins/source/otellogs/OTelLogsSource.java index 23be0cf0b3..5a1f2bf007 100644 --- a/data-prepper-plugins/otel-logs-source/src/main/java/org/opensearch/dataprepper/plugins/source/otellogs/OTelLogsSource.java +++ b/data-prepper-plugins/otel-logs-source/src/main/java/org/opensearch/dataprepper/plugins/source/otellogs/OTelLogsSource.java @@ -94,7 +94,7 @@ public void start(Buffer> buffer) { certificateProvider = certificateProviderFactory.getCertificateProvider(); } final MethodDescriptor methodDescriptor = LogsServiceGrpc.getExportMethod(); - server = createServer.createGRPCServerBuilder(authenticationProvider, oTelLogsGrpcService, certificateProvider, methodDescriptor); + server = createServer.createGRPCServer(authenticationProvider, oTelLogsGrpcService, certificateProvider, methodDescriptor); pluginMetrics.gauge(SERVER_CONNECTIONS, server, Server::numConnections); } diff --git a/data-prepper-plugins/otel-metrics-source/src/main/java/org/opensearch/dataprepper/plugins/source/otelmetrics/OTelMetricsSource.java b/data-prepper-plugins/otel-metrics-source/src/main/java/org/opensearch/dataprepper/plugins/source/otelmetrics/OTelMetricsSource.java index 060012e517..ce79ab0624 100644 --- a/data-prepper-plugins/otel-metrics-source/src/main/java/org/opensearch/dataprepper/plugins/source/otelmetrics/OTelMetricsSource.java +++ b/data-prepper-plugins/otel-metrics-source/src/main/java/org/opensearch/dataprepper/plugins/source/otelmetrics/OTelMetricsSource.java @@ -92,7 +92,7 @@ public void start(Buffer> buffer) { certificateProvider = certificateProviderFactory.getCertificateProvider(); } final MethodDescriptor methodDescriptor = MetricsServiceGrpc.getExportMethod(); - server = createServer.createGRPCServerBuilder(authenticationProvider, oTelMetricsGrpcService, certificateProvider, methodDescriptor); + server = createServer.createGRPCServer(authenticationProvider, oTelMetricsGrpcService, certificateProvider, methodDescriptor); pluginMetrics.gauge(SERVER_CONNECTIONS, server, Server::numConnections); } diff --git a/data-prepper-plugins/otel-trace-source/src/main/java/org/opensearch/dataprepper/plugins/source/oteltrace/OTelTraceSource.java b/data-prepper-plugins/otel-trace-source/src/main/java/org/opensearch/dataprepper/plugins/source/oteltrace/OTelTraceSource.java index 08decfbe02..39736ab9e7 100644 --- a/data-prepper-plugins/otel-trace-source/src/main/java/org/opensearch/dataprepper/plugins/source/oteltrace/OTelTraceSource.java +++ b/data-prepper-plugins/otel-trace-source/src/main/java/org/opensearch/dataprepper/plugins/source/oteltrace/OTelTraceSource.java @@ -92,7 +92,7 @@ public void start(Buffer> buffer) { certificateProvider = certificateProviderFactory.getCertificateProvider(); } final MethodDescriptor methodDescriptor = TraceServiceGrpc.getExportMethod(); - server = createServer.createGRPCServerBuilder(authenticationProvider, oTelTraceGrpcService, certificateProvider, methodDescriptor); + server = createServer.createGRPCServer(authenticationProvider, oTelTraceGrpcService, certificateProvider, methodDescriptor); pluginMetrics.gauge(SERVER_CONNECTIONS, server, Server::numConnections); }