From 7953a64f4da9b54a65884d677ee1c20d1985bfe4 Mon Sep 17 00:00:00 2001 From: Bell Le Date: Fri, 9 Aug 2024 10:00:32 -0700 Subject: [PATCH 1/4] Add PosixPluginFrontendSpec --- .../frontend/OsSpecificFrontendSpec.scala | 56 +++++++++++++++++++ .../frontend/PosixPluginFrontendSpec.scala | 9 +++ .../frontend/WindowsPluginFrontendSpec.scala | 33 +---------- 3 files changed, 67 insertions(+), 31 deletions(-) create mode 100644 bridge/src/test/scala/protocbridge/frontend/OsSpecificFrontendSpec.scala create mode 100644 bridge/src/test/scala/protocbridge/frontend/PosixPluginFrontendSpec.scala diff --git a/bridge/src/test/scala/protocbridge/frontend/OsSpecificFrontendSpec.scala b/bridge/src/test/scala/protocbridge/frontend/OsSpecificFrontendSpec.scala new file mode 100644 index 0000000..5b158ac --- /dev/null +++ b/bridge/src/test/scala/protocbridge/frontend/OsSpecificFrontendSpec.scala @@ -0,0 +1,56 @@ +package protocbridge.frontend + +import org.scalatest.flatspec.AnyFlatSpec +import org.scalatest.matchers.must.Matchers +import protocbridge.{ExtraEnv, ProtocCodeGenerator} + +import java.io.ByteArrayOutputStream +import scala.sys.process.ProcessIO +import scala.util.Random + +class OsSpecificFrontendSpec extends AnyFlatSpec with Matchers { + + protected def testPluginFrontend(frontend: PluginFrontend): Array[Byte] = { + val random = new Random() + val toSend = Array.fill(123)(random.nextInt(256).toByte) + val toReceive = Array.fill(456)(random.nextInt(256).toByte) + val env = new ExtraEnv(secondaryOutputDir = "tmp") + + val fakeGenerator = new ProtocCodeGenerator { + override def run(request: Array[Byte]): Array[Byte] = { + request mustBe (toSend ++ env.toByteArrayAsField) + toReceive + } + } + val (path, state) = frontend.prepare( + fakeGenerator, + env + ) + val actualOutput = new ByteArrayOutputStream() + val process = sys.process + .Process(path.toAbsolutePath.toString) + .run( + new ProcessIO( + writeInput => { + writeInput.write(toSend) + writeInput.close() + }, + processOutput => { + val buffer = new Array[Byte](4096) + var bytesRead = 0 + while (bytesRead != -1) { + bytesRead = processOutput.read(buffer) + if (bytesRead != -1) { + actualOutput.write(buffer, 0, bytesRead) + } + } + processOutput.close() + }, + _.close() + ) + ) + process.exitValue() + frontend.cleanup(state) + actualOutput.toByteArray + } +} diff --git a/bridge/src/test/scala/protocbridge/frontend/PosixPluginFrontendSpec.scala b/bridge/src/test/scala/protocbridge/frontend/PosixPluginFrontendSpec.scala new file mode 100644 index 0000000..0d6d76a --- /dev/null +++ b/bridge/src/test/scala/protocbridge/frontend/PosixPluginFrontendSpec.scala @@ -0,0 +1,9 @@ +package protocbridge.frontend + +class PosixPluginFrontendSpec extends OsSpecificFrontendSpec { + if (!PluginFrontend.isWindows) { + it must "execute a program that forwards input and output to given stream" in { + testPluginFrontend(PosixPluginFrontend) + } + } +} diff --git a/bridge/src/test/scala/protocbridge/frontend/WindowsPluginFrontendSpec.scala b/bridge/src/test/scala/protocbridge/frontend/WindowsPluginFrontendSpec.scala index 6385ad7..7936141 100644 --- a/bridge/src/test/scala/protocbridge/frontend/WindowsPluginFrontendSpec.scala +++ b/bridge/src/test/scala/protocbridge/frontend/WindowsPluginFrontendSpec.scala @@ -1,38 +1,9 @@ package protocbridge.frontend -import java.io.ByteArrayInputStream - -import protocbridge.{ProtocCodeGenerator, ExtraEnv} - -import scala.sys.process.ProcessLogger -import org.scalatest.flatspec.AnyFlatSpec -import org.scalatest.matchers.must.Matchers - -class WindowsPluginFrontendSpec extends AnyFlatSpec with Matchers { +class WindowsPluginFrontendSpec extends OsSpecificFrontendSpec { if (PluginFrontend.isWindows) { it must "execute a program that forwards input and output to given stream" in { - val toSend = "ping" - val toReceive = "pong" - val env = new ExtraEnv(secondaryOutputDir = "tmp") - - val fakeGenerator = new ProtocCodeGenerator { - override def run(request: Array[Byte]): Array[Byte] = { - request mustBe (toSend.getBytes ++ env.toByteArrayAsField) - toReceive.getBytes - } - } - val (path, state) = WindowsPluginFrontend.prepare( - fakeGenerator, - env - ) - val actualOutput = scala.collection.mutable.Buffer.empty[String] - val process = sys.process - .Process(path.toAbsolutePath.toString) - .#<(new ByteArrayInputStream(toSend.getBytes)) - .run(ProcessLogger(o => actualOutput.append(o))) - process.exitValue() - actualOutput.mkString mustBe toReceive - WindowsPluginFrontend.cleanup(state) + testPluginFrontend(WindowsPluginFrontend) } } } From c576aedae92e4231588d883dba3d49170696c6fa Mon Sep 17 00:00:00 2001 From: Bell Le Date: Fri, 16 Aug 2024 16:54:29 -0700 Subject: [PATCH 2/4] Handle all failures in Future --- .../frontend/PluginFrontend.scala | 20 +++---- .../frontend/PosixPluginFrontend.scala | 5 ++ .../frontend/OsSpecificFrontendSpec.scala | 52 ++++++++++++++----- .../frontend/PosixPluginFrontendSpec.scala | 6 ++- .../frontend/WindowsPluginFrontendSpec.scala | 6 ++- 5 files changed, 63 insertions(+), 26 deletions(-) diff --git a/bridge/src/main/scala/protocbridge/frontend/PluginFrontend.scala b/bridge/src/main/scala/protocbridge/frontend/PluginFrontend.scala index 7415f06..ec7d6ca 100644 --- a/bridge/src/main/scala/protocbridge/frontend/PluginFrontend.scala +++ b/bridge/src/main/scala/protocbridge/frontend/PluginFrontend.scala @@ -5,8 +5,6 @@ import java.nio.file.{Files, Path} import protocbridge.{ProtocCodeGenerator, ExtraEnv} -import scala.util.Try - /** A PluginFrontend instance provides a platform-dependent way for protoc to * communicate with a JVM based ProtocCodeGenerator. * @@ -47,13 +45,7 @@ object PluginFrontend { gen: ProtocCodeGenerator, request: Array[Byte] ): Array[Byte] = { - Try { - gen.run(request) - }.recover { case throwable => - createCodeGeneratorResponseWithError( - throwable.toString + "\n" + getStackTrace(throwable) - ) - }.get + gen.run(request) } def createCodeGeneratorResponseWithError(error: String): Array[Byte] = { @@ -116,9 +108,17 @@ object PluginFrontend { gen: ProtocCodeGenerator, fsin: InputStream, env: ExtraEnv - ): Array[Byte] = { + ): Array[Byte] = try { val bytes = readInputStreamToByteArrayWithEnv(fsin, env) runWithBytes(gen, bytes) + } catch { + // This covers all Throwable including OutOfMemoryError, StackOverflowError, etc. + // We need to make a best effort to return a response to protoc, + // otherwise protoc can hang indefinitely. + case throwable: Throwable => + createCodeGeneratorResponseWithError( + throwable.toString + "\n" + getStackTrace(throwable) + ) } def createTempFile(extension: String, content: String): Path = { diff --git a/bridge/src/main/scala/protocbridge/frontend/PosixPluginFrontend.scala b/bridge/src/main/scala/protocbridge/frontend/PosixPluginFrontend.scala index 5f70120..ef57687 100644 --- a/bridge/src/main/scala/protocbridge/frontend/PosixPluginFrontend.scala +++ b/bridge/src/main/scala/protocbridge/frontend/PosixPluginFrontend.scala @@ -40,6 +40,11 @@ object PosixPluginFrontend extends PluginFrontend { val response = PluginFrontend.runWithInputStream(plugin, fsin, env) fsin.close() + // Note that the output pipe must be opened after the input pipe is consumed. + // Otherwise, there might be a deadlock that + // - The shell script is stuck writing to the input pipe (which has a full buffer), + // and doesn't open the write end of the output pipe. + // - This thread is stuck waiting for the write end of the output pipe to be opened. val fsout = Files.newOutputStream(outputPipe) fsout.write(response) fsout.close() diff --git a/bridge/src/test/scala/protocbridge/frontend/OsSpecificFrontendSpec.scala b/bridge/src/test/scala/protocbridge/frontend/OsSpecificFrontendSpec.scala index 5b158ac..1519857 100644 --- a/bridge/src/test/scala/protocbridge/frontend/OsSpecificFrontendSpec.scala +++ b/bridge/src/test/scala/protocbridge/frontend/OsSpecificFrontendSpec.scala @@ -10,20 +10,14 @@ import scala.util.Random class OsSpecificFrontendSpec extends AnyFlatSpec with Matchers { - protected def testPluginFrontend(frontend: PluginFrontend): Array[Byte] = { - val random = new Random() - val toSend = Array.fill(123)(random.nextInt(256).toByte) - val toReceive = Array.fill(456)(random.nextInt(256).toByte) - val env = new ExtraEnv(secondaryOutputDir = "tmp") - - val fakeGenerator = new ProtocCodeGenerator { - override def run(request: Array[Byte]): Array[Byte] = { - request mustBe (toSend ++ env.toByteArrayAsField) - toReceive - } - } + protected def testPluginFrontend( + frontend: PluginFrontend, + generator: ProtocCodeGenerator, + env: ExtraEnv, + request: Array[Byte] + ): Array[Byte] = { val (path, state) = frontend.prepare( - fakeGenerator, + generator, env ) val actualOutput = new ByteArrayOutputStream() @@ -32,7 +26,7 @@ class OsSpecificFrontendSpec extends AnyFlatSpec with Matchers { .run( new ProcessIO( writeInput => { - writeInput.write(toSend) + writeInput.write(request) writeInput.close() }, processOutput => { @@ -53,4 +47,34 @@ class OsSpecificFrontendSpec extends AnyFlatSpec with Matchers { frontend.cleanup(state) actualOutput.toByteArray } + + protected def testSuccess(frontend: PluginFrontend): Unit = { + val random = new Random() + val toSend = Array.fill(123)(random.nextInt(256).toByte) + val toReceive = Array.fill(456)(random.nextInt(256).toByte) + val env = new ExtraEnv(secondaryOutputDir = "tmp") + + val fakeGenerator = new ProtocCodeGenerator { + override def run(request: Array[Byte]): Array[Byte] = { + request mustBe (toSend ++ env.toByteArrayAsField) + toReceive + } + } + val response = testPluginFrontend(frontend, fakeGenerator, env, toSend) + response mustBe toReceive + } + + protected def testFailure(frontend: PluginFrontend): Unit = { + val random = new Random() + val toSend = Array.fill(123)(random.nextInt(256).toByte) + val env = new ExtraEnv(secondaryOutputDir = "tmp") + + val fakeGenerator = new ProtocCodeGenerator { + override def run(request: Array[Byte]): Array[Byte] = { + throw new OutOfMemoryError("test error") + } + } + val response = testPluginFrontend(frontend, fakeGenerator, env, toSend) + response.length must be > 0 + } } diff --git a/bridge/src/test/scala/protocbridge/frontend/PosixPluginFrontendSpec.scala b/bridge/src/test/scala/protocbridge/frontend/PosixPluginFrontendSpec.scala index 0d6d76a..2a3481c 100644 --- a/bridge/src/test/scala/protocbridge/frontend/PosixPluginFrontendSpec.scala +++ b/bridge/src/test/scala/protocbridge/frontend/PosixPluginFrontendSpec.scala @@ -3,7 +3,11 @@ package protocbridge.frontend class PosixPluginFrontendSpec extends OsSpecificFrontendSpec { if (!PluginFrontend.isWindows) { it must "execute a program that forwards input and output to given stream" in { - testPluginFrontend(PosixPluginFrontend) + testSuccess(PosixPluginFrontend) + } + + it must "not hang if there is an OOM in generator" in { + testFailure(PosixPluginFrontend) } } } diff --git a/bridge/src/test/scala/protocbridge/frontend/WindowsPluginFrontendSpec.scala b/bridge/src/test/scala/protocbridge/frontend/WindowsPluginFrontendSpec.scala index 7936141..4bf39de 100644 --- a/bridge/src/test/scala/protocbridge/frontend/WindowsPluginFrontendSpec.scala +++ b/bridge/src/test/scala/protocbridge/frontend/WindowsPluginFrontendSpec.scala @@ -3,7 +3,11 @@ package protocbridge.frontend class WindowsPluginFrontendSpec extends OsSpecificFrontendSpec { if (PluginFrontend.isWindows) { it must "execute a program that forwards input and output to given stream" in { - testPluginFrontend(WindowsPluginFrontend) + testSuccess(WindowsPluginFrontend) + } + + it must "not hang if there is an OOM in generator" in { + testFailure(WindowsPluginFrontend) } } } From 0643f2e0d23f5cbed7dc677d2fa270247e2eb43b Mon Sep 17 00:00:00 2001 From: Bell Le Date: Fri, 9 Aug 2024 10:00:38 -0700 Subject: [PATCH 3/4] Use IOUtils in PosixPluginFrontendSpec --- .../frontend/OsSpecificFrontendSpec.scala | 15 ++++++--------- 1 file changed, 6 insertions(+), 9 deletions(-) diff --git a/bridge/src/test/scala/protocbridge/frontend/OsSpecificFrontendSpec.scala b/bridge/src/test/scala/protocbridge/frontend/OsSpecificFrontendSpec.scala index 1519857..faf99f5 100644 --- a/bridge/src/test/scala/protocbridge/frontend/OsSpecificFrontendSpec.scala +++ b/bridge/src/test/scala/protocbridge/frontend/OsSpecificFrontendSpec.scala @@ -1,5 +1,6 @@ package protocbridge.frontend +import org.apache.commons.io.IOUtils import org.scalatest.flatspec.AnyFlatSpec import org.scalatest.matchers.must.Matchers import protocbridge.{ExtraEnv, ProtocCodeGenerator} @@ -30,17 +31,13 @@ class OsSpecificFrontendSpec extends AnyFlatSpec with Matchers { writeInput.close() }, processOutput => { - val buffer = new Array[Byte](4096) - var bytesRead = 0 - while (bytesRead != -1) { - bytesRead = processOutput.read(buffer) - if (bytesRead != -1) { - actualOutput.write(buffer, 0, bytesRead) - } - } + IOUtils.copy(processOutput, actualOutput) processOutput.close() }, - _.close() + processError => { + IOUtils.copy(processError, System.err) + processError.close() + } ) ) process.exitValue() From feb7cf6cb36ec0dffee3b353eef04752ea53ee82 Mon Sep 17 00:00:00 2001 From: Bell Le Date: Fri, 9 Aug 2024 10:02:56 -0700 Subject: [PATCH 4/4] Stress test PosixPluginFrontendSpec --- .../frontend/OsSpecificFrontendSpec.scala | 47 ++++++++++++++++--- 1 file changed, 41 insertions(+), 6 deletions(-) diff --git a/bridge/src/test/scala/protocbridge/frontend/OsSpecificFrontendSpec.scala b/bridge/src/test/scala/protocbridge/frontend/OsSpecificFrontendSpec.scala index faf99f5..a65bfd2 100644 --- a/bridge/src/test/scala/protocbridge/frontend/OsSpecificFrontendSpec.scala +++ b/bridge/src/test/scala/protocbridge/frontend/OsSpecificFrontendSpec.scala @@ -1,11 +1,15 @@ package protocbridge.frontend import org.apache.commons.io.IOUtils +import org.scalatest.exceptions.TestFailedException import org.scalatest.flatspec.AnyFlatSpec import org.scalatest.matchers.must.Matchers import protocbridge.{ExtraEnv, ProtocCodeGenerator} import java.io.ByteArrayOutputStream +import scala.concurrent.ExecutionContext.Implicits.global +import scala.concurrent.duration.DurationInt +import scala.concurrent.{Await, Future, TimeoutException} import scala.sys.process.ProcessIO import scala.util.Random @@ -16,7 +20,7 @@ class OsSpecificFrontendSpec extends AnyFlatSpec with Matchers { generator: ProtocCodeGenerator, env: ExtraEnv, request: Array[Byte] - ): Array[Byte] = { + ): (frontend.InternalState, Array[Byte]) = { val (path, state) = frontend.prepare( generator, env @@ -40,9 +44,15 @@ class OsSpecificFrontendSpec extends AnyFlatSpec with Matchers { } ) ) - process.exitValue() + try { + Await.result(Future { process.exitValue() }, 5.seconds) + } catch { + case _: TimeoutException => + System.err.println(s"Timeout") + process.destroy() + } frontend.cleanup(state) - actualOutput.toByteArray + (state, actualOutput.toByteArray) } protected def testSuccess(frontend: PluginFrontend): Unit = { @@ -57,8 +67,32 @@ class OsSpecificFrontendSpec extends AnyFlatSpec with Matchers { toReceive } } - val response = testPluginFrontend(frontend, fakeGenerator, env, toSend) - response mustBe toReceive + // Repeat 100,000 times since named pipes on macOS are flaky. + val repeatCount = 100000 + for (i <- 1 until repeatCount) { + if (i % 100 == 1) println(s"Running iteration $i of $repeatCount") + val (state, response) = + testPluginFrontend(frontend, fakeGenerator, env, toSend) + try { + response mustBe toReceive + } catch { + case e: TestFailedException => + System.err.println( + s"""Failed on iteration $i of $repeatCount: ${e.getMessage}""" + ) + } + } + val (state, response) = + testPluginFrontend(frontend, fakeGenerator, env, toSend) + try { + response mustBe toReceive + } catch { + case e: TestFailedException => + System.err.println( + s"""Failed on iteration $repeatCount of $repeatCount: ${e.getMessage}""" + ) + } + state } protected def testFailure(frontend: PluginFrontend): Unit = { @@ -71,7 +105,8 @@ class OsSpecificFrontendSpec extends AnyFlatSpec with Matchers { throw new OutOfMemoryError("test error") } } - val response = testPluginFrontend(frontend, fakeGenerator, env, toSend) + val (_, response) = + testPluginFrontend(frontend, fakeGenerator, env, toSend) response.length must be > 0 } }