Skip to content

Commit

Permalink
Add new OTEL Metrics source that creates events
Browse files Browse the repository at this point in the history
Signed-off-by: Krishna Kondaka <[email protected]>
  • Loading branch information
Krishna Kondaka committed Feb 24, 2024
1 parent 1aede50 commit 74d8271
Show file tree
Hide file tree
Showing 3 changed files with 295 additions and 31 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,272 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.source.otelmetrics;

import com.linecorp.armeria.server.HttpService;
import com.linecorp.armeria.server.Server;
import com.linecorp.armeria.server.ServerBuilder;
import com.linecorp.armeria.server.encoding.DecodingService;
import com.linecorp.armeria.server.grpc.GrpcService;
import com.linecorp.armeria.server.grpc.GrpcServiceBuilder;
import com.linecorp.armeria.server.healthcheck.HealthCheckService;
import io.grpc.MethodDescriptor;
import io.grpc.ServerInterceptor;
import io.grpc.ServerInterceptors;
import io.grpc.protobuf.services.ProtoReflectionService;
import io.opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceRequest;
import io.opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceResponse;
import io.opentelemetry.proto.collector.metrics.v1.MetricsServiceGrpc;
import org.opensearch.dataprepper.GrpcRequestExceptionHandler;
import org.opensearch.dataprepper.armeria.authentication.GrpcAuthenticationProvider;
import org.opensearch.dataprepper.plugins.codec.CompressionOption;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin;
import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor;
import org.opensearch.dataprepper.model.buffer.Buffer;
import org.opensearch.dataprepper.model.configuration.PipelineDescription;
import static org.opensearch.dataprepper.plugins.otel.codec.OTelProtoCodec.DEFAULT_EXPONENTIAL_HISTOGRAM_MAX_ALLOWED_SCALE;
import org.opensearch.dataprepper.model.configuration.PluginModel;
import org.opensearch.dataprepper.model.configuration.PluginSetting;
import org.opensearch.dataprepper.model.plugin.PluginFactory;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.model.metric.Metric;
import org.opensearch.dataprepper.model.source.Source;
import org.opensearch.dataprepper.model.codec.ByteDecoder;
import org.opensearch.dataprepper.plugins.otel.codec.OTelMetricDecoder;
import org.opensearch.dataprepper.plugins.certificate.CertificateProvider;
import org.opensearch.dataprepper.plugins.certificate.model.Certificate;
import org.opensearch.dataprepper.plugins.health.HealthGrpcService;
import org.opensearch.dataprepper.plugins.otel.codec.OTelProtoCodec;
import org.opensearch.dataprepper.plugins.source.otelmetrics.certificate.CertificateProviderFactory;
import org.opensearch.dataprepper.exceptions.BufferWriteException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.ByteArrayInputStream;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.Collection;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.function.Function;
import java.util.concurrent.atomic.AtomicInteger;

@DataPrepperPlugin(name = "otel_metrics", pluginType = Source.class, pluginConfigurationType = OTelMetricsSourceConfig.class)
public class OTelMetrics implements Source<Record<? extends Metric>> {
private static final Logger LOG = LoggerFactory.getLogger(OTelMetricsSource.class);
private static final String HTTP_HEALTH_CHECK_PATH = "/health";
private static final String REGEX_HEALTH = "regex:^/(?!health$).*$";
private static final String PIPELINE_NAME_PLACEHOLDER = "${pipelineName}";

private final OTelMetricsSourceConfig oTelMetricsSourceConfig;
private final String pipelineName;
private final PluginMetrics pluginMetrics;
private final GrpcAuthenticationProvider authenticationProvider;
private final CertificateProviderFactory certificateProviderFactory;
private final GrpcRequestExceptionHandler requestExceptionHandler;
private Server server;
private final ByteDecoder byteDecoder;

@DataPrepperPluginConstructor
public OTelMetrics(final OTelMetricsSourceConfig oTelMetricsSourceConfig, final PluginMetrics pluginMetrics,
final PluginFactory pluginFactory, final PipelineDescription pipelineDescription) {
this(oTelMetricsSourceConfig, pluginMetrics, pluginFactory, new CertificateProviderFactory(oTelMetricsSourceConfig), pipelineDescription);
}

// accessible only in the same package for unit test
OTelMetrics(final OTelMetricsSourceConfig oTelMetricsSourceConfig, final PluginMetrics pluginMetrics, final PluginFactory pluginFactory,
final CertificateProviderFactory certificateProviderFactory, final PipelineDescription pipelineDescription) {
oTelMetricsSourceConfig.validateAndInitializeCertAndKeyFileInS3();
this.oTelMetricsSourceConfig = oTelMetricsSourceConfig;
this.pluginMetrics = pluginMetrics;
this.certificateProviderFactory = certificateProviderFactory;
this.pipelineName = pipelineDescription.getPipelineName();
this.authenticationProvider = createAuthenticationProvider(pluginFactory);
this.requestExceptionHandler = new GrpcRequestExceptionHandler(pluginMetrics);
this.byteDecoder = new OTelMetricDecoder();
}

@Override
public ByteDecoder getDecoder() {
return byteDecoder;
}

@Override
public void start(Buffer<Record<? extends Metric>> buffer) {
if (buffer == null) {
throw new IllegalStateException("Buffer provided is null");
}

if (server == null) {
final int bufferWriteTimeoutInMillis =
(int) (oTelMetricsSourceConfig.getRequestTimeoutInMillis() * 0.8);

final OTelMetricsGrpcService oTelMetricsGrpcService = new OTelMetricsGrpcService(
(int) (oTelMetricsSourceConfig.getRequestTimeoutInMillis() * 0.8),
request -> {
try {
if (buffer.isByteBuffer()) {
buffer.writeBytes(request.toByteArray(), null, bufferWriteTimeoutInMillis);
} else {
Collection<Record<? extends Metric>> metrics;
AtomicInteger droppedCounter = new AtomicInteger(0);

OTelProtoCodec.OTelProtoDecoder oTelProtoDecoder = new OTelProtoCodec.OTelProtoDecoder();
metrics = oTelProtoDecoder.parseExportMetricsServiceRequest(request, droppedCounter, DEFAULT_EXPONENTIAL_HISTOGRAM_MAX_ALLOWED_SCALE, true, true, false);
buffer.writeAll(metrics, bufferWriteTimeoutInMillis);
}
} catch (Exception e) {
LOG.error("Failed to write the request of size {} due to:", request.toString().length(), e);
throw new BufferWriteException(e.getMessage(), e);
}
},
pluginMetrics
);

final List<ServerInterceptor> serverInterceptors = getAuthenticationInterceptor();

final GrpcServiceBuilder grpcServiceBuilder = GrpcService
.builder()
.useClientTimeoutHeader(false)
.useBlockingTaskExecutor(true)
.exceptionMapping(requestExceptionHandler);

final MethodDescriptor<ExportMetricsServiceRequest, ExportMetricsServiceResponse> methodDescriptor = MetricsServiceGrpc.getExportMethod();
final String oTelMetricsSourcePath = oTelMetricsSourceConfig.getPath();
if (oTelMetricsSourcePath != null) {
final String transformedOTelMetricsSourcePath = oTelMetricsSourcePath.replace(PIPELINE_NAME_PLACEHOLDER, pipelineName);
grpcServiceBuilder.addService(transformedOTelMetricsSourcePath,
ServerInterceptors.intercept(oTelMetricsGrpcService, serverInterceptors), methodDescriptor);
} else {
grpcServiceBuilder.addService(ServerInterceptors.intercept(oTelMetricsGrpcService, serverInterceptors));
}

if (oTelMetricsSourceConfig.hasHealthCheck()) {
LOG.info("Health check is enabled");
grpcServiceBuilder.addService(new HealthGrpcService());
}

if (oTelMetricsSourceConfig.hasProtoReflectionService()) {
LOG.info("Proto reflection service is enabled");
grpcServiceBuilder.addService(ProtoReflectionService.newInstance());
}

grpcServiceBuilder.enableUnframedRequests(oTelMetricsSourceConfig.enableUnframedRequests());

final ServerBuilder sb = Server.builder();
sb.disableServerHeader();
if (CompressionOption.NONE.equals(oTelMetricsSourceConfig.getCompression())) {
sb.service(grpcServiceBuilder.build());
} else {
sb.service(grpcServiceBuilder.build(), DecodingService.newDecorator());
}

if(oTelMetricsSourceConfig.enableHttpHealthCheck()) {
sb.service(HTTP_HEALTH_CHECK_PATH, HealthCheckService.builder().longPolling(0).build());
}

if(oTelMetricsSourceConfig.getAuthentication() != null) {
final Optional<Function<? super HttpService, ? extends HttpService>> optionalHttpAuthenticationService =
authenticationProvider.getHttpAuthenticationService();

if(oTelMetricsSourceConfig.isUnauthenticatedHealthCheck()) {
optionalHttpAuthenticationService.ifPresent(httpAuthenticationService ->
sb.decorator(REGEX_HEALTH, httpAuthenticationService));
} else {
optionalHttpAuthenticationService.ifPresent(sb::decorator);
}
}

sb.requestTimeoutMillis(oTelMetricsSourceConfig.getRequestTimeoutInMillis());
if(oTelMetricsSourceConfig.getMaxRequestLength() != null) {
sb.maxRequestLength(oTelMetricsSourceConfig.getMaxRequestLength().getBytes());
}

// ACM Cert for SSL takes preference
if (oTelMetricsSourceConfig.isSsl() || oTelMetricsSourceConfig.useAcmCertForSSL()) {
LOG.info("SSL/TLS is enabled.");
final CertificateProvider certificateProvider = certificateProviderFactory.getCertificateProvider();
final Certificate certificate = certificateProvider.getCertificate();
sb.https(oTelMetricsSourceConfig.getPort()).tls(
new ByteArrayInputStream(certificate.getCertificate().getBytes(StandardCharsets.UTF_8)),
new ByteArrayInputStream(certificate.getPrivateKey().getBytes(StandardCharsets.UTF_8)
)
);
} else {
LOG.warn("Creating otel_metrics_source without SSL/TLS. This is not secure.");
LOG.warn("In order to set up TLS for the otel_metrics_source, go here: https://github.com/opensearch-project/data-prepper/tree/main/data-prepper-plugins/otel-metrics-source#ssl");
sb.http(oTelMetricsSourceConfig.getPort());
}

sb.maxNumConnections(oTelMetricsSourceConfig.getMaxConnectionCount());
sb.blockingTaskExecutor(
Executors.newScheduledThreadPool(oTelMetricsSourceConfig.getThreadCount()),
true);

server = sb.build();
}
try {
server.start().get();
} catch (ExecutionException ex) {
if (ex.getCause() != null && ex.getCause() instanceof RuntimeException) {
throw (RuntimeException) ex.getCause();
} else {
throw new RuntimeException(ex);
}
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
throw new RuntimeException(ex);
}
LOG.info("Started otel_metrics_source...");
}

@Override
public void stop() {
if (server != null) {
try {
server.stop().get();
} catch (ExecutionException ex) {
if (ex.getCause() != null && ex.getCause() instanceof RuntimeException) {
throw (RuntimeException) ex.getCause();
} else {
throw new RuntimeException(ex);
}
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
throw new RuntimeException(ex);
}
}
LOG.info("Stopped otel_metrics_source.");
}

private List<ServerInterceptor> getAuthenticationInterceptor() {
final ServerInterceptor authenticationInterceptor = authenticationProvider.getAuthenticationInterceptor();
if (authenticationInterceptor == null) {
return Collections.emptyList();
}
return Collections.singletonList(authenticationInterceptor);
}

private GrpcAuthenticationProvider createAuthenticationProvider(final PluginFactory pluginFactory) {
final PluginModel authenticationConfiguration = oTelMetricsSourceConfig.getAuthentication();

if (authenticationConfiguration == null || authenticationConfiguration.getPluginName().equals(GrpcAuthenticationProvider.UNAUTHENTICATED_PLUGIN_NAME)) {
LOG.warn("Creating otel-metrics-source without authentication. This is not secure.");
LOG.warn("In order to set up Http Basic authentication for the otel-metrics-source, go here: https://github.com/opensearch-project/data-prepper/tree/main/data-prepper-plugins/otel-metrics-source#authentication-configurations");
}

final PluginSetting authenticationPluginSetting;
if (authenticationConfiguration != null) {
authenticationPluginSetting = new PluginSetting(authenticationConfiguration.getPluginName(), authenticationConfiguration.getPluginSettings());
} else {
authenticationPluginSetting = new PluginSetting(GrpcAuthenticationProvider.UNAUTHENTICATED_PLUGIN_NAME, Collections.emptyMap());
}
authenticationPluginSetting.setPipelineName(pipelineName);
return pluginFactory.loadPlugin(GrpcAuthenticationProvider.class, authenticationPluginSetting);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,13 @@
import io.opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceRequest;
import io.opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceResponse;
import io.opentelemetry.proto.collector.metrics.v1.MetricsServiceGrpc;
import org.opensearch.dataprepper.exceptions.BufferWriteException;
import org.opensearch.dataprepper.exceptions.RequestCancelledException;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.buffer.Buffer;
import org.opensearch.dataprepper.model.record.Record;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.function.Consumer;

public class OTelMetricsGrpcService extends MetricsServiceGrpc.MetricsServiceImplBase {
private static final Logger LOG = LoggerFactory.getLogger(OTelMetricsGrpcService.class);

Expand All @@ -31,7 +30,7 @@ public class OTelMetricsGrpcService extends MetricsServiceGrpc.MetricsServiceImp
public static final String REQUEST_PROCESS_DURATION = "requestProcessDuration";

private final int bufferWriteTimeoutInMillis;
private final Buffer<Record<ExportMetricsServiceRequest>> buffer;
private final Consumer<ExportMetricsServiceRequest> consumer;

private final Counter requestsReceivedCounter;
private final Counter successRequestsCounter;
Expand All @@ -40,26 +39,18 @@ public class OTelMetricsGrpcService extends MetricsServiceGrpc.MetricsServiceImp


public OTelMetricsGrpcService(int bufferWriteTimeoutInMillis,
Buffer<Record<ExportMetricsServiceRequest>> buffer,

final Consumer<ExportMetricsServiceRequest> consumer,
final PluginMetrics pluginMetrics) {
this.bufferWriteTimeoutInMillis = bufferWriteTimeoutInMillis;
this.buffer = buffer;
this.consumer = consumer;

requestsReceivedCounter = pluginMetrics.counter(REQUESTS_RECEIVED);
successRequestsCounter = pluginMetrics.counter(SUCCESS_REQUESTS);
payloadSizeSummary = pluginMetrics.summary(PAYLOAD_SIZE);
requestProcessDuration = pluginMetrics.timer(REQUEST_PROCESS_DURATION);
}

public void rawExport(final ExportMetricsServiceRequest request) {
try {
if (buffer.isByteBuffer()) {
buffer.writeBytes(request.toByteArray(), null, bufferWriteTimeoutInMillis);
}
} catch (Exception e) {
}
}

@Override
public void export(final ExportMetricsServiceRequest request, final StreamObserver<ExportMetricsServiceResponse> responseObserver) {
requestsReceivedCounter.increment();
Expand All @@ -77,21 +68,7 @@ public void export(final ExportMetricsServiceRequest request, final StreamObserv
}

private void processRequest(final ExportMetricsServiceRequest request, final StreamObserver<ExportMetricsServiceResponse> responseObserver) {
try {
if (buffer.isByteBuffer()) {
buffer.writeBytes(request.toByteArray(), null, bufferWriteTimeoutInMillis);
} else {
buffer.write(new Record<>(request), bufferWriteTimeoutInMillis);
}
} catch (Exception e) {
if (ServiceRequestContext.current().isTimedOut()) {
LOG.warn("Exception writing to buffer but request already timed out.", e);
return;
}

LOG.error("Failed to write the request of size {} due to:", request.toString().length(), e);
throw new BufferWriteException(e.getMessage(), e);
}
consumer.accept(request);

if (ServiceRequestContext.current().isTimedOut()) {
LOG.warn("Buffer write completed successfully but request already timed out.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.opensearch.dataprepper.plugins.certificate.model.Certificate;
import org.opensearch.dataprepper.plugins.health.HealthGrpcService;
import org.opensearch.dataprepper.plugins.source.otelmetrics.certificate.CertificateProviderFactory;
import org.opensearch.dataprepper.exceptions.BufferWriteException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -98,9 +99,23 @@ public void start(Buffer<Record<ExportMetricsServiceRequest>> buffer) {

if (server == null) {

final int bufferWriteTimeoutInMillis =
(int) (oTelMetricsSourceConfig.getRequestTimeoutInMillis() * 0.8);
final OTelMetricsGrpcService oTelMetricsGrpcService = new OTelMetricsGrpcService(
(int) (oTelMetricsSourceConfig.getRequestTimeoutInMillis() * 0.8),
buffer,

request -> {
try {
if (buffer.isByteBuffer()) {
buffer.writeBytes(request.toByteArray(), null, bufferWriteTimeoutInMillis);
} else {
buffer.write(new Record<>(request), bufferWriteTimeoutInMillis);
}
} catch (Exception e) {
LOG.error("Failed to write the request of size {} due to:", request.toString().length(), e);
throw new BufferWriteException(e.getMessage(), e);
}
},
pluginMetrics
);

Expand Down

0 comments on commit 74d8271

Please sign in to comment.