From a298c600bc8210c05f671e76ade2e7f356793320 Mon Sep 17 00:00:00 2001
From: Andrea Marziali
Date: Wed, 16 Jul 2025 14:28:00 +0200
Subject: [PATCH 1/5] Make rum injector stream/writer more resilient to errors
---
.../buffer/InjectingPipeOutputStream.java | 75 ++++++++++------
.../buffer/InjectingPipeWriter.java | 85 ++++++++++++-------
.../InjectingPipeOutputStreamTest.groovy | 62 ++++++++++++++
.../buffer/InjectingPipeWriterTest.groovy | 62 ++++++++++++++
4 files changed, 231 insertions(+), 53 deletions(-)
diff --git a/dd-java-agent/agent-bootstrap/src/main/java/datadog/trace/bootstrap/instrumentation/buffer/InjectingPipeOutputStream.java b/dd-java-agent/agent-bootstrap/src/main/java/datadog/trace/bootstrap/instrumentation/buffer/InjectingPipeOutputStream.java
index 633db0c8d4c..a7c7fe2dddf 100644
--- a/dd-java-agent/agent-bootstrap/src/main/java/datadog/trace/bootstrap/instrumentation/buffer/InjectingPipeOutputStream.java
+++ b/dd-java-agent/agent-bootstrap/src/main/java/datadog/trace/bootstrap/instrumentation/buffer/InjectingPipeOutputStream.java
@@ -5,16 +5,19 @@
/**
* An OutputStream containing a circular buffer with a lookbehind buffer of n bytes. The first time
- * that the latest n bytes matches the marker, a content is injected before.
+ * that the latest n bytes matches the marker, a content is injected before. In case of IOException
+ * thrown by the downstream, the buffer will be lost unless the error occurred when draining it. In
+ * this case the draining will be resumed.
*/
public class InjectingPipeOutputStream extends OutputStream {
private final byte[] lookbehind;
private int pos;
- private boolean bufferFilled;
+ private int count;
private final byte[] marker;
private final byte[] contentToInject;
- private boolean found = false;
- private int matchingPos = 0;
+ private boolean filter;
+ private boolean wasDraining;
+ private int matchingPos;
private final Runnable onContentInjected;
private final int bulkWriteThreshold;
private final OutputStream downstream;
@@ -34,6 +37,11 @@ public InjectingPipeOutputStream(
this.marker = marker;
this.lookbehind = new byte[marker.length];
this.pos = 0;
+ this.count = 0;
+ this.matchingPos = 0;
+ this.wasDraining = false;
+ // should filter the stream to potentially inject into it.
+ this.filter = true;
this.contentToInject = contentToInject;
this.onContentInjected = onContentInjected;
this.bulkWriteThreshold = marker.length * 2 - 2;
@@ -41,25 +49,27 @@ public InjectingPipeOutputStream(
@Override
public void write(int b) throws IOException {
- if (found) {
+ if (!filter) {
+ if (wasDraining) {
+ // continue draining
+ drain();
+ }
downstream.write(b);
return;
}
- if (bufferFilled) {
+ if (count == lookbehind.length) {
downstream.write(lookbehind[pos]);
+ } else {
+ count++;
}
lookbehind[pos] = (byte) b;
pos = (pos + 1) % lookbehind.length;
- if (!bufferFilled) {
- bufferFilled = pos == 0;
- }
-
if (marker[matchingPos++] == b) {
if (matchingPos == marker.length) {
- found = true;
+ filter = false;
downstream.write(contentToInject);
if (onContentInjected != null) {
onContentInjected.run();
@@ -73,10 +83,15 @@ public void write(int b) throws IOException {
@Override
public void write(byte[] array, int off, int len) throws IOException {
- if (found) {
+ if (!filter) {
+ if (wasDraining) {
+ // needs drain
+ drain();
+ }
downstream.write(array, off, len);
return;
}
+
if (len > bulkWriteThreshold) {
// if the content is large enough, we can bulk write everything but the N trail and tail.
// This because the buffer can already contain some byte from a previous single write.
@@ -84,7 +99,7 @@ public void write(byte[] array, int off, int len) throws IOException {
int idx = arrayContains(array, off, len, marker);
if (idx >= 0) {
// we have a full match. just write everything
- found = true;
+ filter = false;
drain();
downstream.write(array, off, idx);
downstream.write(contentToInject);
@@ -99,7 +114,13 @@ public void write(byte[] array, int off, int len) throws IOException {
write(array[i]);
}
drain();
+ boolean tmpFilter = filter;
+
+ // will be reset if no errors after the following write
+ filter = false;
downstream.write(array, off + marker.length - 1, len - bulkWriteThreshold);
+ filter = tmpFilter;
+
for (int i = len - marker.length + 1; i < len; i++) {
write(array[i]);
}
@@ -133,16 +154,19 @@ private int arrayContains(byte[] array, int off, int len, byte[] search) {
}
private void drain() throws IOException {
- if (bufferFilled) {
- for (int i = 0; i < lookbehind.length; i++) {
- downstream.write(lookbehind[(pos + i) % lookbehind.length]);
+ if (count > 0) {
+ boolean tmpFilter = filter;
+ filter = false;
+ wasDraining = true;
+ int start = (pos - count + lookbehind.length) % lookbehind.length;
+ int cnt = count;
+ for (int i = 0; i < cnt; i++) {
+ downstream.write(lookbehind[(start + i) % lookbehind.length]);
+ count--;
}
- } else {
- downstream.write(this.lookbehind, 0, pos);
+ filter = tmpFilter;
+ wasDraining = false;
}
- pos = 0;
- matchingPos = 0;
- bufferFilled = false;
}
@Override
@@ -152,9 +176,12 @@ public void flush() throws IOException {
@Override
public void close() throws IOException {
- if (!found) {
- drain();
+ try {
+ if (filter || wasDraining) {
+ drain();
+ }
+ } finally {
+ downstream.close();
}
- downstream.close();
}
}
diff --git a/dd-java-agent/agent-bootstrap/src/main/java/datadog/trace/bootstrap/instrumentation/buffer/InjectingPipeWriter.java b/dd-java-agent/agent-bootstrap/src/main/java/datadog/trace/bootstrap/instrumentation/buffer/InjectingPipeWriter.java
index f012c04cae4..aaf70a1c5c2 100644
--- a/dd-java-agent/agent-bootstrap/src/main/java/datadog/trace/bootstrap/instrumentation/buffer/InjectingPipeWriter.java
+++ b/dd-java-agent/agent-bootstrap/src/main/java/datadog/trace/bootstrap/instrumentation/buffer/InjectingPipeWriter.java
@@ -5,16 +5,19 @@
/**
* A Writer containing a circular buffer with a lookbehind buffer of n bytes. The first time that
- * the latest n bytes matches the marker, a content is injected before.
+ * the latest n bytes matches the marker, a content is injected before. In case of IOException
+ * thrown by the downstream, the buffer will be lost unless the error occurred when draining it. In
+ * this case the draining will be resumed.
*/
public class InjectingPipeWriter extends Writer {
private final char[] lookbehind;
private int pos;
- private boolean bufferFilled;
+ private int count;
private final char[] marker;
private final char[] contentToInject;
- private boolean found = false;
- private int matchingPos = 0;
+ private boolean filter;
+ private boolean wasDraining;
+ private int matchingPos;
private final Runnable onContentInjected;
private final int bulkWriteThreshold;
private final Writer downstream;
@@ -34,6 +37,11 @@ public InjectingPipeWriter(
this.marker = marker;
this.lookbehind = new char[marker.length];
this.pos = 0;
+ this.count = 0;
+ this.matchingPos = 0;
+ this.wasDraining = false;
+ // should filter the stream to potentially inject into it.
+ this.filter = true;
this.contentToInject = contentToInject;
this.onContentInjected = onContentInjected;
this.bulkWriteThreshold = marker.length * 2 - 2;
@@ -41,25 +49,27 @@ public InjectingPipeWriter(
@Override
public void write(int c) throws IOException {
- if (found) {
+ if (!filter) {
+ if (wasDraining) {
+ // continue draining
+ drain();
+ }
downstream.write(c);
return;
}
- if (bufferFilled) {
+ if (count == lookbehind.length) {
downstream.write(lookbehind[pos]);
+ } else {
+ count++;
}
lookbehind[pos] = (char) c;
pos = (pos + 1) % lookbehind.length;
- if (!bufferFilled) {
- bufferFilled = pos == 0;
- }
-
if (marker[matchingPos++] == c) {
if (matchingPos == marker.length) {
- found = true;
+ filter = false;
downstream.write(contentToInject);
if (onContentInjected != null) {
onContentInjected.run();
@@ -71,17 +81,17 @@ public void write(int c) throws IOException {
}
}
- @Override
- public void flush() throws IOException {
- downstream.flush();
- }
-
@Override
public void write(char[] array, int off, int len) throws IOException {
- if (found) {
+ if (!filter) {
+ if (wasDraining) {
+ // needs drain
+ drain();
+ }
downstream.write(array, off, len);
return;
}
+
if (len > bulkWriteThreshold) {
// if the content is large enough, we can bulk write everything but the N trail and tail.
// This because the buffer can already contain some byte from a previous single write.
@@ -89,7 +99,7 @@ public void write(char[] array, int off, int len) throws IOException {
int idx = arrayContains(array, off, len, marker);
if (idx >= 0) {
// we have a full match. just write everything
- found = true;
+ filter = false;
drain();
downstream.write(array, off, idx);
downstream.write(contentToInject);
@@ -104,7 +114,13 @@ public void write(char[] array, int off, int len) throws IOException {
write(array[i]);
}
drain();
+ boolean tmpFilter = filter;
+
+ // will be reset if no errors after the following write
+ filter = false;
downstream.write(array, off + marker.length - 1, len - bulkWriteThreshold);
+ filter = tmpFilter;
+
for (int i = len - marker.length + 1; i < len; i++) {
write(array[i]);
}
@@ -138,23 +154,34 @@ private int arrayContains(char[] array, int off, int len, char[] search) {
}
private void drain() throws IOException {
- if (bufferFilled) {
- for (int i = 0; i < lookbehind.length; i++) {
- downstream.write(lookbehind[(pos + i) % lookbehind.length]);
+ if (count > 0) {
+ boolean tmpFilter = filter;
+ filter = false;
+ wasDraining = true;
+ int start = (pos - count + lookbehind.length) % lookbehind.length;
+ int cnt = count;
+ for (int i = 0; i < cnt; i++) {
+ downstream.write(lookbehind[(start + i) % lookbehind.length]);
+ count--;
}
- } else {
- downstream.write(this.lookbehind, 0, pos);
+ filter = tmpFilter;
+ wasDraining = false;
}
- pos = 0;
- matchingPos = 0;
- bufferFilled = false;
+ }
+
+ @Override
+ public void flush() throws IOException {
+ downstream.flush();
}
@Override
public void close() throws IOException {
- if (!found) {
- drain();
+ try {
+ if (filter || wasDraining) {
+ drain();
+ }
+ } finally {
+ downstream.close();
}
- downstream.close();
}
}
diff --git a/dd-java-agent/agent-bootstrap/src/test/groovy/datadog/trace/bootstrap/instrumentation/buffer/InjectingPipeOutputStreamTest.groovy b/dd-java-agent/agent-bootstrap/src/test/groovy/datadog/trace/bootstrap/instrumentation/buffer/InjectingPipeOutputStreamTest.groovy
index 457b26577ba..755d698ad15 100644
--- a/dd-java-agent/agent-bootstrap/src/test/groovy/datadog/trace/bootstrap/instrumentation/buffer/InjectingPipeOutputStreamTest.groovy
+++ b/dd-java-agent/agent-bootstrap/src/test/groovy/datadog/trace/bootstrap/instrumentation/buffer/InjectingPipeOutputStreamTest.groovy
@@ -1,8 +1,37 @@
package datadog.trace.bootstrap.instrumentation.buffer
import datadog.trace.test.util.DDSpecification
+import org.apache.commons.io.IOUtils
class InjectingPipeOutputStreamTest extends DDSpecification {
+ static class GlitchedOutputStream extends FilterOutputStream {
+ int glitchesPos
+ int count
+
+ GlitchedOutputStream(OutputStream out, int glitchesPos) {
+ super(out)
+ this.glitchesPos = glitchesPos
+ }
+
+ @Override
+ void write(byte[] b, int off, int len) throws IOException {
+ count += len
+ if (count >= glitchesPos) {
+ glitchesPos = Integer.MAX_VALUE
+ throw new IOException("Glitched after $count bytes")
+ }
+ out.write(b, off, len)
+ }
+
+ @Override
+ void write(int b) throws IOException {
+ if (++count == glitchesPos) {
+ throw new IOException("Glitched after $glitchesPos bytes")
+ }
+ out.write(b)
+ }
+ }
+
def 'should filter a buffer and inject if found #found'() {
setup:
def downstream = new ByteArrayOutputStream()
@@ -20,4 +49,37 @@ class InjectingPipeOutputStreamTest extends DDSpecification {
"" | "" | "" | false | "