Skip to content

[WIP] Add regression tests for #555, #559, #765, #729, #850 #1044

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 7 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ dependencies = [
"pydantic-settings>=2.5.2",
"uvicorn>=0.23.1; sys_platform != 'emscripten'",
"jsonschema>=4.20.0",
"psutil>=5.9.0,<6.0.0",
]

[project.optional-dependencies]
Expand Down
110 changes: 99 additions & 11 deletions src/mcp/client/stdio/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@

import anyio
import anyio.lowlevel
import anyio.to_thread
import psutil
from anyio.abc import Process
from anyio.streams.memory import MemoryObjectReceiveStream, MemoryObjectSendStream
from anyio.streams.text import TextReceiveStream
from pydantic import BaseModel, Field
Expand All @@ -14,9 +17,9 @@
from mcp.shared.message import SessionMessage

from .win32 import (
FallbackProcess,
create_windows_process,
get_windows_executable_command,
terminate_windows_process,
)

# Environment variables to inherit by default
Expand Down Expand Up @@ -178,15 +181,7 @@ async def stdin_writer():
try:
yield read_stream, write_stream
finally:
# Clean up process to prevent any dangling orphaned processes
try:
if sys.platform == "win32":
await terminate_windows_process(process)
else:
process.terminate()
except ProcessLookupError:
# Process already exited, which is fine
pass
await _shutdown_process(process)
await read_stream.aclose()
await write_stream.aclose()
await read_stream_writer.aclose()
Expand Down Expand Up @@ -223,6 +218,99 @@ async def _create_platform_compatible_process(
if sys.platform == "win32":
process = await create_windows_process(command, args, env, errlog, cwd)
else:
process = await anyio.open_process([command, *args], env=env, stderr=errlog, cwd=cwd)
process = await anyio.open_process(
[command, *args],
env=env,
stderr=errlog,
cwd=cwd,
start_new_session=True,
)

return process


async def _shutdown_process(process: Process | FallbackProcess) -> None:
"""
MCP spec: stdio shutdown sequence
1. Close input stream to server
2. Wait for server to exit, or send SIGTERM if it doesn't exit in time
3. Send SIGKILL if still not exited (forcibly kill on Windows)
"""

# Close input stream to server
if process.stdin:
try:
await process.stdin.aclose()
except Exception:
# stdin might already be closed, which is fine
pass

try:
# Wait for server to exit gracefully after stdin closes
with anyio.fail_after(2.0):
await process.wait()
except TimeoutError:
# 2. send SIGTERM if it doesn't exit in time
# 3. Send SIGKILL if still not exited (forcibly kill on Windows)
await _terminate_process_with_children(process)
except ProcessLookupError:
# Process already exited, which is fine
pass


async def _terminate_process_with_children(process: Process | FallbackProcess, timeout: float = 2.0) -> None:
"""
Terminate a process and all its children using psutil.

This provides consistent behavior across platforms and properly
handles process trees without shell commands.

Platform behavior:
- On Unix: psutil.terminate() sends SIGTERM, allowing graceful shutdown
- On Windows: psutil.terminate() calls TerminateProcess() which is immediate
and doesn't allow cleanup handlers to run. This can cause ResourceWarnings
for subprocess.Popen objects that don't get to clean up.
"""
pid = getattr(process, "pid", None)
if pid is None:
popen = getattr(process, "popen", None)
if popen:
pid = getattr(popen, "pid", None)

if not pid:
# Process has no PID, cannot terminate
return

try:
parent = psutil.Process(pid)
children = parent.children(recursive=True)

# First, try graceful termination for all children
for child in children:
try:
child.terminate()
except psutil.NoSuchProcess:
pass

# Then, also terminate the parent process
try:
parent.terminate()
except psutil.NoSuchProcess:
return

# Wait for processes to exit gracefully, force kill any that remain
all_procs = children + [parent]
_, alive = await anyio.to_thread.run_sync(lambda: psutil.wait_procs(all_procs, timeout=timeout))
for proc in alive:
try:
proc.kill()
except psutil.NoSuchProcess:
pass

# Wait a bit more for force-killed processes
if alive:
await anyio.to_thread.run_sync(lambda: psutil.wait_procs(alive, timeout=0.5))

except psutil.NoSuchProcess:
# Process already terminated
pass
52 changes: 24 additions & 28 deletions src/mcp/client/stdio/win32.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,6 @@
from typing import BinaryIO, TextIO, cast

import anyio
from anyio import to_thread
from anyio.abc import Process
from anyio.streams.file import FileReadStream, FileWriteStream


Expand Down Expand Up @@ -75,9 +73,18 @@ async def __aexit__(
exc_val: BaseException | None,
exc_tb: object | None,
) -> None:
"""Terminate and wait on process exit inside a thread."""
self.popen.terminate()
await to_thread.run_sync(self.popen.wait)
"""Clean up process and streams.

Attempts to terminate the process, but doesn't fail if termination
is not possible (e.g., process already dead or being handled elsewhere).
"""
try:
self.popen.terminate()
with anyio.move_on_after(0.5):
await self.wait()
except (ProcessLookupError, OSError):
# Process already dead or being handled elsewhere
pass

# Close the file handles to prevent ResourceWarning
if self.stdin:
Expand All @@ -92,8 +99,13 @@ async def __aexit__(
self.stderr.close()

async def wait(self):
"""Async wait for process completion."""
return await to_thread.run_sync(self.popen.wait)
"""
Poll the process status instead of blocking wait
This allows anyio timeouts to work properly
"""
while self.popen.poll() is None:
await anyio.sleep(0.1)
return self.popen.returncode

def terminate(self):
"""Terminate the subprocess immediately."""
Expand All @@ -103,6 +115,11 @@ def kill(self) -> None:
"""Kill the subprocess immediately (alias for terminate)."""
self.terminate()

@property
def pid(self) -> int:
"""Return the process ID."""
return self.popen.pid


# ------------------------
# Updated function
Expand Down Expand Up @@ -159,24 +176,3 @@ async def create_windows_process(
bufsize=0,
)
return FallbackProcess(popen_obj)


async def terminate_windows_process(process: Process | FallbackProcess):
"""
Terminate a Windows process.

Note: On Windows, terminating a process with process.terminate() doesn't
always guarantee immediate process termination.
So we give it 2s to exit, or we call process.kill()
which sends a SIGKILL equivalent signal.

Args:
process: The process to terminate
"""
try:
process.terminate()
with anyio.fail_after(2.0):
await process.wait()
except TimeoutError:
# Force kill if it doesn't terminate
process.kill()
Loading
Loading