Skip to content

Commit

Permalink
Addressed review comments
Browse files Browse the repository at this point in the history
Signed-off-by: Krishna Kondaka <[email protected]>
  • Loading branch information
kkondaka committed Feb 7, 2025
1 parent 6d74574 commit a44f909
Show file tree
Hide file tree
Showing 3 changed files with 113 additions and 31 deletions.
4 changes: 4 additions & 0 deletions data-prepper-plugins/otel-proto-common/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@ plugins {
id 'java'
}

// Give the test JVM a larger heap: OTelLogsProtoBufDecoderTest builds an
// ExportLogsServiceRequest larger than the decoder's 8MB limit, which needs
// well over the default heap to construct and serialize.
test {
jvmArgs '-Xmx4g'
}

dependencies {
implementation project(':data-prepper-api')
implementation libs.opentelemetry.proto
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@

public class OTelLogsProtoBufDecoder implements ByteDecoder {
private static final Logger LOG = LoggerFactory.getLogger(OTelLogsProtoBufDecoder.class);
private static final int MAX_REQUEST_LEN = (8 * 1024 * 1024);
private final OTelProtoCodec.OTelProtoDecoder otelProtoDecoder;
private final boolean lengthPrefixedEncoding;

Expand All @@ -33,6 +34,14 @@ public OTelLogsProtoBufDecoder(boolean lengthPrefixedEncoding) {
this.lengthPrefixedEncoding = lengthPrefixedEncoding;
}

/**
 * Deserializes a single ExportLogsServiceRequest from {@code buffer} and hands
 * each decoded log to {@code eventConsumer} wrapped in a {@link Record}.
 *
 * @param buffer         serialized protobuf bytes of one request
 * @param timeReceivedMs receive timestamp propagated onto every decoded log
 * @param eventConsumer  sink for the decoded log records
 * @throws IOException if the bytes are not a valid ExportLogsServiceRequest
 */
private void parseRequest(byte[] buffer, final Instant timeReceivedMs, Consumer<Record<Event>> eventConsumer) throws IOException {
    final ExportLogsServiceRequest request = ExportLogsServiceRequest.parseFrom(buffer);
    final List<OpenTelemetryLog> decodedLogs =
            otelProtoDecoder.parseExportLogsServiceRequest(request, timeReceivedMs);
    decodedLogs.forEach(decodedLog -> eventConsumer.accept(new Record<>(decodedLog)));
}

public void parse(InputStream inputStream, Instant timeReceivedMs, Consumer<Record<Event>> eventConsumer) throws IOException {
Reader reader = new InputStreamReader(inputStream);
ExportLogsServiceRequest.Builder builder = ExportLogsServiceRequest.newBuilder();
Expand All @@ -41,16 +50,11 @@ public void parse(InputStream inputStream, Instant timeReceivedMs, Consumer<Reco
// Each request is written in a separate S3 object, so no length precedes the actual data
// Same with Kafka exporter too. Each message is written as a separate message to Kafka
if (!lengthPrefixedEncoding) {
try {
byte[] buffer = inputStream.readAllBytes();
ExportLogsServiceRequest request = ExportLogsServiceRequest.parseFrom(buffer);
List<OpenTelemetryLog> logs = otelProtoDecoder.parseExportLogsServiceRequest(request, timeReceivedMs);
for (OpenTelemetryLog log: logs) {
eventConsumer.accept(new Record<>(log));
}
} catch (Exception e) {
LOG.warn("Failed to parse Log Request");
if (inputStream.available() > MAX_REQUEST_LEN) {
throw new IOException("buffer length exceeds max allowed buffer length of "+ MAX_REQUEST_LEN);
}
byte[] buffer = inputStream.readAllBytes();
parseRequest(buffer, timeReceivedMs, eventConsumer);
return;
}
// As per the implementation of File exporter in "proto" format at
Expand All @@ -60,20 +64,15 @@ public void parse(InputStream inputStream, Instant timeReceivedMs, Consumer<Reco
while (inputStream.read(lenBytes, 0, 4) == 4) {
ByteBuffer lengthBuffer = ByteBuffer.wrap(lenBytes);
int len = lengthBuffer.getInt();
byte[] buf = new byte[len];
if (inputStream.read(buf, 0, len) != len) {
if (len > MAX_REQUEST_LEN) {
throw new IOException("buffer length exceeds max allowed buffer length of "+ MAX_REQUEST_LEN);
}
byte[] buffer = new byte[len];
if (inputStream.read(buffer, 0, len) != len) {
LOG.warn("Failed to read {} bytes", len);
continue;
}
try {
ExportLogsServiceRequest request = ExportLogsServiceRequest.parseFrom(buf);
List<OpenTelemetryLog> logs = otelProtoDecoder.parseExportLogsServiceRequest(request, timeReceivedMs);
for (OpenTelemetryLog log: logs) {
eventConsumer.accept(new Record<>(log));
}
} catch (Exception e) {
LOG.warn("Failed to parse Log Request");
}
parseRequest(buffer, timeReceivedMs, eventConsumer);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,37 @@

package org.opensearch.dataprepper.plugins.otel.codec;

import java.io.InputStream;
import java.time.Instant;
import java.util.Map;
import com.google.protobuf.util.JsonFormat;
import org.junit.jupiter.api.Test;
import org.opensearch.dataprepper.model.log.OpenTelemetryLog;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.record.Record;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertThrows;
import io.opentelemetry.proto.collector.logs.v1.ExportLogsServiceRequest;
import io.opentelemetry.proto.logs.v1.LogRecord;
import io.opentelemetry.proto.logs.v1.ResourceLogs;
import io.opentelemetry.proto.logs.v1.ScopeLogs;

import java.time.Instant;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.ByteArrayInputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;

public class OTelLogsProtoBufDecoderTest {
private static final String TEST_REQUEST_JSON_LOGS_FILE = "test-request-multiple-logs.json";
// This protobuf-format file was generated using the OTEL collector with the S3 exporter; the resulting S3 object was copied into this file
private static final String TEST_REQUEST_LOGS_FILE = "test-otel-log.protobuf";
// This protobuf-format file was generated using the OTEL collector with the file exporter, after sending multiple log events to the collector
private static final String TEST_REQUEST_MULTI_LOGS_FILE = "test-otel-multi-log.protobuf";
private int count;

public OTelLogsProtoBufDecoder createObjectUnderTest(boolean lengthPrefixedEncoding) {
return new OTelLogsProtoBufDecoder(lengthPrefixedEncoding);
Expand Down Expand Up @@ -51,15 +69,76 @@ public void testParse() throws Exception {
@Test
public void testParseWithLengthPrefixedEncoding() throws Exception {
InputStream inputStream = OTelLogsProtoBufDecoderTest.class.getClassLoader().getResourceAsStream(TEST_REQUEST_MULTI_LOGS_FILE);
count = 0;
List<Record<Event>> parsedRecords = new ArrayList<>();
createObjectUnderTest(true).parse(inputStream, Instant.now(), (record) -> {
Event event = (Event)record.getData();
int severityNumber = (count == 0) ? 50 : (count == 1) ? 42 : 43;
String time = (count < 2) ? "2025-01-26T20:07:20Z" : "2025-01-26T20:07:40Z";
String spanId = (count < 2) ? "eee19b7ec3c1b174" : "fff19b7ec3c1b174";
validateLog((OpenTelemetryLog)record.getData(), severityNumber, time, spanId);
count++;
parsedRecords.add(record);
});
assertThat(parsedRecords.size(), equalTo(3));
validateLog((OpenTelemetryLog)parsedRecords.get(0).getData(), 50, "2025-01-26T20:07:20Z", "eee19b7ec3c1b174");
validateLog((OpenTelemetryLog)parsedRecords.get(1).getData(), 42, "2025-01-26T20:07:20Z", "eee19b7ec3c1b174");
validateLog((OpenTelemetryLog)parsedRecords.get(2).getData(), 43, "2025-01-26T20:07:40Z", "fff19b7ec3c1b174");
}

// Field-by-field validation of a log decoded from the JSON fixture request
// (test-request-multiple-logs.json): scalar fields plus the merged
// log/resource attribute map.
private void validateLogFromRequest(OpenTelemetryLog log) {
    assertThat(log.getServiceName(), equalTo("service"));
    assertThat(log.getTime(), equalTo("2020-05-24T14:00:00Z"));
    assertThat(log.getObservedTime(), equalTo("2020-05-24T14:00:02Z"));
    assertThat(log.getBody(), equalTo("Log value"));
    assertThat(log.getDroppedAttributesCount(), equalTo(3));
    assertThat(log.getSchemaUrl(), equalTo("schemaurl"));
    assertThat(log.getSeverityNumber(), equalTo(5));
    assertThat(log.getSeverityText(), equalTo("Severity value"));
    assertThat(log.getTraceId(), equalTo("ba1a1c23b4093b63"));
    assertThat(log.getSpanId(), equalTo("2cc83ac90ebc469c"));
    final Map<String, Object> attributes = log.getAttributes();
    assertThat(attributes.keySet().size(), equalTo(2));
    assertThat(attributes.get("log.attributes.statement@params"), equalTo("us-east-1"));
    assertThat(attributes.get("resource.attributes.service@name"), equalTo("service"));
}

// Reads the named classpath JSON fixture and parses it into an
// ExportLogsServiceRequest via the protobuf JSON parser.
private ExportLogsServiceRequest buildExportLogsServiceRequestFromJsonFile(String requestJsonFileName) throws IOException {
    final String requestJson = getFileAsJsonString(requestJsonFileName);
    final ExportLogsServiceRequest.Builder requestBuilder = ExportLogsServiceRequest.newBuilder();
    JsonFormat.parser().merge(requestJson, requestBuilder);
    return requestBuilder.build();
}

/**
 * Loads the named classpath resource and returns its contents as a single
 * string with line separators removed (fine for JSON input).
 *
 * Fix: the BufferedReader is now inside try-with-resources (previously only
 * the underlying stream was closed), and the null-check carries a message
 * naming the missing resource.
 *
 * NOTE(review): InputStreamReader uses the platform default charset here;
 * the JSON fixtures are ASCII so this is safe, but StandardCharsets.UTF_8
 * would be more robust — confirm before adding the import.
 *
 * @param requestJsonFileName classpath-relative resource name
 * @return resource contents joined without newlines
 * @throws IOException on read failure
 */
private String getFileAsJsonString(String requestJsonFileName) throws IOException {
    final StringBuilder jsonBuilder = new StringBuilder();
    try (final InputStream inputStream = Objects.requireNonNull(
                 OTelLogsProtoBufDecoderTest.class.getClassLoader().getResourceAsStream(requestJsonFileName),
                 () -> requestJsonFileName + " not found on the test classpath");
         final BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(inputStream))) {
        bufferedReader.lines().forEach(jsonBuilder::append);
    }
    return jsonBuilder.toString();
}

@Test
public void testParseWithDynamicRequest() throws Exception {
    // Round trip: build a request from the JSON fixture, serialize it, then
    // decode it without length-prefixed framing and validate every log.
    final ExportLogsServiceRequest request =
            buildExportLogsServiceRequestFromJsonFile(TEST_REQUEST_JSON_LOGS_FILE);
    final InputStream inputStream = new ByteArrayInputStream(request.toByteArray());
    createObjectUnderTest(false).parse(inputStream, Instant.now(),
            record -> validateLogFromRequest((OpenTelemetryLog) record.getData()));
}

@Test
public void testParseWithLargeDynamicRequest_ThrowsException() throws Exception {
    // Build a request whose serialized form exceeds the decoder's 8MB cap:
    // 4M empty LogRecords inside a single ScopeLogs. The decoder must reject
    // it with an IOException before attempting to parse.
    final int numRecords = 4 * 1024 * 1024;
    final List<LogRecord> logRecords = new ArrayList<>();
    for (int i = 0; i < numRecords; i++) {
        logRecords.add(LogRecord.newBuilder().build());
    }
    final ExportLogsServiceRequest oversizedRequest = ExportLogsServiceRequest.newBuilder()
            .addResourceLogs(ResourceLogs.newBuilder()
                    .addScopeLogs(ScopeLogs.newBuilder()
                            .addAllLogRecords(logRecords)
                            .build()))
            .build();

    final InputStream inputStream = new ByteArrayInputStream(oversizedRequest.toByteArray());
    assertThrows(IOException.class,
            () -> createObjectUnderTest(false).parse(inputStream, Instant.now(),
                    record -> validateLogFromRequest((OpenTelemetryLog) record.getData())));
}

}

0 comments on commit a44f909

Please sign in to comment.