diff --git a/bridge/src/main/scala/protocbridge/frontend/PluginFrontend.scala b/bridge/src/main/scala/protocbridge/frontend/PluginFrontend.scala index 7415f06..ec7d6ca 100644 --- a/bridge/src/main/scala/protocbridge/frontend/PluginFrontend.scala +++ b/bridge/src/main/scala/protocbridge/frontend/PluginFrontend.scala @@ -5,8 +5,6 @@ import java.nio.file.{Files, Path} import protocbridge.{ProtocCodeGenerator, ExtraEnv} -import scala.util.Try - /** A PluginFrontend instance provides a platform-dependent way for protoc to * communicate with a JVM based ProtocCodeGenerator. * @@ -47,13 +45,7 @@ object PluginFrontend { gen: ProtocCodeGenerator, request: Array[Byte] ): Array[Byte] = { - Try { - gen.run(request) - }.recover { case throwable => - createCodeGeneratorResponseWithError( - throwable.toString + "\n" + getStackTrace(throwable) - ) - }.get + gen.run(request) } def createCodeGeneratorResponseWithError(error: String): Array[Byte] = { @@ -116,9 +108,17 @@ object PluginFrontend { gen: ProtocCodeGenerator, fsin: InputStream, env: ExtraEnv - ): Array[Byte] = { + ): Array[Byte] = try { val bytes = readInputStreamToByteArrayWithEnv(fsin, env) runWithBytes(gen, bytes) + } catch { + // This covers all Throwable including OutOfMemoryError, StackOverflowError, etc. + // We need to make a best effort to return a response to protoc, + // otherwise protoc can hang indefinitely. + case throwable: Throwable => + createCodeGeneratorResponseWithError( + throwable.toString + "\n" + getStackTrace(throwable) + ) } def createTempFile(extension: String, content: String): Path = { diff --git a/bridge/src/main/scala/protocbridge/frontend/PosixPluginFrontend.scala b/bridge/src/main/scala/protocbridge/frontend/PosixPluginFrontend.scala index 5f70120..ef57687 100644 --- a/bridge/src/main/scala/protocbridge/frontend/PosixPluginFrontend.scala +++ b/bridge/src/main/scala/protocbridge/frontend/PosixPluginFrontend.scala @@ -40,6 +40,11 @@ object PosixPluginFrontend extends PluginFrontend { val response = PluginFrontend.runWithInputStream(plugin, fsin, env) fsin.close() + // Note that the output pipe must be opened after the input pipe is consumed. + // Otherwise, there might be a deadlock that + // - The shell script is stuck writing to the input pipe (which has a full buffer), + // and doesn't open the write end of the output pipe. + // - This thread is stuck waiting for the write end of the output pipe to be opened. val fsout = Files.newOutputStream(outputPipe) fsout.write(response) fsout.close() diff --git a/bridge/src/test/scala/protocbridge/frontend/OsSpecificFrontendSpec.scala b/bridge/src/test/scala/protocbridge/frontend/OsSpecificFrontendSpec.scala new file mode 100644 index 0000000..a65bfd2 --- /dev/null +++ b/bridge/src/test/scala/protocbridge/frontend/OsSpecificFrontendSpec.scala @@ -0,0 +1,112 @@ +package protocbridge.frontend + +import org.apache.commons.io.IOUtils +import org.scalatest.exceptions.TestFailedException +import org.scalatest.flatspec.AnyFlatSpec +import org.scalatest.matchers.must.Matchers +import protocbridge.{ExtraEnv, ProtocCodeGenerator} + +import java.io.ByteArrayOutputStream +import scala.concurrent.ExecutionContext.Implicits.global +import scala.concurrent.duration.DurationInt +import scala.concurrent.{Await, Future, TimeoutException} +import scala.sys.process.ProcessIO +import scala.util.Random + +class OsSpecificFrontendSpec extends AnyFlatSpec with Matchers { + + protected def testPluginFrontend( + frontend: PluginFrontend, + generator: ProtocCodeGenerator, + env: ExtraEnv, + request: Array[Byte] + ): (frontend.InternalState, Array[Byte]) = { + val (path, state) = frontend.prepare( + generator, + env + ) + val actualOutput = new ByteArrayOutputStream() + val process = sys.process + .Process(path.toAbsolutePath.toString) + .run( + new ProcessIO( + writeInput => { + writeInput.write(request) + writeInput.close() + }, + processOutput => { + IOUtils.copy(processOutput, actualOutput) + processOutput.close() + }, + processError => { + IOUtils.copy(processError, System.err) + processError.close() + } + ) + ) + try { + Await.result(Future { process.exitValue() }, 5.seconds) + } catch { + case _: TimeoutException => + System.err.println(s"Timeout") + process.destroy() + } + frontend.cleanup(state) + (state, actualOutput.toByteArray) + } + + protected def testSuccess(frontend: PluginFrontend): Unit = { + val random = new Random() + val toSend = Array.fill(123)(random.nextInt(256).toByte) + val toReceive = Array.fill(456)(random.nextInt(256).toByte) + val env = new ExtraEnv(secondaryOutputDir = "tmp") + + val fakeGenerator = new ProtocCodeGenerator { + override def run(request: Array[Byte]): Array[Byte] = { + request mustBe (toSend ++ env.toByteArrayAsField) + toReceive + } + } + // Repeat 100,000 times since named pipes on macOS are flaky. + val repeatCount = 100000 + for (i <- 1 until repeatCount) { + if (i % 100 == 1) println(s"Running iteration $i of $repeatCount") + val (state, response) = + testPluginFrontend(frontend, fakeGenerator, env, toSend) + try { + response mustBe toReceive + } catch { + case e: TestFailedException => + System.err.println( + s"""Failed on iteration $i of $repeatCount: ${e.getMessage}""" + ) + } + } + val (state, response) = + testPluginFrontend(frontend, fakeGenerator, env, toSend) + try { + response mustBe toReceive + } catch { + case e: TestFailedException => + System.err.println( + s"""Failed on iteration $repeatCount of $repeatCount: ${e.getMessage}""" + ) + } + state + } + + protected def testFailure(frontend: PluginFrontend): Unit = { + val random = new Random() + val toSend = Array.fill(123)(random.nextInt(256).toByte) + val env = new ExtraEnv(secondaryOutputDir = "tmp") + + val fakeGenerator = new ProtocCodeGenerator { + override def run(request: Array[Byte]): Array[Byte] = { + throw new OutOfMemoryError("test error") + } + } + val (_, response) = + testPluginFrontend(frontend, fakeGenerator, env, toSend) + response.length must be > 0 + } +} diff --git a/bridge/src/test/scala/protocbridge/frontend/PosixPluginFrontendSpec.scala b/bridge/src/test/scala/protocbridge/frontend/PosixPluginFrontendSpec.scala new file mode 100644 index 0000000..2a3481c --- /dev/null +++ b/bridge/src/test/scala/protocbridge/frontend/PosixPluginFrontendSpec.scala @@ -0,0 +1,13 @@ +package protocbridge.frontend + +class PosixPluginFrontendSpec extends OsSpecificFrontendSpec { + if (!PluginFrontend.isWindows) { + it must "execute a program that forwards input and output to given stream" in { + testSuccess(PosixPluginFrontend) + } + + it must "not hang if there is an OOM in generator" in { + testFailure(PosixPluginFrontend) + } + } +} diff --git a/bridge/src/test/scala/protocbridge/frontend/WindowsPluginFrontendSpec.scala b/bridge/src/test/scala/protocbridge/frontend/WindowsPluginFrontendSpec.scala index 6385ad7..4bf39de 100644 --- a/bridge/src/test/scala/protocbridge/frontend/WindowsPluginFrontendSpec.scala +++ b/bridge/src/test/scala/protocbridge/frontend/WindowsPluginFrontendSpec.scala @@ -1,38 +1,13 @@ package protocbridge.frontend -import java.io.ByteArrayInputStream - -import protocbridge.{ProtocCodeGenerator, ExtraEnv} - -import scala.sys.process.ProcessLogger -import org.scalatest.flatspec.AnyFlatSpec -import org.scalatest.matchers.must.Matchers - -class WindowsPluginFrontendSpec extends AnyFlatSpec with Matchers { +class WindowsPluginFrontendSpec extends OsSpecificFrontendSpec { if (PluginFrontend.isWindows) { it must "execute a program that forwards input and output to given stream" in { - val toSend = "ping" - val toReceive = "pong" - val env = new ExtraEnv(secondaryOutputDir = "tmp") + testSuccess(WindowsPluginFrontend) + } - val fakeGenerator = new ProtocCodeGenerator { - override def run(request: Array[Byte]): Array[Byte] = { - request mustBe (toSend.getBytes ++ env.toByteArrayAsField) - toReceive.getBytes - } - } - val (path, state) = WindowsPluginFrontend.prepare( - fakeGenerator, - env - ) - val actualOutput = scala.collection.mutable.Buffer.empty[String] - val process = sys.process - .Process(path.toAbsolutePath.toString) - .#<(new ByteArrayInputStream(toSend.getBytes)) - .run(ProcessLogger(o => actualOutput.append(o))) - process.exitValue() - actualOutput.mkString mustBe toReceive - WindowsPluginFrontend.cleanup(state) + it must "not hang if there is an OOM in generator" in { + testFailure(WindowsPluginFrontend) } } }