@@ -7,27 +7,33 @@ import io.kotest.matchers.shouldBe
77import io.kotest.matchers.types.shouldBeInstanceOf
88import io.kotest.matchers.types.shouldBeSameInstanceAs
99import io.mockk.coEvery
10- import io.mockk.every
1110import io.mockk.mockk
1211import io.modelcontextprotocol.kotlin.sdk.client.StdioClientTransport
1312import io.modelcontextprotocol.kotlin.sdk.types.JSONRPCMessage
1413import io.modelcontextprotocol.kotlin.sdk.types.JSONRPCRequest
1514import io.modelcontextprotocol.kotlin.sdk.types.McpException
1615import io.modelcontextprotocol.kotlin.sdk.types.RPCError.ErrorCode
1716import kotlinx.coroutines.CancellationException
17+ import kotlinx.coroutines.Dispatchers
1818import kotlinx.coroutines.channels.Channel
1919import kotlinx.coroutines.channels.ClosedSendChannelException
2020import kotlinx.coroutines.delay
21+ import kotlinx.coroutines.runBlocking
2122import kotlinx.coroutines.test.runTest
2223import kotlinx.io.Buffer
24+ import kotlinx.io.asSource
25+ import kotlinx.io.buffered
2326import kotlinx.io.writeString
2427import org.junit.jupiter.params.ParameterizedTest
2528import org.junit.jupiter.params.provider.Arguments
2629import org.junit.jupiter.params.provider.MethodSource
30+ import java.io.PipedInputStream
31+ import java.io.PipedOutputStream
2732import java.util.stream.Stream
2833import kotlin.concurrent.atomics.AtomicBoolean
2934import kotlin.concurrent.atomics.ExperimentalAtomicApi
3035import kotlin.test.Test
36+ import kotlin.time.Duration.Companion.milliseconds
3137import kotlin.time.Duration.Companion.seconds
3238
3339/* *
@@ -39,17 +45,23 @@ class StdioClientTransportErrorHandlingTest {
3945
4046 @OptIn(ExperimentalAtomicApi ::class )
4147 @Test
42- fun `should continue on stderr EOF` () = runTest {
43- val stderrBuffer = Buffer ()
48+ fun `should continue on stderr EOF` (): Unit = runBlocking(Dispatchers .IO ) {
4449 // Empty stderr = immediate EOF
50+ val stderrBuffer = Buffer ()
51+
52+ // Create a pipe for stdin that stays open (simulates real stdin behavior)
53+ val pipedOutputStream = PipedOutputStream ()
54+ val pipedInputStream = PipedInputStream (pipedOutputStream)
55+
56+ // Write one message to stdin
57+ pipedOutputStream.write(""" data: {"jsonrpc":"2.0","method":"ping","id":1}\n\n""" .toByteArray())
58+ pipedOutputStream.flush()
59+ // Keep the pipe open by not closing pipedOutputStream - this prevents stdin EOF
4560
46- val inputBuffer = createNonEmptyBuffer {
47- """ data: {"jsonrpc":"2.0","method":"ping","id":1}\n\n"""
48- }
4961 val outputBuffer = Buffer ()
5062
5163 transport = StdioClientTransport (
52- input = inputBuffer ,
64+ input = pipedInputStream.asSource().buffered() ,
5365 output = outputBuffer,
5466 error = stderrBuffer,
5567 )
@@ -59,15 +71,21 @@ class StdioClientTransportErrorHandlingTest {
5971
6072 transport.start()
6173
62- // Stderr EOF should not close transport
63- // Use eventually to handle timing differences across platforms (especially Windows)
74+ // Wait for stderr EOF and stdin message to be processed
75+ delay(500 .milliseconds)
76+
77+ // Transport should still be alive because stdin is still open (not EOF'd)
78+ closeCalled.load() shouldBe false
79+
80+ // Close pipes to trigger stdin EOF.
81+ // `transport.close()` cann't help here, since the underlying Java read() is blocked on I/O operation
82+ pipedOutputStream.close()
83+ pipedInputStream.close()
84+
85+ // Transport should close when stdin EOF is detected
6486 eventually(2 .seconds) {
65- // Wait for stderr to be processed, then verify transport is still open
66- closeCalled.load() shouldBe false
87+ closeCalled.load() shouldBe true
6788 }
68-
69- transport.close()
70- closeCalled.load() shouldBe true
7189 }
7290
7391 @Test
@@ -162,18 +180,13 @@ class StdioClientTransportErrorHandlingTest {
162180 fun `Send should handle exceptions` (throwable : Throwable , shouldWrap : Boolean , expectedCode : Int? ) = runTest {
163181 val sendChannel: Channel <JSONRPCMessage > = mockk(relaxed = true )
164182
165- // Create a stdin Source that never returns EOF to prevent transport from closing
166- // Use mockk since Source is a sealed interface
167- val stdin: kotlinx.io.Source = mockk(relaxed = true )
168- every { stdin.readAtMostTo(any<Buffer >(), any()) } coAnswers {
169- // Return 0 to indicate no data available (but not EOF)
170- // This keeps the transport alive without blocking
171- delay(10 ) // Small delay to prevent busy-waiting
172- 0L
173- }
183+ // Create stdin pipe that stays open to prevent transport from closing
184+ val pipedOutputStream = PipedOutputStream ()
185+ val pipedInputStream = PipedInputStream (pipedOutputStream)
186+ // Keep pipe open (don't write or close) - stdin will block on read, not EOF
174187
175188 transport = StdioClientTransport (
176- input = stdin ,
189+ input = pipedInputStream.asSource().buffered() ,
177190 output = Buffer (),
178191 sendChannel = sendChannel,
179192 )
@@ -195,6 +208,10 @@ class StdioClientTransportErrorHandlingTest {
195208 } else {
196209 exception shouldBeSameInstanceAs throwable
197210 }
211+
212+ // Cleanup
213+ pipedOutputStream.close()
214+ pipedInputStream.close()
198215 }
199216
200217 fun createNonEmptyBuffer (block : () -> String ): Buffer {
0 commit comments