Skip to content

[TEST][NO-MERGE] Stress test named pipes v2 #6

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 4 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 10 additions & 10 deletions bridge/src/main/scala/protocbridge/frontend/PluginFrontend.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@ import java.nio.file.{Files, Path}

import protocbridge.{ProtocCodeGenerator, ExtraEnv}

import scala.util.Try

/** A PluginFrontend instance provides a platform-dependent way for protoc to
* communicate with a JVM based ProtocCodeGenerator.
*
Expand Down Expand Up @@ -47,13 +45,7 @@ object PluginFrontend {
gen: ProtocCodeGenerator,
request: Array[Byte]
): Array[Byte] = {
Try {
gen.run(request)
}.recover { case throwable =>
createCodeGeneratorResponseWithError(
throwable.toString + "\n" + getStackTrace(throwable)
)
}.get
gen.run(request)
}

def createCodeGeneratorResponseWithError(error: String): Array[Byte] = {
Expand Down Expand Up @@ -116,9 +108,17 @@ object PluginFrontend {
gen: ProtocCodeGenerator,
fsin: InputStream,
env: ExtraEnv
): Array[Byte] = {
): Array[Byte] = try {
val bytes = readInputStreamToByteArrayWithEnv(fsin, env)
runWithBytes(gen, bytes)
} catch {
// This covers all Throwable including OutOfMemoryError, StackOverflowError, etc.
// We need to make a best effort to return a response to protoc,
// otherwise protoc can hang indefinitely.
case throwable: Throwable =>
createCodeGeneratorResponseWithError(
throwable.toString + "\n" + getStackTrace(throwable)
)
}

def createTempFile(extension: String, content: String): Path = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,11 @@ object PosixPluginFrontend extends PluginFrontend {
val response = PluginFrontend.runWithInputStream(plugin, fsin, env)
fsin.close()

// Note that the output pipe must be opened after the input pipe is consumed.
// Otherwise, there might be a deadlock that
// - The shell script is stuck writing to the input pipe (which has a full buffer),
// and doesn't open the write end of the output pipe.
// - This thread is stuck waiting for the write end of the output pipe to be opened.
val fsout = Files.newOutputStream(outputPipe)
fsout.write(response)
fsout.close()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
package protocbridge.frontend

import org.apache.commons.io.IOUtils
import org.scalatest.exceptions.TestFailedException
import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.must.Matchers
import protocbridge.{ExtraEnv, ProtocCodeGenerator}

import java.io.ByteArrayOutputStream
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.DurationInt
import scala.concurrent.{Await, Future, TimeoutException}
import scala.sys.process.ProcessIO
import scala.util.Random

class OsSpecificFrontendSpec extends AnyFlatSpec with Matchers {

protected def testPluginFrontend(
frontend: PluginFrontend,
generator: ProtocCodeGenerator,
env: ExtraEnv,
request: Array[Byte]
): (frontend.InternalState, Array[Byte]) = {
val (path, state) = frontend.prepare(
generator,
env
)
val actualOutput = new ByteArrayOutputStream()
val process = sys.process
.Process(path.toAbsolutePath.toString)
.run(
new ProcessIO(
writeInput => {
writeInput.write(request)
writeInput.close()
},
processOutput => {
IOUtils.copy(processOutput, actualOutput)
processOutput.close()
},
processError => {
IOUtils.copy(processError, System.err)
processError.close()
}
)
)
try {
Await.result(Future { process.exitValue() }, 5.seconds)
} catch {
case _: TimeoutException =>
System.err.println(s"Timeout")
process.destroy()
}
frontend.cleanup(state)
(state, actualOutput.toByteArray)
}

protected def testSuccess(frontend: PluginFrontend): Unit = {
val random = new Random()
val toSend = Array.fill(123)(random.nextInt(256).toByte)
val toReceive = Array.fill(456)(random.nextInt(256).toByte)
val env = new ExtraEnv(secondaryOutputDir = "tmp")

val fakeGenerator = new ProtocCodeGenerator {
override def run(request: Array[Byte]): Array[Byte] = {
request mustBe (toSend ++ env.toByteArrayAsField)
toReceive
}
}
// Repeat 100,000 times since named pipes on macOS are flaky.
val repeatCount = 100000
for (i <- 1 until repeatCount) {
if (i % 100 == 1) println(s"Running iteration $i of $repeatCount")
val (state, response) =
testPluginFrontend(frontend, fakeGenerator, env, toSend)
try {
response mustBe toReceive
} catch {
case e: TestFailedException =>
System.err.println(
s"""Failed on iteration $i of $repeatCount: ${e.getMessage}"""
)
}
}
val (state, response) =
testPluginFrontend(frontend, fakeGenerator, env, toSend)
try {
response mustBe toReceive
} catch {
case e: TestFailedException =>
System.err.println(
s"""Failed on iteration $repeatCount of $repeatCount: ${e.getMessage}"""
)
}
state
}

protected def testFailure(frontend: PluginFrontend): Unit = {
val random = new Random()
val toSend = Array.fill(123)(random.nextInt(256).toByte)
val env = new ExtraEnv(secondaryOutputDir = "tmp")

val fakeGenerator = new ProtocCodeGenerator {
override def run(request: Array[Byte]): Array[Byte] = {
throw new OutOfMemoryError("test error")
}
}
val (_, response) =
testPluginFrontend(frontend, fakeGenerator, env, toSend)
response.length must be > 0
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package protocbridge.frontend

class PosixPluginFrontendSpec extends OsSpecificFrontendSpec {
if (!PluginFrontend.isWindows) {
it must "execute a program that forwards input and output to given stream" in {
testSuccess(PosixPluginFrontend)
}

it must "not hang if there is an OOM in generator" in {
testFailure(PosixPluginFrontend)
}
}
}
Original file line number Diff line number Diff line change
@@ -1,38 +1,13 @@
package protocbridge.frontend

import java.io.ByteArrayInputStream

import protocbridge.{ProtocCodeGenerator, ExtraEnv}

import scala.sys.process.ProcessLogger
import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.must.Matchers

class WindowsPluginFrontendSpec extends AnyFlatSpec with Matchers {
class WindowsPluginFrontendSpec extends OsSpecificFrontendSpec {
if (PluginFrontend.isWindows) {
it must "execute a program that forwards input and output to given stream" in {
val toSend = "ping"
val toReceive = "pong"
val env = new ExtraEnv(secondaryOutputDir = "tmp")
testSuccess(WindowsPluginFrontend)
}

val fakeGenerator = new ProtocCodeGenerator {
override def run(request: Array[Byte]): Array[Byte] = {
request mustBe (toSend.getBytes ++ env.toByteArrayAsField)
toReceive.getBytes
}
}
val (path, state) = WindowsPluginFrontend.prepare(
fakeGenerator,
env
)
val actualOutput = scala.collection.mutable.Buffer.empty[String]
val process = sys.process
.Process(path.toAbsolutePath.toString)
.#<(new ByteArrayInputStream(toSend.getBytes))
.run(ProcessLogger(o => actualOutput.append(o)))
process.exitValue()
actualOutput.mkString mustBe toReceive
WindowsPluginFrontend.cleanup(state)
it must "not hang if there is an OOM in generator" in {
testFailure(WindowsPluginFrontend)
}
}
}