Skip to content

[TEST][NO-MERGE] Stress test file + signal #5

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 3 commits into
base: bell-db/v0.9.7-socket
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
104 changes: 95 additions & 9 deletions bridge/src/main/scala/protocbridge/frontend/MacPluginFrontend.scala
Original file line number Diff line number Diff line change
@@ -1,27 +1,77 @@
package protocbridge.frontend

import protocbridge.{ExtraEnv, ProtocCodeGenerator}
import sun.misc.{Signal, SignalHandler}

import java.lang.management.ManagementFactory
import java.nio.file.attribute.PosixFilePermission
import java.nio.file.{Files, Path}
import java.nio.{ByteBuffer, ByteOrder}
import java.{util => ju}
import scala.sys.process._

/** PluginFrontend for macOS.
*
* Creates a server socket and uses `nc` to communicate with the socket. We use
* a server socket instead of named pipes because named pipes are unreliable on
* macOS: https://github.com/scalapb/protoc-bridge/issues/366. Since `nc` is
* widely available on macOS, this is the simplest and most reliable solution
* for macOS.
* TODO
*/
object MacPluginFrontend extends SocketBasedPluginFrontend {
object MacPluginFrontend extends PluginFrontend {
case class InternalState(
inputFile: Path,
outputFile: Path,
tempDir: Path,
shellScript: Path
)

override def prepare(
plugin: ProtocCodeGenerator,
env: ExtraEnv
): (Path, InternalState) = {
val tempDirPath = Files.createTempDirectory("protopipe-")
val inputFile = tempDirPath.resolve("input")
val outputFile = tempDirPath.resolve("output")
val sh = createShellScript(getCurrentPid, inputFile, outputFile)
val internalState = InternalState(inputFile, outputFile, tempDirPath, sh)

Signal.handle(
new Signal("USR1"),
new SigUsr1Handler(internalState, plugin, env)
)

(sh, internalState)
}

override def cleanup(state: InternalState): Unit = {
if (sys.props.get("protocbridge.debug") != Some("1")) {
Files.delete(state.inputFile)
Files.delete(state.outputFile)
Files.delete(state.tempDir)
Files.delete(state.shellScript)
}
}

protected def createShellScript(port: Int): Path = {
private def createShellScript(
serverPid: Int,
inputPipe: Path,
outputPipe: Path
): Path = {
val shell = sys.env.getOrElse("PROTOCBRIDGE_SHELL", "/bin/sh")
// We use 127.0.0.1 instead of localhost for the (very unlikely) case that localhost is missing from /etc/hosts.
// Output PID as int32 big-endian.
// The current maximum PID on macOS is 99998 (3 bytes) but just in case it's bumped.
// Use `wait` background `sleep` instead of foreground `sleep`,
// so that signals are handled immediately instead of after `sleep` finishes.
// Renew `sleep` if `sleep` expires before the signal (the `wait` result is 0).
// Clean up `sleep` if `wait` exits due to the signal (the `wait` result is 128 + SIGUSR1 = 138).
val scriptName = PluginFrontend.createTempFile(
"",
s"""|#!$shell
|set -e
|nc 127.0.0.1 $port
|printf "%08x" $$$$ | xxd -r -p > "$inputPipe"
|cat /dev/stdin >> "$inputPipe"
|trap 'cat "$outputPipe"' USR1
|kill -USR1 "$serverPid"
|sleep 1 & SLEEP_PID=$$!
|while wait "$$SLEEP_PID"; do sleep 1 & SLEEP_PID=$$!; done
|kill $$SLEEP_PID 2>/dev/null || true
""".stripMargin
)
val perms = new ju.HashSet[PosixFilePermission]
Expand All @@ -33,4 +83,40 @@ object MacPluginFrontend extends SocketBasedPluginFrontend {
)
scriptName
}

private def getCurrentPid: Int = {
val jvmName = ManagementFactory.getRuntimeMXBean.getName
val pid = jvmName.split("@")(0)
pid.toInt
}

private class SigUsr1Handler(
internalState: InternalState,
plugin: ProtocCodeGenerator,
env: ExtraEnv
) extends SignalHandler {
override def handle(sig: Signal): Unit = {
val fsin = Files.newInputStream(internalState.inputFile)

val buffer = ByteBuffer.allocate(4).order(ByteOrder.BIG_ENDIAN)
val shPid = if (fsin.read(buffer.array()) == 4) {
buffer.getInt(0)
} else {
fsin.close()
throw new RuntimeException(
s"The first 4 bytes in '${internalState.inputFile}' should be the PID of the shell script"
)
}

val response = PluginFrontend.runWithInputStream(plugin, fsin, env)
fsin.close()

val fsout = Files.newOutputStream(internalState.outputFile)
fsout.write(response)
fsout.close()

// Signal the shell script to read the output file.
s"kill -USR1 $shPid".!!
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,11 @@ package protocbridge.frontend
class MacPluginFrontendSpec extends OsSpecificFrontendSpec {
if (PluginFrontend.isMac) {
it must "execute a program that forwards input and output to given stream" in {
val state = testSuccess(MacPluginFrontend)
state.serverSocket.isClosed mustBe true
testSuccess(MacPluginFrontend)
}

it must "not hang if there is an error in generator" in {
val state = testFailure(MacPluginFrontend)
state.serverSocket.isClosed mustBe true
testFailure(MacPluginFrontend)
}
}
}
Original file line number Diff line number Diff line change
@@ -1,10 +1,15 @@
package protocbridge.frontend

import org.apache.commons.io.IOUtils
import org.scalatest.exceptions.TestFailedException
import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.must.Matchers
import protocbridge.{ExtraEnv, ProtocCodeGenerator}

import java.io.ByteArrayOutputStream
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.DurationInt
import scala.concurrent.{Await, Future, TimeoutException}
import scala.sys.process.ProcessIO
import scala.util.Random

Expand All @@ -30,20 +35,22 @@ class OsSpecificFrontendSpec extends AnyFlatSpec with Matchers {
writeInput.close()
},
processOutput => {
val buffer = new Array[Byte](4096)
var bytesRead = 0
while (bytesRead != -1) {
bytesRead = processOutput.read(buffer)
if (bytesRead != -1) {
actualOutput.write(buffer, 0, bytesRead)
}
}
IOUtils.copy(processOutput, actualOutput)
processOutput.close()
},
_.close()
processError => {
IOUtils.copy(processError, System.err)
processError.close()
}
)
)
process.exitValue()
try {
Await.result(Future { process.exitValue() }, 5.seconds)
} catch {
case _: TimeoutException =>
System.err.println(s"Timeout")
process.destroy()
}
frontend.cleanup(state)
(state, actualOutput.toByteArray)
}
Expand All @@ -62,9 +69,31 @@ class OsSpecificFrontendSpec extends AnyFlatSpec with Matchers {
toReceive
}
}
// Repeat 100,000 times since named pipes on macOS are flaky.
val repeatCount = 1000
for (i <- 1 until repeatCount) {
if (i % 100 == 1) println(s"Running iteration $i of $repeatCount")
val (state, response) =
testPluginFrontend(frontend, fakeGenerator, env, toSend)
try {
response mustBe toReceive
} catch {
case e: TestFailedException =>
System.err.println(
s"""Failed on iteration $i of $repeatCount: ${e.getMessage}"""
)
}
}
val (state, response) =
testPluginFrontend(frontend, fakeGenerator, env, toSend)
response mustBe toReceive
try {
response mustBe toReceive
} catch {
case e: TestFailedException =>
System.err.println(
s"""Failed on iteration $repeatCount of $repeatCount: ${e.getMessage}"""
)
}
state
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@ package protocbridge.frontend
class PosixPluginFrontendSpec extends OsSpecificFrontendSpec {
if (!PluginFrontend.isWindows && !PluginFrontend.isMac) {
it must "execute a program that forwards input and output to given stream" in {
testSuccess(PosixPluginFrontend)
testSuccess(MacPluginFrontend)
}

it must "not hang if there is an OOM in generator" in {
testFailure(PosixPluginFrontend)
testFailure(MacPluginFrontend)
}
}
}