diff --git a/dd-java-agent/agent-bootstrap/src/main/java/datadog/trace/bootstrap/instrumentation/buffer/InjectingPipeOutputStream.java b/dd-java-agent/agent-bootstrap/src/main/java/datadog/trace/bootstrap/instrumentation/buffer/InjectingPipeOutputStream.java index 633db0c8d4c..a7c7fe2dddf 100644 --- a/dd-java-agent/agent-bootstrap/src/main/java/datadog/trace/bootstrap/instrumentation/buffer/InjectingPipeOutputStream.java +++ b/dd-java-agent/agent-bootstrap/src/main/java/datadog/trace/bootstrap/instrumentation/buffer/InjectingPipeOutputStream.java @@ -5,16 +5,19 @@ /** * An OutputStream containing a circular buffer with a lookbehind buffer of n bytes. The first time - * that the latest n bytes matches the marker, a content is injected before. + * that the latest n bytes matches the marker, a content is injected before. In case of IOException + * thrown by the downstream, the buffer will be lost unless the error occurred when draining it. In + * this case the draining will be resumed. */ public class InjectingPipeOutputStream extends OutputStream { private final byte[] lookbehind; private int pos; - private boolean bufferFilled; + private int count; private final byte[] marker; private final byte[] contentToInject; - private boolean found = false; - private int matchingPos = 0; + private boolean filter; + private boolean wasDraining; + private int matchingPos; private final Runnable onContentInjected; private final int bulkWriteThreshold; private final OutputStream downstream; @@ -34,6 +37,11 @@ public InjectingPipeOutputStream( this.marker = marker; this.lookbehind = new byte[marker.length]; this.pos = 0; + this.count = 0; + this.matchingPos = 0; + this.wasDraining = false; + // should filter the stream to potentially inject into it. + this.filter = true; this.contentToInject = contentToInject; this.onContentInjected = onContentInjected; this.bulkWriteThreshold = marker.length * 2 - 2; @@ -41,25 +49,27 @@ public InjectingPipeOutputStream( @Override public void write(int b) throws IOException { - if (found) { + if (!filter) { + if (wasDraining) { + // continue draining + drain(); + } downstream.write(b); return; } - if (bufferFilled) { + if (count == lookbehind.length) { downstream.write(lookbehind[pos]); + } else { + count++; } lookbehind[pos] = (byte) b; pos = (pos + 1) % lookbehind.length; - if (!bufferFilled) { - bufferFilled = pos == 0; - } - if (marker[matchingPos++] == b) { if (matchingPos == marker.length) { - found = true; + filter = false; downstream.write(contentToInject); if (onContentInjected != null) { onContentInjected.run(); @@ -73,10 +83,15 @@ public void write(int b) throws IOException { @Override public void write(byte[] array, int off, int len) throws IOException { - if (found) { + if (!filter) { + if (wasDraining) { + // needs drain + drain(); + } downstream.write(array, off, len); return; } + if (len > bulkWriteThreshold) { // if the content is large enough, we can bulk write everything but the N trail and tail. // This because the buffer can already contain some byte from a previous single write. @@ -84,7 +99,7 @@ public void write(byte[] array, int off, int len) throws IOException { int idx = arrayContains(array, off, len, marker); if (idx >= 0) { // we have a full match. just write everything - found = true; + filter = false; drain(); downstream.write(array, off, idx); downstream.write(contentToInject); @@ -99,7 +114,13 @@ public void write(byte[] array, int off, int len) throws IOException { write(array[i]); } drain(); + boolean tmpFilter = filter; + + // will be reset if no errors after the following write + filter = false; downstream.write(array, off + marker.length - 1, len - bulkWriteThreshold); + filter = tmpFilter; + for (int i = len - marker.length + 1; i < len; i++) { write(array[i]); } @@ -133,16 +154,19 @@ private int arrayContains(byte[] array, int off, int len, byte[] search) { } private void drain() throws IOException { - if (bufferFilled) { - for (int i = 0; i < lookbehind.length; i++) { - downstream.write(lookbehind[(pos + i) % lookbehind.length]); + if (count > 0) { + boolean tmpFilter = filter; + filter = false; + wasDraining = true; + int start = (pos - count + lookbehind.length) % lookbehind.length; + int cnt = count; + for (int i = 0; i < cnt; i++) { + downstream.write(lookbehind[(start + i) % lookbehind.length]); + count--; } - } else { - downstream.write(this.lookbehind, 0, pos); + filter = tmpFilter; + wasDraining = false; } - pos = 0; - matchingPos = 0; - bufferFilled = false; } @Override @@ -152,9 +176,12 @@ public void flush() throws IOException { @Override public void close() throws IOException { - if (!found) { - drain(); + try { + if (filter || wasDraining) { + drain(); + } + } finally { + downstream.close(); } - downstream.close(); } } diff --git a/dd-java-agent/agent-bootstrap/src/main/java/datadog/trace/bootstrap/instrumentation/buffer/InjectingPipeWriter.java b/dd-java-agent/agent-bootstrap/src/main/java/datadog/trace/bootstrap/instrumentation/buffer/InjectingPipeWriter.java index f012c04cae4..aaf70a1c5c2 100644 --- a/dd-java-agent/agent-bootstrap/src/main/java/datadog/trace/bootstrap/instrumentation/buffer/InjectingPipeWriter.java +++ b/dd-java-agent/agent-bootstrap/src/main/java/datadog/trace/bootstrap/instrumentation/buffer/InjectingPipeWriter.java @@ -5,16 +5,19 @@ /** * A Writer containing a circular buffer with a lookbehind buffer of n bytes. The first time that - * the latest n bytes matches the marker, a content is injected before. + * the latest n bytes matches the marker, a content is injected before. In case of IOException + * thrown by the downstream, the buffer will be lost unless the error occurred when draining it. In + * this case the draining will be resumed. */ public class InjectingPipeWriter extends Writer { private final char[] lookbehind; private int pos; - private boolean bufferFilled; + private int count; private final char[] marker; private final char[] contentToInject; - private boolean found = false; - private int matchingPos = 0; + private boolean filter; + private boolean wasDraining; + private int matchingPos; private final Runnable onContentInjected; private final int bulkWriteThreshold; private final Writer downstream; @@ -34,6 +37,11 @@ public InjectingPipeWriter( this.marker = marker; this.lookbehind = new char[marker.length]; this.pos = 0; + this.count = 0; + this.matchingPos = 0; + this.wasDraining = false; + // should filter the stream to potentially inject into it. + this.filter = true; this.contentToInject = contentToInject; this.onContentInjected = onContentInjected; this.bulkWriteThreshold = marker.length * 2 - 2; @@ -41,25 +49,27 @@ public InjectingPipeWriter( @Override public void write(int c) throws IOException { - if (found) { + if (!filter) { + if (wasDraining) { + // continue draining + drain(); + } downstream.write(c); return; } - if (bufferFilled) { + if (count == lookbehind.length) { downstream.write(lookbehind[pos]); + } else { + count++; } lookbehind[pos] = (char) c; pos = (pos + 1) % lookbehind.length; - if (!bufferFilled) { - bufferFilled = pos == 0; - } - if (marker[matchingPos++] == c) { if (matchingPos == marker.length) { - found = true; + filter = false; downstream.write(contentToInject); if (onContentInjected != null) { onContentInjected.run(); @@ -71,17 +81,17 @@ public void write(int c) throws IOException { } } - @Override - public void flush() throws IOException { - downstream.flush(); - } - @Override public void write(char[] array, int off, int len) throws IOException { - if (found) { + if (!filter) { + if (wasDraining) { + // needs drain + drain(); + } downstream.write(array, off, len); return; } + if (len > bulkWriteThreshold) { // if the content is large enough, we can bulk write everything but the N trail and tail. // This because the buffer can already contain some byte from a previous single write. @@ -89,7 +99,7 @@ public void write(char[] array, int off, int len) throws IOException { int idx = arrayContains(array, off, len, marker); if (idx >= 0) { // we have a full match. just write everything - found = true; + filter = false; drain(); downstream.write(array, off, idx); downstream.write(contentToInject); @@ -104,7 +114,13 @@ public void write(char[] array, int off, int len) throws IOException { write(array[i]); } drain(); + boolean tmpFilter = filter; + + // will be reset if no errors after the following write + filter = false; downstream.write(array, off + marker.length - 1, len - bulkWriteThreshold); + filter = tmpFilter; + for (int i = len - marker.length + 1; i < len; i++) { write(array[i]); } @@ -138,23 +154,34 @@ private int arrayContains(char[] array, int off, int len, char[] search) { } private void drain() throws IOException { - if (bufferFilled) { - for (int i = 0; i < lookbehind.length; i++) { - downstream.write(lookbehind[(pos + i) % lookbehind.length]); + if (count > 0) { + boolean tmpFilter = filter; + filter = false; + wasDraining = true; + int start = (pos - count + lookbehind.length) % lookbehind.length; + int cnt = count; + for (int i = 0; i < cnt; i++) { + downstream.write(lookbehind[(start + i) % lookbehind.length]); + count--; } - } else { - downstream.write(this.lookbehind, 0, pos); + filter = tmpFilter; + wasDraining = false; } - pos = 0; - matchingPos = 0; - bufferFilled = false; + } + + @Override + public void flush() throws IOException { + downstream.flush(); } @Override public void close() throws IOException { - if (!found) { - drain(); + try { + if (filter || wasDraining) { + drain(); + } + } finally { + downstream.close(); } - downstream.close(); } } diff --git a/dd-java-agent/agent-bootstrap/src/test/groovy/datadog/trace/bootstrap/instrumentation/buffer/InjectingPipeOutputStreamTest.groovy b/dd-java-agent/agent-bootstrap/src/test/groovy/datadog/trace/bootstrap/instrumentation/buffer/InjectingPipeOutputStreamTest.groovy index 457b26577ba..9b04234ad3d 100644 --- a/dd-java-agent/agent-bootstrap/src/test/groovy/datadog/trace/bootstrap/instrumentation/buffer/InjectingPipeOutputStreamTest.groovy +++ b/dd-java-agent/agent-bootstrap/src/test/groovy/datadog/trace/bootstrap/instrumentation/buffer/InjectingPipeOutputStreamTest.groovy @@ -3,6 +3,36 @@ package datadog.trace.bootstrap.instrumentation.buffer import datadog.trace.test.util.DDSpecification class InjectingPipeOutputStreamTest extends DDSpecification { + static class GlitchedOutputStream extends FilterOutputStream { + int glitchesPos + int count + final OutputStream out + + GlitchedOutputStream(OutputStream out, int glitchesPos) { + super(out) + this.out = out + this.glitchesPos = glitchesPos + } + + @Override + void write(byte[] b, int off, int len) throws IOException { + count += len + if (count >= glitchesPos) { + glitchesPos = Integer.MAX_VALUE + throw new IOException("Glitched after $count bytes") + } + out.write(b, off, len) + } + + @Override + void write(int b) throws IOException { + if (++count == glitchesPos) { + throw new IOException("Glitched after $glitchesPos bytes") + } + out.write(b) + } + } + def 'should filter a buffer and inject if found #found'() { setup: def downstream = new ByteArrayOutputStream() @@ -20,4 +50,41 @@ class InjectingPipeOutputStreamTest extends DDSpecification { "" | "" | "" | false | "" "" | "" | "" | false | "" } + + def 'should be resilient to exceptions when writing #body'() { + setup: + def baos = new ByteArrayOutputStream() + def downstream = new GlitchedOutputStream(baos, glichesAt) + def piped = new InjectingPipeOutputStream(downstream, marker.getBytes("UTF-8"), contentToInject.getBytes("UTF-8"), null) + when: + try { + for (String line : body) { + final bytes = line.getBytes("UTF-8") + try { + piped.write(bytes) + } catch (IOException ioe) { + ioe.printStackTrace() + piped.write(bytes) + } + } + } finally { + // it can throw when draining at close + try { + piped.close() + } catch (IOException ignored) { + } + } + then: + assert baos.toByteArray() == expected.getBytes("UTF-8") + where: + body | marker | contentToInject | glichesAt | expected + // write fails after the content has been injected + ["", "", "", "", "", ""] | "" | "" | 60 | "" + // write fails before the content has been injected + ["", "", "", "", "", ""] | "" | "" | 20 | "" + // write fails after having filled the buffer. The last line is written twice + ["", "", ""] | "" | "" | 10 | "" + // expected broken since the real write happens at close (drain) being the content smaller than the buffer. And retry on close is not a common practice. Hence, we suppose loosing content + [""] | "" | "" | 3 | "= glitchesPos) { + glitchesPos = Integer.MAX_VALUE + throw new IOException("Glitched after $count bytes") + } + out.write(c, off, len) + } + + @Override + void write(int c) throws IOException { + if (++count == glitchesPos) { + throw new IOException("Glitched after $glitchesPos bytes") + } + out.write(c) + } + } + def 'should filter a buffer and inject if found #found using write'() { setup: def downstream = new StringWriter() @@ -36,4 +66,41 @@ class InjectingPipeWriterTest extends DDSpecification { "" | "" | "" | false | "" "" | "" | "" | false | "" } + + def 'should be resilient to exceptions when writing #body'() { + setup: + def writer = new StringWriter() + def downstream = new GlitchedWriter(writer, glichesAt) + def piped = new InjectingPipeWriter(downstream, marker.toCharArray(), contentToInject.toCharArray(), null) + when: + try { + for (String line : body) { + final chars = line.toCharArray() + try { + piped.write(chars) + } catch (IOException ioe) { + ioe.printStackTrace() + piped.write(chars) + } + } + } finally { + // it can throw when draining at close + try { + piped.close() + } catch (IOException ignored) { + } + } + then: + assert writer.toString() == expected + where: + body | marker | contentToInject | glichesAt | expected + // write fails after the content has been injected + ["", "", "", "", "", ""] | "" | "" | 60 | "" + // write fails before the content has been injected + ["", "", "", "", "", ""] | "" | "" | 20 | "" + // write fails after having filled the buffer. The last line is written twice + ["", "", ""] | "" | "" | 10 | "" + // expected broken since the real write happens at close (drain) being the content smaller than the buffer. And retry on close is not a common practice. Hence, we suppose loosing content + [""] | "" | "" | 3 | "