Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for OTEL metrics source to use Kafka buffer #3539

Merged
merged 2 commits into from
Nov 9, 2023
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Added tests and fixed test failures
Signed-off-by: Krishna Kondaka <krishkdk@dev-dsk-krishkdk-2c-bd29c437.us-west-2.amazon.com>
Krishna Kondaka committed Nov 8, 2023
commit b9d3fb05429711c9571b42c6b3558823990326e9
Original file line number Diff line number Diff line change
@@ -83,15 +83,15 @@ public Collection<Record<? extends Metric>> doExecute(Collection<Record<?>> reco
AtomicInteger droppedCounter = new AtomicInteger(0);

for (Record<?> rec : records) {
Record<? extends Metric> newRecord = (Record<? extends Metric>)rec;
if ((rec.getData() instanceof Event)) {
Record<? extends Metric> newRecord = (Record<? extends Metric>)rec;
if (otelMetricsRawProcessorConfig.getFlattenAttributesFlag() ||
!otelMetricsRawProcessorConfig.getCalculateHistogramBuckets() ||
!otelMetricsRawProcessorConfig.getCalculateExponentialHistogramBuckets()) {
modifyRecord(newRecord, otelMetricsRawProcessorConfig.getFlattenAttributesFlag(), otelMetricsRawProcessorConfig.getCalculateHistogramBuckets(), otelMetricsRawProcessorConfig.getCalculateExponentialHistogramBuckets());
}
recordsOut.add(newRecord);
}
recordsOut.add(newRecord);

if (!(rec.getData() instanceof ExportMetricsServiceRequest)) {
continue;
Original file line number Diff line number Diff line change
@@ -24,11 +24,12 @@
import org.junit.runner.RunWith;
import org.mockito.junit.MockitoJUnitRunner;
import org.opensearch.dataprepper.model.configuration.PluginSetting;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.model.metric.JacksonMetric;

import java.util.Arrays;
import java.util.Collections;
import java.util.Collection;
import java.util.List;
import java.util.Map;

@@ -102,8 +103,9 @@ public void test() throws JsonProcessingException {

Record record = new Record<>(exportMetricRequest);

List<Record<Event>> rec = (List<Record<Event>>) rawProcessor.doExecute(Arrays.asList(record));
Record<Event> firstRecord = rec.get(0);
Collection<Record<?>> records = Arrays.asList((Record<?>)record);
List<Record<? extends org.opensearch.dataprepper.model.metric.Metric>> outputRecords = (List<Record<? extends org.opensearch.dataprepper.model.metric.Metric>>)rawProcessor.doExecute(records);
Record<JacksonMetric> firstRecord = (Record<JacksonMetric>)outputRecords.get(0);

ObjectMapper objectMapper = new ObjectMapper();
Map<String, Object> map = objectMapper.readValue(firstRecord.getData().toJsonString(), Map.class);
@@ -182,8 +184,11 @@ public void missingNameInvalidMetricTest() throws JsonProcessingException {
Record record = new Record<>(exportMetricRequest);
Record invalidRecord = new Record<>(exportMetricRequestWithInvalidMetric);

List<Record<Event>> rec = (List<Record<Event>>) rawProcessor.doExecute(Arrays.asList(record, invalidRecord));
org.hamcrest.MatcherAssert.assertThat(rec.size(), equalTo(1));
Collection<Record<?>> records = Arrays.asList((Record<?>)record, invalidRecord);
List<Record<? extends org.opensearch.dataprepper.model.metric.Metric>> outputRecords = (List<Record<? extends org.opensearch.dataprepper.model.metric.Metric>>)rawProcessor.doExecute(records);

//List<Record<Event>> rec = (List<Record<Event>>) rawProcessor.doExecute(Arrays.asList(record, invalidRecord));
org.hamcrest.MatcherAssert.assertThat(outputRecords.size(), equalTo(1));
}

private void assertSumProcessing(Map<String, Object> map) {
Original file line number Diff line number Diff line change
@@ -21,11 +21,12 @@
import org.junit.runner.RunWith;
import org.mockito.junit.MockitoJUnitRunner;
import org.opensearch.dataprepper.model.configuration.PluginSetting;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.model.metric.JacksonMetric;

import java.util.Arrays;
import java.util.Collections;
import java.util.Collection;
import java.util.List;
import java.util.Map;

@@ -83,8 +84,9 @@ public void testSummaryProcessing() throws JsonProcessingException {

Record record = new Record<>(exportMetricRequest);

List<Record<Event>> rec = (List<Record<Event>>) rawProcessor.doExecute(Arrays.asList(record));
Record<Event> firstRecord = rec.get(0);
Collection<Record<?>> records = Arrays.asList((Record<?>)record);
List<Record<? extends org.opensearch.dataprepper.model.metric.Metric>> outputRecords = (List<Record<? extends org.opensearch.dataprepper.model.metric.Metric>>)rawProcessor.doExecute(records);
Record<JacksonMetric> firstRecord = (Record<JacksonMetric>)outputRecords.get(0);

ObjectMapper objectMapper = new ObjectMapper();
Map<String, Object> map = objectMapper.readValue(firstRecord.getData().toJsonString(), Map.class);
Original file line number Diff line number Diff line change
@@ -19,7 +19,6 @@
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.buffer.Buffer;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.model.codec.ByteDecoder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -38,16 +37,13 @@ public class OTelMetricsGrpcService extends MetricsServiceGrpc.MetricsServiceImp
private final Counter successRequestsCounter;
private final DistributionSummary payloadSizeSummary;
private final Timer requestProcessDuration;
private final ByteDecoder byteDecoder;


public OTelMetricsGrpcService(int bufferWriteTimeoutInMillis,
Buffer<Record<ExportMetricsServiceRequest>> buffer,
final ByteDecoder byteDecoder,
final PluginMetrics pluginMetrics) {
this.bufferWriteTimeoutInMillis = bufferWriteTimeoutInMillis;
this.buffer = buffer;
this.byteDecoder = byteDecoder;

requestsReceivedCounter = pluginMetrics.counter(REQUESTS_RECEIVED);
successRequestsCounter = pluginMetrics.counter(SUCCESS_REQUESTS);
Original file line number Diff line number Diff line change
@@ -101,7 +101,6 @@ public void start(Buffer<Record<ExportMetricsServiceRequest>> buffer) {
final OTelMetricsGrpcService oTelMetricsGrpcService = new OTelMetricsGrpcService(
(int) (oTelMetricsSourceConfig.getRequestTimeoutInMillis() * 0.8),
buffer,
byteDecoder,
pluginMetrics
);

Original file line number Diff line number Diff line change
@@ -12,7 +12,6 @@
import org.opensearch.dataprepper.model.event.JacksonEvent;
import org.opensearch.dataprepper.model.metric.Metric;

import org.opensearch.dataprepper.plugins.otel.codec.OTelProtoCodec;

import java.util.Collection;
import java.util.concurrent.atomic.AtomicInteger;
Original file line number Diff line number Diff line change
@@ -663,7 +663,7 @@ private List<? extends Record<? extends Metric>> mapHistogram(

private List<? extends Record<? extends Metric>> mapExponentialHistogram(
final io.opentelemetry.proto.metrics.v1.Metric metric,
final String serviceName,
final String serviceName,
final Map<String, Object> ils,
final Map<String, Object> resourceAttributes,
final String schemaUrl,
Original file line number Diff line number Diff line change
@@ -12,6 +12,7 @@
import com.google.protobuf.util.JsonFormat;
import io.opentelemetry.proto.collector.logs.v1.ExportLogsServiceRequest;
import io.opentelemetry.proto.collector.trace.v1.ExportTraceServiceRequest;
import io.opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceRequest;
import io.opentelemetry.proto.common.v1.AnyValue;
import io.opentelemetry.proto.common.v1.ArrayValue;
import io.opentelemetry.proto.common.v1.InstrumentationLibrary;
@@ -34,6 +35,12 @@
import org.junit.jupiter.api.Test;
import org.opensearch.dataprepper.model.log.OpenTelemetryLog;
import org.opensearch.dataprepper.model.metric.Bucket;
import org.opensearch.dataprepper.model.metric.Metric;
import org.opensearch.dataprepper.model.metric.JacksonMetric;
import org.opensearch.dataprepper.model.metric.JacksonGauge;
import org.opensearch.dataprepper.model.metric.JacksonSum;
import org.opensearch.dataprepper.model.metric.JacksonHistogram;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.model.trace.DefaultLink;
import org.opensearch.dataprepper.model.trace.DefaultSpanEvent;
import org.opensearch.dataprepper.model.trace.DefaultTraceGroupFields;
@@ -56,12 +63,14 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

import static org.assertj.core.api.Assertions.entry;
@@ -84,9 +93,10 @@ public class OTelProtoCodecTest {
private static final String TEST_REQUEST_BOTH_SPAN_TYPES_JSON_FILE = "test-request-both-span-types.json";
private static final String TEST_REQUEST_NO_SPANS_JSON_FILE = "test-request-no-spans.json";
private static final String TEST_SPAN_EVENT_JSON_FILE = "test-span-event.json";

private static final String TEST_REQUEST_GAUGE_METRICS_JSON_FILE = "test-gauge-metrics.json";
private static final String TEST_REQUEST_SUM_METRICS_JSON_FILE = "test-sum-metrics.json";
private static final String TEST_REQUEST_HISTOGRAM_METRICS_JSON_FILE = "test-histogram-metrics.json";
private static final String TEST_REQUEST_LOGS_JSON_FILE = "test-request-log.json";

private static final String TEST_REQUEST_LOGS_IS_JSON_FILE = "test-request-log-is.json";


@@ -124,6 +134,12 @@ private ExportLogsServiceRequest buildExportLogsServiceRequestFromJsonFile(Strin
return builder.build();
}

private ExportMetricsServiceRequest buildExportMetricsServiceRequestFromJsonFile(String requestJsonFileName) throws IOException {
final ExportMetricsServiceRequest.Builder builder = ExportMetricsServiceRequest.newBuilder();
JsonFormat.parser().merge(getFileAsJsonString(requestJsonFileName), builder);
return builder.build();
}

private String getFileAsJsonString(String requestJsonFileName) throws IOException {
final StringBuilder jsonBuilder = new StringBuilder();
try (final InputStream inputStream = Objects.requireNonNull(
@@ -460,6 +476,71 @@ public void testParseExportLogsServiceRequest_InstrumentationLibrarySpans() thro
validateSpans(spans);
}

@Test
public void testParseExportMetricsServiceRequest_Guage() throws IOException {
final ExportMetricsServiceRequest exportMetricsServiceRequest = buildExportMetricsServiceRequestFromJsonFile(TEST_REQUEST_GAUGE_METRICS_JSON_FILE);
AtomicInteger droppedCount = new AtomicInteger(0);
final Collection<Record<? extends Metric>> metrics = decoderUnderTest.parseExportMetricsServiceRequest(exportMetricsServiceRequest, droppedCount, 10, true, true, true);

validateGaugeMetricRequest(metrics);
}

@Test
public void testParseExportMetricsServiceRequest_Sum() throws IOException {
final ExportMetricsServiceRequest exportMetricsServiceRequest = buildExportMetricsServiceRequestFromJsonFile(TEST_REQUEST_SUM_METRICS_JSON_FILE);
AtomicInteger droppedCount = new AtomicInteger(0);
final Collection<Record<? extends Metric>> metrics = decoderUnderTest.parseExportMetricsServiceRequest(exportMetricsServiceRequest, droppedCount, 10, true, true, true);
validateSumMetricRequest(metrics);
}

@Test
public void testParseExportMetricsServiceRequest_Histogram() throws IOException {
final ExportMetricsServiceRequest exportMetricsServiceRequest = buildExportMetricsServiceRequestFromJsonFile(TEST_REQUEST_HISTOGRAM_METRICS_JSON_FILE);
AtomicInteger droppedCount = new AtomicInteger(0);
final Collection<Record<? extends Metric>> metrics = decoderUnderTest.parseExportMetricsServiceRequest(exportMetricsServiceRequest, droppedCount, 10, true, true, true);
validateHistogramMetricRequest(metrics);
}

private void validateGaugeMetricRequest(Collection<Record<? extends Metric>> metrics) {
assertThat(metrics.size(), equalTo(1));
Record<? extends Metric> record = ((List<Record<? extends Metric>>)metrics).get(0);
JacksonMetric metric = (JacksonMetric) record.getData();
assertThat(metric.getKind(), equalTo(Metric.KIND.GAUGE.toString()));
assertThat(metric.getUnit(), equalTo("1"));
assertThat(metric.getName(), equalTo("counter-int"));
JacksonGauge gauge = (JacksonGauge)metric;
assertThat(gauge.getValue(), equalTo(123.0));
}

private void validateSumMetricRequest(Collection<Record<? extends Metric>> metrics) {
assertThat(metrics.size(), equalTo(1));
Record<? extends Metric> record = ((List<Record<? extends Metric>>)metrics).get(0);
JacksonMetric metric = (JacksonMetric) record.getData();
assertThat(metric.getKind(), equalTo(Metric.KIND.SUM.toString()));
assertThat(metric.getUnit(), equalTo("1"));
assertThat(metric.getName(), equalTo("sum-int"));
JacksonSum sum = (JacksonSum)metric;
assertThat(sum.getValue(), equalTo(456.0));
}

private void validateHistogramMetricRequest(Collection<Record<? extends Metric>> metrics) {
assertThat(metrics.size(), equalTo(1));
Record<? extends Metric> record = ((List<Record<? extends Metric>>)metrics).get(0);
JacksonMetric metric = (JacksonMetric) record.getData();
assertThat(metric.getKind(), equalTo(Metric.KIND.HISTOGRAM.toString()));
assertThat(metric.getUnit(), equalTo("1"));
assertThat(metric.getName(), equalTo("histogram-int"));
JacksonHistogram histogram = (JacksonHistogram)metric;
assertThat(histogram.getSum(), equalTo(100.0));
assertThat(histogram.getCount(), equalTo(30L));
assertThat(histogram.getExemplars(), equalTo(Collections.emptyList()));
assertThat(histogram.getExplicitBoundsList(), equalTo(List.of(1.0, 2.0, 3.0, 4.0)));
assertThat(histogram.getExplicitBoundsCount(), equalTo(4));
assertThat(histogram.getBucketCountsList(), equalTo(List.of(3L, 5L, 15L, 6L, 1L)));
assertThat(histogram.getBucketCount(), equalTo(5));
assertThat(histogram.getAggregationTemporality(), equalTo("AGGREGATION_TEMPORALITY_CUMULATIVE"));
}

}

@Nested
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
{
"resourceMetrics": [
{
"resource": {
"attributes": [
{
"key": "resource-attr",
"value": {
"stringValue": "resource-attr-val-1"
}
}
]
},
"scopeMetrics": [
{
"scope": {},
"metrics": [
{
"name": "counter-int",
"unit": 1,
"gauge": {
"dataPoints": [
{
"attributes": [
{
"key": "label-1",
"value": {
"stringValue": "label-value-1"
}
}
],
"startTimeUnixNano": "1581452773000000789",
"timeUnixNano": "1581452773000000789",
"asInt": "123"
}
]
}
}
]
}
]
}
]
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
{
"resourceMetrics": [
{
"resource": {
"attributes": [
{
"key": "resource-attr",
"value": {
"stringValue": "resource-attr-val-1"
}
}
]
},
"scopeMetrics": [
{
"scope": {},
"metrics": [
{
"name": "histogram-int",
"unit": 1,
"histogram": {
"dataPoints": [
{
"attributes": [
{
"key": "label-1",
"value": {
"stringValue": "label-value-1"
}
}
],
"startTimeUnixNano": "1581452773000000789",
"timeUnixNano": "1581452773000000789",
"count": "30",
"sum": "100",
"bucket_counts": [3, 5, 15, 6, 1],
"explicit_bounds": [1.0, 2.0, 3.0, 4.0],
"exemplars": []
}
],
"aggregationTemporality":"2"
}
}
]
}
]
}
]
}

Loading