From a44f90918d80f6e7ae7b0bb9b2b9f34cf6af9a67 Mon Sep 17 00:00:00 2001 From: Krishna Kondaka Date: Fri, 7 Feb 2025 00:19:00 +0000 Subject: [PATCH] Addressed review comments Signed-off-by: Krishna Kondaka --- .../otel-proto-common/build.gradle | 4 + .../otel/codec/OTelLogsProtoBufDecoder.java | 39 ++++--- .../codec/OTelLogsProtoBufDecoderTest.java | 101 ++++++++++++++++-- 3 files changed, 113 insertions(+), 31 deletions(-) diff --git a/data-prepper-plugins/otel-proto-common/build.gradle b/data-prepper-plugins/otel-proto-common/build.gradle index 657d9a8bd9..7c27e0fbd0 100644 --- a/data-prepper-plugins/otel-proto-common/build.gradle +++ b/data-prepper-plugins/otel-proto-common/build.gradle @@ -7,6 +7,10 @@ plugins { id 'java' } +test { + jvmArgs '-Xmx4g' +} + dependencies { implementation project(':data-prepper-api') implementation libs.opentelemetry.proto diff --git a/data-prepper-plugins/otel-proto-common/src/main/java/org/opensearch/dataprepper/plugins/otel/codec/OTelLogsProtoBufDecoder.java b/data-prepper-plugins/otel-proto-common/src/main/java/org/opensearch/dataprepper/plugins/otel/codec/OTelLogsProtoBufDecoder.java index f8a369f6dd..30195b02da 100644 --- a/data-prepper-plugins/otel-proto-common/src/main/java/org/opensearch/dataprepper/plugins/otel/codec/OTelLogsProtoBufDecoder.java +++ b/data-prepper-plugins/otel-proto-common/src/main/java/org/opensearch/dataprepper/plugins/otel/codec/OTelLogsProtoBufDecoder.java @@ -25,6 +25,7 @@ public class OTelLogsProtoBufDecoder implements ByteDecoder { private static final Logger LOG = LoggerFactory.getLogger(OTelLogsProtoBufDecoder.class); + private static final int MAX_REQUEST_LEN = (8 * 1024 * 1024); private final OTelProtoCodec.OTelProtoDecoder otelProtoDecoder; private final boolean lengthPrefixedEncoding; @@ -33,6 +34,14 @@ public OTelLogsProtoBufDecoder(boolean lengthPrefixedEncoding) { this.lengthPrefixedEncoding = lengthPrefixedEncoding; } + private void parseRequest(byte[] buffer, final Instant timeReceivedMs, Consumer> eventConsumer) throws IOException { + ExportLogsServiceRequest request = ExportLogsServiceRequest.parseFrom(buffer); + List logs = otelProtoDecoder.parseExportLogsServiceRequest(request, timeReceivedMs); + for (OpenTelemetryLog log: logs) { + eventConsumer.accept(new Record<>(log)); + } + } + public void parse(InputStream inputStream, Instant timeReceivedMs, Consumer> eventConsumer) throws IOException { Reader reader = new InputStreamReader(inputStream); ExportLogsServiceRequest.Builder builder = ExportLogsServiceRequest.newBuilder(); @@ -41,16 +50,11 @@ public void parse(InputStream inputStream, Instant timeReceivedMs, Consumer logs = otelProtoDecoder.parseExportLogsServiceRequest(request, timeReceivedMs); - for (OpenTelemetryLog log: logs) { - eventConsumer.accept(new Record<>(log)); - } - } catch (Exception e) { - LOG.warn("Failed to parse Log Request"); + if (inputStream.available() > MAX_REQUEST_LEN) { + throw new IOException("buffer length exceeds max allowed buffer length of "+ MAX_REQUEST_LEN); } + byte[] buffer = inputStream.readAllBytes(); + parseRequest(buffer, timeReceivedMs, eventConsumer); return; } // As per the implementation of File exporter in "proto" format at @@ -60,20 +64,15 @@ public void parse(InputStream inputStream, Instant timeReceivedMs, Consumer MAX_REQUEST_LEN) { + throw new IOException("buffer length exceeds max allowed buffer length of "+ MAX_REQUEST_LEN); + } + byte[] buffer = new byte[len]; + if (inputStream.read(buffer, 0, len) != len) { LOG.warn("Failed to read {} bytes", len); continue; } - try { - ExportLogsServiceRequest request = ExportLogsServiceRequest.parseFrom(buf); - List logs = otelProtoDecoder.parseExportLogsServiceRequest(request, timeReceivedMs); - for (OpenTelemetryLog log: logs) { - eventConsumer.accept(new Record<>(log)); - } - } catch (Exception e) { - LOG.warn("Failed to parse Log Request"); - } + parseRequest(buffer, timeReceivedMs, eventConsumer); } } } diff --git a/data-prepper-plugins/otel-proto-common/src/test/java/org/opensearch/dataprepper/plugins/otel/codec/OTelLogsProtoBufDecoderTest.java b/data-prepper-plugins/otel-proto-common/src/test/java/org/opensearch/dataprepper/plugins/otel/codec/OTelLogsProtoBufDecoderTest.java index fb068011af..5dc4450d4d 100644 --- a/data-prepper-plugins/otel-proto-common/src/test/java/org/opensearch/dataprepper/plugins/otel/codec/OTelLogsProtoBufDecoderTest.java +++ b/data-prepper-plugins/otel-proto-common/src/test/java/org/opensearch/dataprepper/plugins/otel/codec/OTelLogsProtoBufDecoderTest.java @@ -5,19 +5,37 @@ package org.opensearch.dataprepper.plugins.otel.codec; -import java.io.InputStream; -import java.time.Instant; -import java.util.Map; +import com.google.protobuf.util.JsonFormat; import org.junit.jupiter.api.Test; import org.opensearch.dataprepper.model.log.OpenTelemetryLog; import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.record.Record; import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.jupiter.api.Assertions.assertThrows; +import io.opentelemetry.proto.collector.logs.v1.ExportLogsServiceRequest; +import io.opentelemetry.proto.logs.v1.LogRecord; +import io.opentelemetry.proto.logs.v1.ResourceLogs; +import io.opentelemetry.proto.logs.v1.ScopeLogs; + +import java.time.Instant; +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.io.ByteArrayInputStream; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Objects; public class OTelLogsProtoBufDecoderTest { + private static final String TEST_REQUEST_JSON_LOGS_FILE = "test-request-multiple-logs.json"; + // This protobuf format file is generated using OTEL collector and s3 exporter. S3 object created is copied as file private static final String TEST_REQUEST_LOGS_FILE = "test-otel-log.protobuf"; + // This protobuf format file is generated using OTEL collector and file exporter and then sending multiple log events to the collector private static final String TEST_REQUEST_MULTI_LOGS_FILE = "test-otel-multi-log.protobuf"; - private int count; public OTelLogsProtoBufDecoder createObjectUnderTest(boolean lengthPrefixedEncoding) { return new OTelLogsProtoBufDecoder(lengthPrefixedEncoding); @@ -51,15 +69,76 @@ public void testParse() throws Exception { @Test public void testParseWithLengthPrefixedEncoding() throws Exception { InputStream inputStream = OTelLogsProtoBufDecoderTest.class.getClassLoader().getResourceAsStream(TEST_REQUEST_MULTI_LOGS_FILE); - count = 0; + List> parsedRecords = new ArrayList<>(); createObjectUnderTest(true).parse(inputStream, Instant.now(), (record) -> { - Event event = (Event)record.getData(); - int severityNumber = (count == 0) ? 50 : (count == 1) ? 42 : 43; - String time = (count < 2) ? "2025-01-26T20:07:20Z" : "2025-01-26T20:07:40Z"; - String spanId = (count < 2) ? "eee19b7ec3c1b174" : "fff19b7ec3c1b174"; - validateLog((OpenTelemetryLog)record.getData(), severityNumber, time, spanId); - count++; + parsedRecords.add(record); }); + assertThat(parsedRecords.size(), equalTo(3)); + validateLog((OpenTelemetryLog)parsedRecords.get(0).getData(), 50, "2025-01-26T20:07:20Z", "eee19b7ec3c1b174"); + validateLog((OpenTelemetryLog)parsedRecords.get(1).getData(), 42, "2025-01-26T20:07:20Z", "eee19b7ec3c1b174"); + validateLog((OpenTelemetryLog)parsedRecords.get(2).getData(), 43, "2025-01-26T20:07:40Z", "fff19b7ec3c1b174"); + } + + private void validateLogFromRequest(OpenTelemetryLog logRecord) { + assertThat(logRecord.getServiceName(), is("service")); + assertThat(logRecord.getTime(), is("2020-05-24T14:00:00Z")); + assertThat(logRecord.getObservedTime(), is("2020-05-24T14:00:02Z")); + assertThat(logRecord.getBody(), is("Log value")); + assertThat(logRecord.getDroppedAttributesCount(), is(3)); + assertThat(logRecord.getSchemaUrl(), is("schemaurl")); + assertThat(logRecord.getSeverityNumber(), is(5)); + assertThat(logRecord.getSeverityText(), is("Severity value")); + assertThat(logRecord.getTraceId(), is("ba1a1c23b4093b63")); + assertThat(logRecord.getSpanId(), is("2cc83ac90ebc469c")); + Map mergedAttributes = logRecord.getAttributes(); + assertThat(mergedAttributes.keySet().size(), is(2)); + assertThat(mergedAttributes.get("log.attributes.statement@params"), is("us-east-1")); + assertThat(mergedAttributes.get("resource.attributes.service@name"), is("service")); + } + + private ExportLogsServiceRequest buildExportLogsServiceRequestFromJsonFile(String requestJsonFileName) throws IOException { + final ExportLogsServiceRequest.Builder builder = ExportLogsServiceRequest.newBuilder(); + JsonFormat.parser().merge(getFileAsJsonString(requestJsonFileName), builder); + return builder.build(); + } + + private String getFileAsJsonString(String requestJsonFileName) throws IOException { + final StringBuilder jsonBuilder = new StringBuilder(); + try (final InputStream inputStream = Objects.requireNonNull( + OTelLogsProtoBufDecoderTest.class.getClassLoader().getResourceAsStream(requestJsonFileName))) { + final BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(inputStream)); + bufferedReader.lines().forEach(jsonBuilder::append); + } + return jsonBuilder.toString(); + } + + @Test + public void testParseWithDynamicRequest() throws Exception { + final ExportLogsServiceRequest exportLogsServiceRequest = buildExportLogsServiceRequestFromJsonFile(TEST_REQUEST_JSON_LOGS_FILE); + InputStream inputStream = new ByteArrayInputStream(exportLogsServiceRequest.toByteArray()); + createObjectUnderTest(false).parse(inputStream, Instant.now(), (record) -> { + validateLogFromRequest((OpenTelemetryLog)record.getData()); + }); + } + + @Test + public void testParseWithLargeDynamicRequest_ThrowsException() throws Exception { + // Create a request larger than 8MB + List records = new ArrayList<>(); + for (int i = 0; i < 4 * 1024 * 1024; i++) { + records.add(LogRecord.newBuilder().build()); + } + ExportLogsServiceRequest exportLogsServiceRequest = ExportLogsServiceRequest.newBuilder() + .addResourceLogs(ResourceLogs.newBuilder() + .addScopeLogs(ScopeLogs.newBuilder() + .addAllLogRecords(records) + .build())).build(); + + InputStream inputStream = new ByteArrayInputStream(exportLogsServiceRequest.toByteArray()); + assertThrows(IOException.class, () -> createObjectUnderTest(false).parse(inputStream, Instant.now(), (record) -> { + validateLogFromRequest((OpenTelemetryLog)record.getData()); + })); } + }