Skip to content

Commit e033621

Browse files
felixweinbergerjingx8885surya-prakash-susarla
authored
Improve child process termination on POSIX & Windows (#1078)
Co-authored-by: jingx8885 <[email protected]> Co-authored-by: Surya Prakash Susarla <[email protected]>
1 parent cf72565 commit e033621

File tree

9 files changed

+552
-15
lines changed

9 files changed

+552
-15
lines changed

pyproject.toml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ dependencies = [
3232
"pydantic-settings>=2.5.2",
3333
"uvicorn>=0.23.1; sys_platform != 'emscripten'",
3434
"jsonschema>=4.20.0",
35+
"pywin32>=310; sys_platform == 'win32'",
3536
]
3637

3738
[project.optional-dependencies]
@@ -125,4 +126,6 @@ filterwarnings = [
125126
"ignore::DeprecationWarning:websockets",
126127
"ignore:websockets.server.WebSocketServerProtocol is deprecated:DeprecationWarning",
127128
"ignore:Returning str or bytes.*:DeprecationWarning:mcp.server.lowlevel",
129+
# pywin32 internal deprecation warning
130+
"ignore:getargs.*The 'u' format is deprecated:DeprecationWarning"
128131
]

src/mcp/client/stdio/__init__.py

Lines changed: 39 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import logging
12
import os
23
import sys
34
from contextlib import asynccontextmanager
@@ -6,17 +7,22 @@
67

78
import anyio
89
import anyio.lowlevel
10+
from anyio.abc import Process
911
from anyio.streams.memory import MemoryObjectReceiveStream, MemoryObjectSendStream
1012
from anyio.streams.text import TextReceiveStream
1113
from pydantic import BaseModel, Field
1214

1315
import mcp.types as types
14-
from mcp.shared.message import SessionMessage
15-
16-
from .win32 import (
16+
from mcp.os.posix.utilities import terminate_posix_process_tree
17+
from mcp.os.win32.utilities import (
18+
FallbackProcess,
1719
create_windows_process,
1820
get_windows_executable_command,
21+
terminate_windows_process_tree,
1922
)
23+
from mcp.shared.message import SessionMessage
24+
25+
logger = logging.getLogger(__name__)
2026

2127
# Environment variables to inherit by default
2228
DEFAULT_INHERITED_ENV_VARS = (
@@ -187,7 +193,7 @@ async def stdin_writer():
187193
await process.wait()
188194
except TimeoutError:
189195
# If process doesn't terminate in time, force kill it
190-
process.kill()
196+
await _terminate_process_tree(process)
191197
except ProcessLookupError:
192198
# Process already exited, which is fine
193199
pass
@@ -222,11 +228,38 @@ async def _create_platform_compatible_process(
222228
):
223229
"""
224230
Creates a subprocess in a platform-compatible way.
225-
Returns a process handle.
231+
232+
Unix: Creates process in a new session/process group for killpg support
233+
Windows: Creates process in a Job Object for reliable child termination
226234
"""
227235
if sys.platform == "win32":
228236
process = await create_windows_process(command, args, env, errlog, cwd)
229237
else:
230-
process = await anyio.open_process([command, *args], env=env, stderr=errlog, cwd=cwd)
238+
process = await anyio.open_process(
239+
[command, *args],
240+
env=env,
241+
stderr=errlog,
242+
cwd=cwd,
243+
start_new_session=True,
244+
)
231245

232246
return process
247+
248+
249+
async def _terminate_process_tree(process: Process | FallbackProcess, timeout_seconds: float = 2.0) -> None:
250+
"""
251+
Terminate a process and all its children using platform-specific methods.
252+
253+
Unix: Uses os.killpg() for atomic process group termination
254+
Windows: Uses Job Objects via pywin32 for reliable child process cleanup
255+
256+
Args:
257+
process: The process to terminate
258+
timeout_seconds: Timeout in seconds before force killing (default: 2.0)
259+
"""
260+
if sys.platform == "win32":
261+
await terminate_windows_process_tree(process, timeout_seconds)
262+
else:
263+
# FallbackProcess should only be used for Windows compatibility
264+
assert isinstance(process, Process)
265+
await terminate_posix_process_tree(process, timeout_seconds)

src/mcp/os/__init__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
"""Platform-specific utilities for MCP."""

src/mcp/os/posix/__init__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
"""POSIX-specific utilities for MCP."""

src/mcp/os/posix/utilities.py

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
"""
2+
POSIX-specific functionality for stdio client operations.
3+
"""
4+
5+
import logging
6+
import os
7+
import signal
8+
9+
import anyio
10+
from anyio.abc import Process
11+
12+
logger = logging.getLogger(__name__)
13+
14+
15+
async def terminate_posix_process_tree(process: Process, timeout_seconds: float = 2.0) -> None:
16+
"""
17+
Terminate a process and all its children on POSIX systems.
18+
19+
Uses os.killpg() for atomic process group termination.
20+
21+
Args:
22+
process: The process to terminate
23+
timeout_seconds: Timeout in seconds before force killing (default: 2.0)
24+
"""
25+
pid = getattr(process, "pid", None) or getattr(getattr(process, "popen", None), "pid", None)
26+
if not pid:
27+
# No PID means there's no process to terminate - it either never started,
28+
# already exited, or we have an invalid process object
29+
return
30+
31+
try:
32+
pgid = os.getpgid(pid)
33+
os.killpg(pgid, signal.SIGTERM)
34+
35+
with anyio.move_on_after(timeout_seconds):
36+
while True:
37+
try:
38+
# Check if process group still exists (signal 0 = check only)
39+
os.killpg(pgid, 0)
40+
await anyio.sleep(0.1)
41+
except ProcessLookupError:
42+
return
43+
44+
try:
45+
os.killpg(pgid, signal.SIGKILL)
46+
except ProcessLookupError:
47+
pass
48+
49+
except (ProcessLookupError, PermissionError, OSError) as e:
50+
logger.warning(f"Process group termination failed for PID {pid}: {e}, falling back to simple terminate")
51+
try:
52+
process.terminate()
53+
with anyio.fail_after(timeout_seconds):
54+
await process.wait()
55+
except Exception as term_error:
56+
logger.warning(f"Process termination failed for PID {pid}: {term_error}, attempting force kill")
57+
try:
58+
process.kill()
59+
except Exception as kill_error:
60+
logger.error(f"Failed to kill process {pid}: {kill_error}")

src/mcp/os/win32/__init__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
"""Windows-specific utilities for MCP."""

src/mcp/client/stdio/win32.py renamed to src/mcp/os/win32/utilities.py

Lines changed: 120 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
Windows-specific functionality for stdio client operations.
33
"""
44

5+
import logging
56
import shutil
67
import subprocess
78
import sys
@@ -14,6 +15,23 @@
1415
from anyio.streams.file import FileReadStream, FileWriteStream
1516
from typing_extensions import deprecated
1617

18+
logger = logging.getLogger("client.stdio.win32")
19+
20+
# Windows-specific imports for Job Objects
21+
if sys.platform == "win32":
22+
import pywintypes
23+
import win32api
24+
import win32con
25+
import win32job
26+
else:
27+
# Type stubs for non-Windows platforms
28+
win32api = None
29+
win32con = None
30+
win32job = None
31+
pywintypes = None
32+
33+
JobHandle = int
34+
1735

1836
def get_windows_executable_command(command: str) -> str:
1937
"""
@@ -104,6 +122,11 @@ def kill(self) -> None:
104122
"""Kill the subprocess immediately (alias for terminate)."""
105123
self.terminate()
106124

125+
@property
126+
def pid(self) -> int:
127+
"""Return the process ID."""
128+
return self.popen.pid
129+
107130

108131
# ------------------------
109132
# Updated function
@@ -118,13 +141,16 @@ async def create_windows_process(
118141
cwd: Path | str | None = None,
119142
) -> Process | FallbackProcess:
120143
"""
121-
Creates a subprocess in a Windows-compatible way.
144+
Creates a subprocess in a Windows-compatible way with Job Object support.
122145
123146
Attempt to use anyio's open_process for async subprocess creation.
124147
In some cases this will throw NotImplementedError on Windows, e.g.
125148
when using the SelectorEventLoop which does not support async subprocesses.
126149
In that case, we fall back to using subprocess.Popen.
127150
151+
The process is automatically added to a Job Object to ensure all child
152+
processes are terminated when the parent is terminated.
153+
128154
Args:
129155
command (str): The executable to run
130156
args (list[str]): List of command line arguments
@@ -133,8 +159,11 @@ async def create_windows_process(
133159
cwd (Path | str | None): Working directory for the subprocess
134160
135161
Returns:
136-
FallbackProcess: Async-compatible subprocess with stdin and stdout streams
162+
Process | FallbackProcess: Async-compatible subprocess with stdin and stdout streams
137163
"""
164+
job = _create_job_object()
165+
process = None
166+
138167
try:
139168
# First try using anyio with Windows-specific flags to hide console window
140169
process = await anyio.open_process(
@@ -147,10 +176,9 @@ async def create_windows_process(
147176
stderr=errlog,
148177
cwd=cwd,
149178
)
150-
return process
151179
except NotImplementedError:
152-
# Windows often doesn't support async subprocess creation, use fallback
153-
return await _create_windows_fallback_process(command, args, env, errlog, cwd)
180+
# If Windows doesn't support async subprocess creation, use fallback
181+
process = await _create_windows_fallback_process(command, args, env, errlog, cwd)
154182
except Exception:
155183
# Try again without creation flags
156184
process = await anyio.open_process(
@@ -159,7 +187,9 @@ async def create_windows_process(
159187
stderr=errlog,
160188
cwd=cwd,
161189
)
162-
return process
190+
191+
_maybe_assign_process_to_job(process, job)
192+
return process
163193

164194

165195
async def _create_windows_fallback_process(
@@ -186,8 +216,6 @@ async def _create_windows_fallback_process(
186216
bufsize=0, # Unbuffered output
187217
creationflags=getattr(subprocess, "CREATE_NO_WINDOW", 0),
188218
)
189-
return FallbackProcess(popen_obj)
190-
191219
except Exception:
192220
# If creationflags failed, fallback without them
193221
popen_obj = subprocess.Popen(
@@ -199,7 +227,90 @@ async def _create_windows_fallback_process(
199227
cwd=cwd,
200228
bufsize=0,
201229
)
202-
return FallbackProcess(popen_obj)
230+
return FallbackProcess(popen_obj)
231+
232+
233+
def _create_job_object() -> int | None:
234+
"""
235+
Create a Windows Job Object configured to terminate all processes when closed.
236+
"""
237+
if sys.platform != "win32" or not win32job:
238+
return None
239+
240+
try:
241+
job = win32job.CreateJobObject(None, "")
242+
extended_info = win32job.QueryInformationJobObject(job, win32job.JobObjectExtendedLimitInformation)
243+
244+
extended_info["BasicLimitInformation"]["LimitFlags"] |= win32job.JOB_OBJECT_LIMIT_KILL_ON_JOB_CLOSE
245+
win32job.SetInformationJobObject(job, win32job.JobObjectExtendedLimitInformation, extended_info)
246+
return job
247+
except Exception as e:
248+
logger.warning(f"Failed to create Job Object for process tree management: {e}")
249+
return None
250+
251+
252+
def _maybe_assign_process_to_job(process: Process | FallbackProcess, job: JobHandle | None) -> None:
253+
"""
254+
Try to assign a process to a job object. If assignment fails
255+
for any reason, the job handle is closed.
256+
"""
257+
if not job:
258+
return
259+
260+
if sys.platform != "win32" or not win32api or not win32con or not win32job:
261+
return
262+
263+
try:
264+
process_handle = win32api.OpenProcess(
265+
win32con.PROCESS_SET_QUOTA | win32con.PROCESS_TERMINATE, False, process.pid
266+
)
267+
if not process_handle:
268+
raise Exception("Failed to open process handle")
269+
270+
try:
271+
win32job.AssignProcessToJobObject(job, process_handle)
272+
process._job_object = job
273+
finally:
274+
win32api.CloseHandle(process_handle)
275+
except Exception as e:
276+
logger.warning(f"Failed to assign process {process.pid} to Job Object: {e}")
277+
if win32api:
278+
win32api.CloseHandle(job)
279+
280+
281+
async def terminate_windows_process_tree(process: Process | FallbackProcess, timeout_seconds: float = 2.0) -> None:
282+
"""
283+
Terminate a process and all its children on Windows.
284+
285+
If the process has an associated job object, it will be terminated.
286+
Otherwise, falls back to basic process termination.
287+
288+
Args:
289+
process: The process to terminate
290+
timeout_seconds: Timeout in seconds before force killing (default: 2.0)
291+
"""
292+
if sys.platform != "win32":
293+
return
294+
295+
job = getattr(process, "_job_object", None)
296+
if job and win32job:
297+
try:
298+
win32job.TerminateJobObject(job, 1)
299+
except Exception:
300+
# Job might already be terminated
301+
pass
302+
finally:
303+
if win32api:
304+
try:
305+
win32api.CloseHandle(job)
306+
except Exception:
307+
pass
308+
309+
# Always try to terminate the process itself as well
310+
try:
311+
process.terminate()
312+
except Exception:
313+
pass
203314

204315

205316
@deprecated(

0 commit comments

Comments
 (0)