Skip to content

Commit ad3747e

Browse files
felixweinbergerjingx8885surya-prakash-susarla
committed
Fix child process cleanup in stdio termination
When terminating MCP servers, child processes were being orphaned because only the parent process was killed. This caused resource leaks and prevented proper cleanup, especially with tools like npx that spawn child processes for the actual server implementation. This happened on both POSIX and Windows systems; however, because of implementation details, resolving it is non-trivial and requires platform-specific handling of process trees: process groups (os.killpg) on POSIX, and Job Objects via the newly added pywin32 dependency on Windows. This addresses critical issues where MCP servers using process-spawning tools would leave zombie processes running after client shutdown. resolves #850 resolves #729 Co-authored-by: jingx8885 <[email protected]> Co-authored-by: Surya Prakash Susarla <[email protected]>
1 parent cf72565 commit ad3747e

File tree

5 files changed

+511
-12
lines changed

5 files changed

+511
-12
lines changed

pyproject.toml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ dependencies = [
3232
"pydantic-settings>=2.5.2",
3333
"uvicorn>=0.23.1; sys_platform != 'emscripten'",
3434
"jsonschema>=4.20.0",
35+
"pywin32>=310; sys_platform == 'win32'",
3536
]
3637

3738
[project.optional-dependencies]
@@ -125,4 +126,6 @@ filterwarnings = [
125126
"ignore::DeprecationWarning:websockets",
126127
"ignore:websockets.server.WebSocketServerProtocol is deprecated:DeprecationWarning",
127128
"ignore:Returning str or bytes.*:DeprecationWarning:mcp.server.lowlevel",
129+
# pywin32 internal deprecation warning
130+
"ignore:getargs.*The 'u' format is deprecated:DeprecationWarning"
128131
]

src/mcp/client/stdio/__init__.py

Lines changed: 64 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,14 @@
1+
import logging
12
import os
3+
import signal
24
import sys
35
from contextlib import asynccontextmanager
46
from pathlib import Path
57
from typing import Literal, TextIO
68

79
import anyio
810
import anyio.lowlevel
11+
from anyio.abc import Process
912
from anyio.streams.memory import MemoryObjectReceiveStream, MemoryObjectSendStream
1013
from anyio.streams.text import TextReceiveStream
1114
from pydantic import BaseModel, Field
@@ -14,10 +17,14 @@
1417
from mcp.shared.message import SessionMessage
1518

1619
from .win32 import (
20+
FallbackProcess,
1721
create_windows_process,
1822
get_windows_executable_command,
23+
terminate_windows_process_tree,
1924
)
2025

26+
logger = logging.getLogger("client.stdio")
27+
2128
# Environment variables to inherit by default
2229
DEFAULT_INHERITED_ENV_VARS = (
2330
[
@@ -187,7 +194,7 @@ async def stdin_writer():
187194
await process.wait()
188195
except TimeoutError:
189196
# If process doesn't terminate in time, force kill it
190-
process.kill()
197+
await _terminate_process_tree(process)
191198
except ProcessLookupError:
192199
# Process already exited, which is fine
193200
pass
@@ -222,11 +229,65 @@ async def _create_platform_compatible_process(
222229
):
223230
"""
224231
Creates a subprocess in a platform-compatible way.
225-
Returns a process handle.
232+
233+
Unix: Creates process in a new session/process group for killpg support
234+
Windows: Creates process in a Job Object for reliable child termination
226235
"""
227236
if sys.platform == "win32":
228237
process = await create_windows_process(command, args, env, errlog, cwd)
229238
else:
230-
process = await anyio.open_process([command, *args], env=env, stderr=errlog, cwd=cwd)
239+
process = await anyio.open_process(
240+
[command, *args],
241+
env=env,
242+
stderr=errlog,
243+
cwd=cwd,
244+
start_new_session=True,
245+
)
231246

232247
return process
248+
249+
250+
async def _terminate_process_tree(process: Process | FallbackProcess, timeout: float = 2.0) -> None:
    """
    Terminate a process and all its children using platform-specific methods.

    Unix: signals the child's process group with os.killpg() -- SIGTERM first,
    then SIGKILL if the group still exists once ``timeout`` seconds have passed.
    Windows: delegates to terminate_windows_process_tree().

    Args:
        process: The process handle to terminate.
        timeout: Seconds to wait for the process group to disappear after
            SIGTERM before escalating to SIGKILL. Also used as the wait limit
            in the single-process fallback.
    """
    if sys.platform == "win32":
        await terminate_windows_process_tree(process, timeout)
        return

    pid = getattr(process, "pid", None) or getattr(getattr(process, "popen", None), "pid", None)
    if not pid:
        return

    try:
        pgid = os.getpgid(pid)
        if pgid == os.getpgrp():
            # The child shares our own process group (it was not started in a
            # new session). Signalling the group would also hit this process,
            # so route to the single-process fallback instead.
            raise OSError(f"process group {pgid} is the caller's own process group")

        os.killpg(pgid, signal.SIGTERM)

        deadline = anyio.current_time() + timeout
        while anyio.current_time() < deadline:
            try:
                # Check if process group still exists (signal 0 = check only)
                os.killpg(pgid, 0)
            except ProcessLookupError:
                return
            await anyio.sleep(0.1)

        try:
            os.killpg(pgid, signal.SIGKILL)
        except ProcessLookupError:
            # The group vanished between the last poll and the SIGKILL.
            pass

    except ProcessLookupError:
        # The process was already gone before we could signal it; there is
        # nothing left to terminate and nothing worth warning about.
        return
    except OSError as e:
        # Covers PermissionError and any other OS-level failure.
        logger.warning(f"Process group termination failed for PID {pid}: {e}, falling back to simple terminate")
        await _terminate_single_process(process, pid, timeout)


async def _terminate_single_process(process: Process | FallbackProcess, pid: int, timeout: float) -> None:
    """Best-effort terminate of just ``process``: terminate(), bounded wait, then kill()."""
    try:
        process.terminate()
        with anyio.fail_after(timeout):
            await process.wait()
    except Exception as term_error:
        logger.warning(f"Process termination failed for PID {pid}: {term_error}, attempting force kill")
        try:
            process.kill()
        except Exception as kill_error:
            logger.error(f"Failed to kill process {pid}: {kill_error}")

src/mcp/client/stdio/win32.py

Lines changed: 117 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
Windows-specific functionality for stdio client operations.
33
"""
44

5+
import logging
56
import shutil
67
import subprocess
78
import sys
@@ -14,6 +15,23 @@
1415
from anyio.streams.file import FileReadStream, FileWriteStream
1516
from typing_extensions import deprecated
1617

18+
logger = logging.getLogger("client.stdio.win32")
19+
20+
# Windows-specific imports for Job Objects
21+
if sys.platform == "win32":
22+
import pywintypes
23+
import win32api
24+
import win32con
25+
import win32job
26+
else:
27+
# Type stubs for non-Windows platforms
28+
win32api = None
29+
win32con = None
30+
win32job = None
31+
pywintypes = None
32+
33+
JobHandle = int
34+
1735

1836
def get_windows_executable_command(command: str) -> str:
1937
"""
@@ -104,6 +122,11 @@ def kill(self) -> None:
104122
"""Kill the subprocess immediately (alias for terminate)."""
105123
self.terminate()
106124

125+
@property
def pid(self) -> int:
    """Process ID of the wrapped ``popen`` object."""
    wrapped = self.popen
    return wrapped.pid
129+
107130

108131
# ------------------------
109132
# Updated function
@@ -118,13 +141,16 @@ async def create_windows_process(
118141
cwd: Path | str | None = None,
119142
) -> Process | FallbackProcess:
120143
"""
121-
Creates a subprocess in a Windows-compatible way.
144+
Creates a subprocess in a Windows-compatible way with Job Object support.
122145
123146
Attempt to use anyio's open_process for async subprocess creation.
124147
In some cases this will throw NotImplementedError on Windows, e.g.
125148
when using the SelectorEventLoop which does not support async subprocesses.
126149
In that case, we fall back to using subprocess.Popen.
127150
151+
The process is automatically added to a Job Object to ensure all child
152+
processes are terminated when the parent is terminated.
153+
128154
Args:
129155
command (str): The executable to run
130156
args (list[str]): List of command line arguments
@@ -133,8 +159,11 @@ async def create_windows_process(
133159
cwd (Path | str | None): Working directory for the subprocess
134160
135161
Returns:
136-
FallbackProcess: Async-compatible subprocess with stdin and stdout streams
162+
Process | FallbackProcess: Async-compatible subprocess with stdin and stdout streams
137163
"""
164+
job = _create_job_object()
165+
process = None
166+
138167
try:
139168
# First try using anyio with Windows-specific flags to hide console window
140169
process = await anyio.open_process(
@@ -147,10 +176,9 @@ async def create_windows_process(
147176
stderr=errlog,
148177
cwd=cwd,
149178
)
150-
return process
151179
except NotImplementedError:
152-
# Windows often doesn't support async subprocess creation, use fallback
153-
return await _create_windows_fallback_process(command, args, env, errlog, cwd)
180+
# If Windows doesn't support async subprocess creation, use fallback
181+
process = await _create_windows_fallback_process(command, args, env, errlog, cwd)
154182
except Exception:
155183
# Try again without creation flags
156184
process = await anyio.open_process(
@@ -159,7 +187,9 @@ async def create_windows_process(
159187
stderr=errlog,
160188
cwd=cwd,
161189
)
162-
return process
190+
191+
_maybe_assign_process_to_job(process, job)
192+
return process
163193

164194

165195
async def _create_windows_fallback_process(
@@ -186,8 +216,6 @@ async def _create_windows_fallback_process(
186216
bufsize=0, # Unbuffered output
187217
creationflags=getattr(subprocess, "CREATE_NO_WINDOW", 0),
188218
)
189-
return FallbackProcess(popen_obj)
190-
191219
except Exception:
192220
# If creationflags failed, fallback without them
193221
popen_obj = subprocess.Popen(
@@ -199,7 +227,87 @@ async def _create_windows_fallback_process(
199227
cwd=cwd,
200228
bufsize=0,
201229
)
202-
return FallbackProcess(popen_obj)
230+
process = FallbackProcess(popen_obj)
231+
return process
232+
233+
234+
def _create_job_object() -> int | None:
    """
    Create a Windows Job Object configured to terminate all processes when closed.

    The job's extended limit information is read, the
    JOB_OBJECT_LIMIT_KILL_ON_JOB_CLOSE flag is OR-ed into its limit flags, and
    the result is written back.

    Returns:
        The job handle on success. None when not running on Windows, when
        win32job is unavailable, or when creating/configuring the job fails
        (the failure is logged as a warning).
    """
    if sys.platform != "win32" or not win32job:
        return None

    job = None
    try:
        job = win32job.CreateJobObject(None, "")
        extended_info = win32job.QueryInformationJobObject(job, win32job.JobObjectExtendedLimitInformation)

        extended_info["BasicLimitInformation"]["LimitFlags"] |= win32job.JOB_OBJECT_LIMIT_KILL_ON_JOB_CLOSE
        win32job.SetInformationJobObject(job, win32job.JobObjectExtendedLimitInformation, extended_info)
        return job
    except Exception as e:
        logger.warning(f"Failed to create Job Object for process tree management: {e}")
        # If the job was created but could not be configured, release the
        # handle now rather than leaving it open with no owner.
        if job and win32api:
            try:
                win32api.CloseHandle(job)
            except Exception as close_error:
                logger.debug("Failed to close unconfigured Job Object handle: %s", close_error)
        return None
251+
252+
253+
def _maybe_assign_process_to_job(process: Process | FallbackProcess, job: JobHandle | None) -> None:
    """
    Try to assign a process to a job object. If assignment fails
    for any reason, the job handle is closed.

    On success the job handle is stored on the process as ``_job_object``.
    This function is best-effort: failures are logged as warnings and never
    raised to the caller.

    Args:
        process: The process to place into the job.
        job: The job handle, or None (in which case nothing is done).
    """
    if not job:
        return

    if sys.platform != "win32" or not win32api or not win32con or not win32job:
        return

    try:
        process_handle = win32api.OpenProcess(
            win32con.PROCESS_SET_QUOTA | win32con.PROCESS_TERMINATE, False, process.pid
        )
        if not process_handle:
            raise OSError("Failed to open process handle")

        try:
            win32job.AssignProcessToJobObject(job, process_handle)
        finally:
            # The temporary process handle is only needed for the assignment.
            # A failure to close it must not undo a successful assignment, so
            # it is logged instead of propagated.
            try:
                win32api.CloseHandle(process_handle)
            except Exception as close_error:
                logger.debug("Failed to close process handle for PID %s: %s", process.pid, close_error)
    except Exception as e:
        logger.warning(f"Failed to assign process {process.pid} to Job Object: {e}")
        # The job is useless without the process in it; release the handle.
        # Guarded so that a close failure cannot escape this best-effort helper.
        try:
            win32api.CloseHandle(job)
        except Exception as close_error:
            logger.debug("Failed to close Job Object handle: %s", close_error)
        return

    # Record the job only once the assignment is known to have succeeded, so
    # the process never carries a handle that was closed in the error path.
    process._job_object = job
280+
281+
282+
async def terminate_windows_process_tree(process: Process | FallbackProcess, timeout: float = 2.0) -> None:
    """
    Terminate a process and all its children on Windows.

    If the process has an associated job object, it will be terminated.
    Otherwise, falls back to basic process termination.

    Every step is best-effort: failures are logged at debug level and never
    raised. On non-Windows platforms this is a no-op.

    Args:
        process: The process whose tree should be terminated.
        timeout: Accepted for signature parity with the caller; this
            implementation does not wait, so the value is currently unused.
    """
    if sys.platform != "win32":
        return

    job = getattr(process, "_job_object", None)
    if job and win32job:
        try:
            win32job.TerminateJobObject(job, 1)
        except Exception as e:
            # Job might already be terminated
            logger.debug("TerminateJobObject failed: %s", e)
        finally:
            if win32api:
                try:
                    win32api.CloseHandle(job)
                except Exception as e:
                    logger.debug("Failed to close Job Object handle: %s", e)
            # Drop the reference so a repeated call does not operate on a
            # handle that has already been closed.
            try:
                process._job_object = None
            except AttributeError:
                pass

    # Always try to terminate the process itself as well
    try:
        process.terminate()
    except Exception as e:
        logger.debug("process.terminate() failed: %s", e)
203311

204312

205313
@deprecated(

0 commit comments

Comments
 (0)