Skip to content

Make rum injector stream/writer more resilient to errors #9184

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,19 @@

/**
* An OutputStream containing a circular buffer with a lookbehind buffer of n bytes. The first time
* that the latest n bytes matches the marker, a content is injected before.
* that the latest n bytes match the marker, the content is injected before it. If the downstream
* throws an IOException, the buffered bytes are lost unless the error occurred while draining the
* buffer; in that case the draining is resumed on the next write or close.
*/
public class InjectingPipeOutputStream extends OutputStream {
private final byte[] lookbehind;
private int pos;
private boolean bufferFilled;
private int count;
private final byte[] marker;
private final byte[] contentToInject;
private boolean found = false;
private int matchingPos = 0;
private boolean filter;
private boolean wasDraining;
private int matchingPos;
private final Runnable onContentInjected;
private final int bulkWriteThreshold;
private final OutputStream downstream;
Expand All @@ -34,32 +37,39 @@ public InjectingPipeOutputStream(
this.marker = marker;
this.lookbehind = new byte[marker.length];
this.pos = 0;
this.count = 0;
this.matchingPos = 0;
this.wasDraining = false;
// should filter the stream to potentially inject into it.
this.filter = true;
this.contentToInject = contentToInject;
this.onContentInjected = onContentInjected;
this.bulkWriteThreshold = marker.length * 2 - 2;
}

@Override
public void write(int b) throws IOException {
if (found) {
if (!filter) {
if (wasDraining) {
// continue draining
drain();
}
downstream.write(b);
return;
}

if (bufferFilled) {
if (count == lookbehind.length) {
downstream.write(lookbehind[pos]);
} else {
count++;
}

lookbehind[pos] = (byte) b;
pos = (pos + 1) % lookbehind.length;

if (!bufferFilled) {
bufferFilled = pos == 0;
}

if (marker[matchingPos++] == b) {
if (matchingPos == marker.length) {
found = true;
filter = false;
downstream.write(contentToInject);
if (onContentInjected != null) {
onContentInjected.run();
Expand All @@ -73,18 +83,23 @@ public void write(int b) throws IOException {

@Override
public void write(byte[] array, int off, int len) throws IOException {
if (found) {
if (!filter) {
if (wasDraining) {
// needs drain
drain();
}
downstream.write(array, off, len);
return;
}

if (len > bulkWriteThreshold) {
// if the content is large enough, we can bulk write everything but the N trail and tail.
// This because the buffer can already contain some byte from a previous single write.
// Also we need to fill the buffer with the tail since we don't know about the next write.
int idx = arrayContains(array, off, len, marker);
if (idx >= 0) {
// we have a full match. just write everything
found = true;
filter = false;
drain();
downstream.write(array, off, idx);
downstream.write(contentToInject);
Expand All @@ -99,7 +114,13 @@ public void write(byte[] array, int off, int len) throws IOException {
write(array[i]);
}
drain();
boolean tmpFilter = filter;

// will be reset if no errors after the following write
filter = false;
downstream.write(array, off + marker.length - 1, len - bulkWriteThreshold);
filter = tmpFilter;

Comment on lines +117 to +123
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
boolean tmpFilter = filter;
// will be reset if no errors after the following write
filter = false;
downstream.write(array, off + marker.length - 1, len - bulkWriteThreshold);
filter = tmpFilter;
boolean wasFiltering = filter;
// will be reset if no errors after the following write
filter = false;
downstream.write(array, off + marker.length - 1, len - bulkWriteThreshold);
filter = wasFiltering;

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd also suggest replacing the other occurrences of tmpFilter with wasFiltering

for (int i = len - marker.length + 1; i < len; i++) {
write(array[i]);
}
Expand Down Expand Up @@ -133,16 +154,19 @@ private int arrayContains(byte[] array, int off, int len, byte[] search) {
}

private void drain() throws IOException {
if (bufferFilled) {
for (int i = 0; i < lookbehind.length; i++) {
downstream.write(lookbehind[(pos + i) % lookbehind.length]);
if (count > 0) {
boolean tmpFilter = filter;
filter = false;
wasDraining = true;
int start = (pos - count + lookbehind.length) % lookbehind.length;
int cnt = count;
for (int i = 0; i < cnt; i++) {
downstream.write(lookbehind[(start + i) % lookbehind.length]);
count--;
}
} else {
downstream.write(this.lookbehind, 0, pos);
filter = tmpFilter;
wasDraining = false;
}
pos = 0;
matchingPos = 0;
bufferFilled = false;
}

@Override
Expand All @@ -152,9 +176,12 @@ public void flush() throws IOException {

@Override
public void close() throws IOException {
if (!found) {
drain();
try {
if (filter || wasDraining) {
drain();
}
} finally {
downstream.close();
}
downstream.close();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,19 @@

/**
* A Writer containing a circular buffer with a lookbehind buffer of n chars. The first time that
* the latest n bytes matches the marker, a content is injected before.
* the latest n chars match the marker, the content is injected before it. If the downstream throws
* an IOException, the buffered chars are lost unless the error occurred while draining the buffer;
* in that case the draining is resumed on the next write or close.
*/
public class InjectingPipeWriter extends Writer {
private final char[] lookbehind;
private int pos;
private boolean bufferFilled;
private int count;
private final char[] marker;
private final char[] contentToInject;
private boolean found = false;
private int matchingPos = 0;
private boolean filter;
private boolean wasDraining;
private int matchingPos;
private final Runnable onContentInjected;
private final int bulkWriteThreshold;
private final Writer downstream;
Expand All @@ -34,32 +37,39 @@ public InjectingPipeWriter(
this.marker = marker;
this.lookbehind = new char[marker.length];
this.pos = 0;
this.count = 0;
this.matchingPos = 0;
this.wasDraining = false;
// should filter the stream to potentially inject into it.
this.filter = true;
this.contentToInject = contentToInject;
this.onContentInjected = onContentInjected;
this.bulkWriteThreshold = marker.length * 2 - 2;
}

@Override
public void write(int c) throws IOException {
if (found) {
if (!filter) {
if (wasDraining) {
// continue draining
drain();
}
downstream.write(c);
return;
}

if (bufferFilled) {
if (count == lookbehind.length) {
downstream.write(lookbehind[pos]);
} else {
count++;
}

lookbehind[pos] = (char) c;
pos = (pos + 1) % lookbehind.length;

if (!bufferFilled) {
bufferFilled = pos == 0;
}

if (marker[matchingPos++] == c) {
if (matchingPos == marker.length) {
found = true;
filter = false;
downstream.write(contentToInject);
if (onContentInjected != null) {
onContentInjected.run();
Expand All @@ -71,25 +81,25 @@ public void write(int c) throws IOException {
}
}

@Override
public void flush() throws IOException {
downstream.flush();
}

@Override
public void write(char[] array, int off, int len) throws IOException {
if (found) {
if (!filter) {
if (wasDraining) {
// needs drain
drain();
}
downstream.write(array, off, len);
return;
}

if (len > bulkWriteThreshold) {
// if the content is large enough, we can bulk write everything but the N trail and tail.
// This because the buffer can already contain some byte from a previous single write.
// Also we need to fill the buffer with the tail since we don't know about the next write.
int idx = arrayContains(array, off, len, marker);
if (idx >= 0) {
// we have a full match. just write everything
found = true;
filter = false;
drain();
downstream.write(array, off, idx);
downstream.write(contentToInject);
Expand All @@ -104,7 +114,13 @@ public void write(char[] array, int off, int len) throws IOException {
write(array[i]);
}
drain();
boolean tmpFilter = filter;

// will be reset if no errors after the following write
filter = false;
downstream.write(array, off + marker.length - 1, len - bulkWriteThreshold);
filter = tmpFilter;

for (int i = len - marker.length + 1; i < len; i++) {
write(array[i]);
}
Expand Down Expand Up @@ -138,23 +154,34 @@ private int arrayContains(char[] array, int off, int len, char[] search) {
}

private void drain() throws IOException {
if (bufferFilled) {
for (int i = 0; i < lookbehind.length; i++) {
downstream.write(lookbehind[(pos + i) % lookbehind.length]);
if (count > 0) {
boolean tmpFilter = filter;
filter = false;
wasDraining = true;
int start = (pos - count + lookbehind.length) % lookbehind.length;
int cnt = count;
for (int i = 0; i < cnt; i++) {
downstream.write(lookbehind[(start + i) % lookbehind.length]);
count--;
}
} else {
downstream.write(this.lookbehind, 0, pos);
filter = tmpFilter;
wasDraining = false;
}
pos = 0;
matchingPos = 0;
bufferFilled = false;
}

@Override
public void flush() throws IOException {
downstream.flush();
}

@Override
public void close() throws IOException {
if (!found) {
drain();
try {
if (filter || wasDraining) {
drain();
}
} finally {
downstream.close();
}
downstream.close();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,36 @@ package datadog.trace.bootstrap.instrumentation.buffer
import datadog.trace.test.util.DDSpecification

class InjectingPipeOutputStreamTest extends DDSpecification {
/**
 * Test helper: an OutputStream wrapper that simulates a downstream failure by throwing an
 * IOException once the number of bytes counted so far reaches {@code glitchesPos}.
 * Bytes belonging to the failing call are counted but NOT forwarded to the wrapped stream.
 */
static class GlitchedOutputStream extends FilterOutputStream {
// byte count at which the stream glitches; set to Integer.MAX_VALUE after a bulk-write glitch
int glitchesPos
// running total of bytes passed to write(...), including bytes of calls that threw
int count
// own reference to the wrapped stream (FilterOutputStream also keeps one in its protected field)
final OutputStream out

/**
 * @param out the stream that receives the bytes of every successful write
 * @param glitchesPos the counted-byte position at which an IOException is thrown
 */
GlitchedOutputStream(OutputStream out, int glitchesPos) {
super(out)
this.out = out
this.glitchesPos = glitchesPos
}

/**
 * Bulk write. The whole chunk is added to the counter first; if the counter has reached or
 * passed {@code glitchesPos} the call throws and none of the chunk is forwarded.
 */
@Override
void write(byte[] b, int off, int len) throws IOException {
count += len
if (count >= glitchesPos) {
// disarm so that later bulk writes (e.g. a retry of the same chunk) go through
glitchesPos = Integer.MAX_VALUE
throw new IOException("Glitched after $count bytes")
}
out.write(b, off, len)
}

/**
 * Single-byte write. Throws only when the incremented counter is exactly {@code glitchesPos};
 * the failing byte is not forwarded. {@code glitchesPos} is not reset here, so a later bulk
 * write will still see count >= glitchesPos and throw once.
 */
@Override
void write(int b) throws IOException {
if (++count == glitchesPos) {
throw new IOException("Glitched after $glitchesPos bytes")
}
out.write(b)
}
}

def 'should filter a buffer and inject if found #found'() {
setup:
def downstream = new ByteArrayOutputStream()
Expand All @@ -20,4 +50,41 @@ class InjectingPipeOutputStreamTest extends DDSpecification {
"<html><body/></html>" | "</head>" | "<something/>" | false | "<html><body/></html>"
"<foo/>" | "<longerThanFoo>" | "<nothing>" | false | "<foo/>"
}

/**
 * Feeds each line of {@code body} through an InjectingPipeOutputStream whose downstream throws
 * one IOException around byte {@code glichesAt}, retrying a failed write once with the same
 * bytes, and checks what actually reached the underlying ByteArrayOutputStream.
 */
def 'should be resilient to exceptions when writing #body'() {
setup:
// baos records only the bytes that the glitching downstream actually let through
def baos = new ByteArrayOutputStream()
def downstream = new GlitchedOutputStream(baos, glichesAt)
def piped = new InjectingPipeOutputStream(downstream, marker.getBytes("UTF-8"), contentToInject.getBytes("UTF-8"), null)
when:
try {
for (String line : body) {
final bytes = line.getBytes("UTF-8")
try {
piped.write(bytes)
} catch (IOException ioe) {
ioe.printStackTrace()
// retry the same chunk once; a failure of the retry itself is not caught here
piped.write(bytes)
}
}
} finally {
// it can throw when draining at close
try {
piped.close()
} catch (IOException ignored) {
}
}
then:
assert baos.toByteArray() == expected.getBytes("UTF-8")
where:
body | marker | contentToInject | glichesAt | expected
// write fails after the content has been injected
["<html>", "<head>", "<foo/>", "</head>", "<body/>", "</html>"] | "</head>" | "<script>true</script>" | 60 | "<html><head><foo/><script>true</script></head><body/></html>"
// write fails before the content has been injected
["<html>", "<head>", "<foo/>", "</head>", "<body/>", "</html>"] | "</head>" | "<script>true</script>" | 20 | "<html><head><foo/></head><body/></html>"
// write fails after having filled the buffer. The last line is written twice
["<html>", "<body/>", "</html>"] | "</head>" | "<something/>" | 10 | "<html><body/></h</html>"
// expected to be broken: the content is smaller than the buffer, so the real write happens at close (drain), and retrying on close is not a common practice. Hence, we accept losing content
["<foo/>"] | "<longerThanFoo>" | "<nothing>" | 3 | "<f"
}
}
Loading