Skip to content

[TEST][NO-MERGE] Stress test domain sockets #4

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 7 commits into
base: bell-db/v0.9.7-socket
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
package protocbridge.frontend

import org.newsclub.net.unix.AFUNIXServerSocket
import protocbridge.{ExtraEnv, ProtocCodeGenerator}

import java.net.ServerSocket
import java.nio.file.attribute.PosixFilePermission
import java.nio.file.{Files, Path}
import java.{util => ju}
Expand All @@ -8,20 +12,52 @@ import java.{util => ju}
*
* Creates a server socket and uses `nc` to communicate with the socket. We use
* a server socket instead of named pipes because named pipes are unreliable on
* macOS: https://github.com/scalapb/protoc-bridge/issues/366. Since `nc` is
* widely available on macOS, this is the simplest and most reliable solution
* for macOS.
* macOS: https://github.com/scalapb/protoc-bridge/issues/366
*
* Since `nc` is widely available on macOS, this is the simplest alternative
* for macOS. However, raw `nc` is also not very reliable on macOS:
* https://github.com/scalapb/protoc-bridge/issues/379
*
* The most reliable way to communicate is found to be with a domain socket and
* a server-side read timeout, which are implemented here.
*/
object MacPluginFrontend extends SocketBasedPluginFrontend {
/** Resources created by `prepare` that `cleanup` later releases. */
case class InternalState(
// Launcher script returned from `prepare`; deleted in `cleanup`.
shellScript: Path,
// Temporary directory created for this run; it holds the socket file.
tempDirPath: Path,
// Location of the domain socket file (`socket` inside `tempDirPath`).
socketPath: Path,
// Server socket bound to `socketPath`; closed in `cleanup`.
serverSocket: ServerSocket
)

/** Binds a Unix domain socket inside a fresh temporary directory, writes the
  * launcher script for that socket, and starts serving the plugin on it.
  *
  * @param plugin
  *   the code generator to run when the script connects
  * @param env
  *   extra environment passed through to the generator
  * @return
  *   the launcher script path together with the state `cleanup` needs
  */
override def prepare(
    plugin: ProtocCodeGenerator,
    env: ExtraEnv
): (Path, InternalState) = {
  val workDir = Files.createTempDirectory("protocbridge")
  val socketFile = workDir.resolve("socket")
  // NOTE(review): the second argument is presumably junixsocket's
  // delete-on-close flag; confirm against the junixsocket API docs.
  val server = AFUNIXServerSocket.bindOn(socketFile, true)
  val script = createShellScript(socketFile)

  runWithSocket(plugin, env, server)

  val state = InternalState(script, workDir, socketFile, server)
  (script, state)
}

/** Closes the server socket and, unless the `protocbridge.debug` system
  * property is `1`, removes the files created by `prepare`.
  *
  * @param state
  *   the state returned by `prepare`
  */
override def cleanup(state: InternalState): Unit = {
  state.serverSocket.close()
  if (sys.props.get("protocbridge.debug") != Some("1")) {
    try {
      // Do not rely solely on the socket removing its own file on close: a
      // leftover socket file would make deleting the directory fail.
      Files.deleteIfExists(state.socketPath)
      Files.delete(state.tempDirPath)
    } finally {
      // Remove the launcher script even if the directory could not be removed.
      Files.delete(state.shellScript)
    }
  }
}

protected def createShellScript(port: Int): Path = {
private def createShellScript(socketPath: Path): Path = {
val shell = sys.env.getOrElse("PROTOCBRIDGE_SHELL", "/bin/sh")
// We use 127.0.0.1 instead of localhost for the (very unlikely) case that localhost is missing from /etc/hosts.
val scriptName = PluginFrontend.createTempFile(
"",
s"""|#!$shell
|set -e
|nc 127.0.0.1 $port
|nc -U "$socketPath"
""".stripMargin
)
val perms = new ju.HashSet[PosixFilePermission]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,26 +3,30 @@ package protocbridge.frontend
import protocbridge.{ExtraEnv, ProtocCodeGenerator}

import java.net.ServerSocket
import java.nio.file.{Files, Path}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.{Future, blocking}

/** PluginFrontend for Windows and macOS where a server socket is used.
*/
abstract class SocketBasedPluginFrontend extends PluginFrontend {
case class InternalState(serverSocket: ServerSocket, shellScript: Path)

override def prepare(
protected def runWithSocket(
plugin: ProtocCodeGenerator,
env: ExtraEnv
): (Path, InternalState) = {
val ss = new ServerSocket(0) // Bind to any available port.
val sh = createShellScript(ss.getLocalPort)

env: ExtraEnv,
serverSocket: ServerSocket
): Unit = {
Future {
blocking {
// Accept a single client connection from the shell script.
val client = ss.accept()
val client = serverSocket.accept()
// It's found on macOS that a `junixsocket` domain socket server
// might not receive the EOF sent by the other end, leading to a hang:
// https://github.com/scalapb/protoc-bridge/issues/379
// However, confusingly, adding an arbitrary read timeout resolves the issue.
// We thus add a read timeout of 1 minute here, which should be more than enough.
// It also helps to prevent an infinite hang on both Windows and macOS due to
// unexpected issues.
// client.setSoTimeout(60000)
try {
val response =
PluginFrontend.runWithInputStream(
Expand All @@ -36,16 +40,5 @@ abstract class SocketBasedPluginFrontend extends PluginFrontend {
}
}
}

(sh, InternalState(ss, sh))
}

override def cleanup(state: InternalState): Unit = {
state.serverSocket.close()
if (sys.props.get("protocbridge.debug") != Some("1")) {
Files.delete(state.shellScript)
}
}

protected def createShellScript(port: Int): Path
}
Original file line number Diff line number Diff line change
@@ -1,15 +1,38 @@
package protocbridge.frontend

import java.nio.file.{Path, Paths}
import protocbridge.{ExtraEnv, ProtocCodeGenerator}

import java.net.ServerSocket
import java.nio.file.{Files, Path, Paths}

/** A PluginFrontend that binds a server socket to a local interface. The plugin
* is a batch script that invokes BridgeApp.main() method, in a new JVM with
* the same parameters as the currently running JVM. The plugin will
* communicate its stdin and stdout to this socket.
*/
object WindowsPluginFrontend extends SocketBasedPluginFrontend {
/** State handed from `prepare` to `cleanup`: the generated script (deleted in
  * `cleanup` unless debugging) and the server socket (closed in `cleanup`).
  */
case class InternalState(shellScript: Path, serverSocket: ServerSocket)

/** Opens a server socket on any available port, generates the launcher script
  * for that port, and starts serving the plugin on the socket.
  *
  * @return
  *   the launcher script path together with the state `cleanup` needs
  */
override def prepare(
    plugin: ProtocCodeGenerator,
    env: ExtraEnv
): (Path, InternalState) = {
  // Port 0 binds to any available port.
  val server = new ServerSocket(0)
  val script = createShellScript(server.getLocalPort)

  runWithSocket(plugin, env, server)

  val state = InternalState(script, server)
  (script, state)
}

/** Closes the server socket and deletes the generated script, unless the
  * `protocbridge.debug` system property is set to `1`.
  */
override def cleanup(state: InternalState): Unit = {
  state.serverSocket.close()
  val keepScript = sys.props.get("protocbridge.debug").contains("1")
  if (!keepScript) Files.delete(state.shellScript)
}

protected def createShellScript(port: Int): Path = {
private def createShellScript(port: Int): Path = {
val classPath =
Paths.get(getClass.getProtectionDomain.getCodeSource.getLocation.toURI)
val classPathBatchString = classPath.toString.replace("%", "%%")
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,15 @@
package protocbridge.frontend

import org.apache.commons.io.IOUtils
import org.scalatest.exceptions.TestFailedException
import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.must.Matchers
import protocbridge.{ExtraEnv, ProtocCodeGenerator}

import java.io.ByteArrayOutputStream
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.DurationInt
import scala.concurrent.{Await, Future, TimeoutException}
import scala.sys.process.ProcessIO
import scala.util.Random

Expand All @@ -30,20 +35,22 @@ class OsSpecificFrontendSpec extends AnyFlatSpec with Matchers {
writeInput.close()
},
processOutput => {
val buffer = new Array[Byte](4096)
var bytesRead = 0
while (bytesRead != -1) {
bytesRead = processOutput.read(buffer)
if (bytesRead != -1) {
actualOutput.write(buffer, 0, bytesRead)
}
}
IOUtils.copy(processOutput, actualOutput)
processOutput.close()
},
_.close()
processError => {
IOUtils.copy(processError, System.err)
processError.close()
}
)
)
process.exitValue()
try {
Await.result(Future { process.exitValue() }, 5.seconds)
} catch {
case _: TimeoutException =>
System.err.println(s"Timeout")
process.destroy()
}
frontend.cleanup(state)
(state, actualOutput.toByteArray)
}
Expand All @@ -52,8 +59,8 @@ class OsSpecificFrontendSpec extends AnyFlatSpec with Matchers {
frontend: PluginFrontend
): frontend.InternalState = {
val random = new Random()
val toSend = Array.fill(123)(random.nextInt(256).toByte)
val toReceive = Array.fill(456)(random.nextInt(256).toByte)
val toSend = Array.fill(100000)(random.nextInt(256).toByte)
val toReceive = Array.fill(100000)(random.nextInt(256).toByte)
val env = new ExtraEnv(secondaryOutputDir = "tmp")

val fakeGenerator = new ProtocCodeGenerator {
Expand All @@ -62,9 +69,25 @@ class OsSpecificFrontendSpec extends AnyFlatSpec with Matchers {
toReceive
}
}
// Repeat 100,000 times since named pipes on macOS are flaky.
val repeatCount = 100000
for (i <- 1 until repeatCount) {
if (i % 100 == 1) println(s"Running iteration $i of $repeatCount")
val (state, response) =
testPluginFrontend(frontend, fakeGenerator, env, toSend)
if (!(response sameElements toReceive)) {
System.err.println(
s"Failed on iteration $i of $repeatCount ($state): ${response.length} != ${toReceive.length}"
)
}
}
val (state, response) =
testPluginFrontend(frontend, fakeGenerator, env, toSend)
response mustBe toReceive
if (!(response sameElements toReceive)) {
System.err.println(
s"Failed on iteration $repeatCount of $repeatCount ($state): ${response.length} != ${toReceive.length}"
)
}
state
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@ package protocbridge.frontend
class PosixPluginFrontendSpec extends OsSpecificFrontendSpec {
if (!PluginFrontend.isWindows && !PluginFrontend.isMac) {
it must "execute a program that forwards input and output to given stream" in {
testSuccess(PosixPluginFrontend)
testSuccess(MacPluginFrontend)
}

it must "not hang if there is an OOM in generator" in {
testFailure(PosixPluginFrontend)
testFailure(MacPluginFrontend)
}
}
}
3 changes: 2 additions & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@ lazy val bridge: Project = project
"org.scalatest" %% "scalatest" % "3.2.17" % "test",
"org.scalacheck" %% "scalacheck" % "1.17.0" % "test",
"org.scala-lang.modules" %% "scala-collection-compat" % "2.11.0" % "test",
"io.get-coursier" %% "coursier" % coursierVersion % "test"
"io.get-coursier" %% "coursier" % coursierVersion % "test",
"com.kohlschutter.junixsocket" % "junixsocket-core" % "2.10.0"
),
scalacOptions ++= (if (scalaVersion.value.startsWith("2.13."))
Seq("-Wconf:origin=.*JavaConverters.*:s")
Expand Down
88 changes: 88 additions & 0 deletions domain_socket_stress_test.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
#!/usr/bin/env bash

# Usage: domain_socket_stress_test.sh BYTE_LENGTH [SERVER_MODE] [CLIENT_MODE]
#   BYTE_LENGTH  number of random bytes pushed through the socket per iteration
#   SERVER_MODE  nc-save (default) | ncat-save | socat-save | socat-echo
#   CLIENT_MODE  nc (default) | ncat | socat
BYTE_LENGTH="$1"
SERVER_MODE="${2:-nc-save}"
CLIENT_MODE="${3:-nc}"

# The payload size is mandatory. dd's stderr is discarded below, so a missing or
# non-numeric value would otherwise fail silently and only surface later as a
# confusing comparison error.
if ! [[ "$BYTE_LENGTH" =~ ^[0-9]+$ ]]; then
  echo "Usage: $0 BYTE_LENGTH [SERVER_MODE] [CLIENT_MODE]" >&2
  exit 2
fi

TEST_FILE_PATH="/tmp/domain_socket_test_file"
SOCKET_PATH="/tmp/domain_socket_test.sck"
SERVER_RESULT_PATH="$TEST_FILE_PATH.server"
CLIENT_RESULT_PATH="$TEST_FILE_PATH.client"

# Generate the random payload that every iteration sends through the socket
dd if=/dev/urandom of="$TEST_FILE_PATH" bs=1 count="$BYTE_LENGTH" 2>/dev/null

# "*-save" servers write what they receive to a file, so that file is checked;
# otherwise the data the client received back is checked.
if [[ "$SERVER_MODE" == *"-save" ]]; then
  TEST_RESULT_PATH="$SERVER_RESULT_PATH"
else
  TEST_RESULT_PATH="$CLIENT_RESULT_PATH"
fi

# Runs one round trip: starts a server on the domain socket, sends the test file
# through it with the selected client, then checks the number of bytes received.
# Returns 0 on success and 1 on any failure, so the driver loop below can report
# how many iterations succeeded. Result files are left in place on failure.
test_socket() {
  # Reject an unknown client mode before the server is started, so that no
  # listening server is left behind.
  case "$CLIENT_MODE" in
    nc | ncat | socat) ;;
    *)
      echo "Invalid client mode: $CLIENT_MODE"
      return 1
      ;;
  esac

  # Start a process to consume the data from the socket
  if [[ "$SERVER_MODE" == "nc-save" ]]; then
    (nc -l -U "$SOCKET_PATH" > "$SERVER_RESULT_PATH" && echo "Completed saving random bytes from the socket") &
  elif [[ "$SERVER_MODE" == "ncat-save" ]]; then
    (ncat -l -U "$SOCKET_PATH" > "$SERVER_RESULT_PATH" && echo "Completed saving random bytes from the socket") &
  elif [[ "$SERVER_MODE" == "socat-save" ]]; then
    (socat UNIX-LISTEN:"$SOCKET_PATH" - > "$SERVER_RESULT_PATH" && echo "Completed saving random bytes from the socket") &
  elif [[ "$SERVER_MODE" == "socat-echo" ]]; then
    (socat UNIX-LISTEN:"$SOCKET_PATH" EXEC:"/bin/cat" && echo "Completed echoing random bytes from the socket") &
  else
    echo "Invalid server mode: $SERVER_MODE"
    return 1
  fi
  SERVER_PID=$!
  echo "Starting the server (PID: $SERVER_PID)"

  # Wait for the socket file to be created so that the server has started.
  # Give up if the server process is already gone (e.g. the tool is not
  # installed) instead of spinning forever.
  while [ ! -e "$SOCKET_PATH" ]; do
    if ! kill -0 "$SERVER_PID" 2>/dev/null; then
      echo "Error: the server exited before creating the socket"
      return 1
    fi
    sleep 0.001
  done
  echo "The server has started and is listening to the socket (PID: $SERVER_PID)"

  # `nc` can fail even if we wait for another second to ensure the server has started
  # sleep 1

  # Start dumping random bytes to the socket in the background
  if [[ "$CLIENT_MODE" == "nc" ]]; then
    (nc -U "$SOCKET_PATH" < "$TEST_FILE_PATH" > "$CLIENT_RESULT_PATH" && echo "Completed dumping random bytes to the socket") &
  elif [[ "$CLIENT_MODE" == "ncat" ]]; then
    (ncat -U "$SOCKET_PATH" < "$TEST_FILE_PATH" > "$CLIENT_RESULT_PATH" && echo "Completed dumping random bytes to the socket") &
  else
    # Only "socat" remains; the mode was validated at the top of the function.
    (socat - UNIX-CONNECT:"$SOCKET_PATH" < "$TEST_FILE_PATH" > "$CLIENT_RESULT_PATH" && echo "Completed dumping random bytes to the socket") &
  fi
  CLIENT_PID=$!
  echo "Started dumping random bytes to the socket (PID: $CLIENT_PID)"

  # Wait for the client process to finish
  wait $CLIENT_PID 2>/dev/null
  echo "The client process has stopped (PID: $CLIENT_PID)"

  # Wait for the server process to finish
  wait $SERVER_PID 2>/dev/null
  echo "The server process has stopped (PID: $SERVER_PID)"

  # Check the size of the data read from the socket
  DATA_SIZE=$(wc -c < "$TEST_RESULT_PATH")
  if [ "$DATA_SIZE" -ne "$BYTE_LENGTH" ]; then
    echo "Error: Expected $BYTE_LENGTH bytes, but read $DATA_SIZE bytes"
    return 1
  else
    echo "Successfully read $BYTE_LENGTH bytes from the socket"
  fi

  rm -f "$SOCKET_PATH"
  rm -f "$SERVER_RESULT_PATH"
  rm -f "$CLIENT_RESULT_PATH"
}

# Remove any stale socket left over from a previous run
rm -f "$SOCKET_PATH"

# Repeat the process until an iteration fails
counter=0
while test_socket; do
  counter=$((counter + 1))
  echo "Iterations completed: $counter"
done
echo "Command failed after $counter successful iterations."
# The loop only ends on failure, so keep reporting a non-zero exit status.
exit 1