Skip to content

Commit 3d3499a

Browse files
committed
Warn when multiprocessing start method is 'fork'
CUDA does not support the fork() system call. Forked subprocesses exhibit undefined behavior, including failure to initialize CUDA contexts and devices. Add warning checks in multiprocessing reduction functions for IPC objects (DeviceMemoryResource, IPCAllocationHandle, Event) that warn when the start method is 'fork'. The warning is emitted once per process when IPC objects are serialized. Fixes #1136
1 parent 42300f3 commit 3d3499a

File tree

4 files changed

+292
-0
lines changed

4 files changed

+292
-0
lines changed

cuda_core/cuda/core/experimental/_event.pyx

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ from typing import TYPE_CHECKING, Optional
2121
from cuda.core.experimental._context import Context
2222
from cuda.core.experimental._utils.cuda_utils import (
2323
CUDAError,
24+
_check_multiprocessing_start_method,
2425
driver,
2526
)
2627
if TYPE_CHECKING:
@@ -300,6 +301,7 @@ cdef class IPCEventDescriptor:
300301

301302

302303
def _reduce_event(event):
    """Reduce an Event for multiprocessing pickling.

    Runs the start-method check first, then returns the
    ``(callable, args)`` pair built from the event's own
    ``from_ipc_descriptor`` and its IPC descriptor.
    """
    _check_multiprocessing_start_method()
    rebuild = event.from_ipc_descriptor
    descriptor = event.get_ipc_descriptor()
    return rebuild, (descriptor,)
304306

305307
multiprocessing.reduction.register(Event, _reduce_event)

cuda_core/cuda/core/experimental/_memory/_ipc.pyx

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ from cuda.bindings cimport cydriver
1010
from cuda.core.experimental._memory._buffer cimport Buffer
1111
from cuda.core.experimental._stream cimport default_stream
1212
from cuda.core.experimental._utils.cuda_utils cimport HANDLE_RETURN
13+
from cuda.core.experimental._utils.cuda_utils import _check_multiprocessing_start_method
1314

1415
import multiprocessing
1516
import os
@@ -129,6 +130,7 @@ cdef class IPCAllocationHandle:
129130

130131

131132
def _reduce_allocation_handle(alloc_handle):
    """Reduce an IPCAllocationHandle for multiprocessing pickling.

    Runs the start-method check first, wraps the handle's ``handle``
    attribute in ``multiprocessing.reduction.DupFd``, and returns the
    reconstruction callable together with its arguments (the handle's
    type, the DupFd wrapper and the handle's uuid).
    """
    _check_multiprocessing_start_method()
    dup_fd = multiprocessing.reduction.DupFd(alloc_handle.handle)
    rebuild_args = (type(alloc_handle), dup_fd, alloc_handle.uuid)
    return _reconstruct_allocation_handle, rebuild_args
134136

@@ -141,6 +143,7 @@ multiprocessing.reduction.register(IPCAllocationHandle, _reduce_allocation_handl
141143

142144

143145
def _deep_reduce_device_memory_resource(mr):
146+
_check_multiprocessing_start_method()
144147
from .._device import Device
145148
device = Device(mr.device_id)
146149
alloc_handle = mr.get_allocation_handle()

cuda_core/cuda/core/experimental/_utils/cuda_utils.pyx

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,9 @@
55
import functools
66
from functools import partial
77
import importlib.metadata
8+
import multiprocessing
9+
import platform
10+
import warnings
811
from collections import namedtuple
912
from collections.abc import Sequence
1013
from contextlib import ExitStack
@@ -283,3 +286,39 @@ class Transaction:
283286
"""
284287
# pop_all() empties this stack so no callbacks are triggered on exit.
285288
self._stack.pop_all()
289+
290+
291+
# Set to True once the fork warning has been issued, so that it is emitted
# at most once per process.
_fork_warning_emitted = False


def _check_multiprocessing_start_method():
    """Warn, once per process, if multiprocessing would start children with 'fork'.

    A ``UserWarning`` is issued when the global multiprocessing start method
    is 'fork', or when no start method has been selected yet and the platform
    default is 'fork'. After the first warning this function is a no-op.

    The start method is queried with ``allow_none=True`` on purpose: the
    default form of ``multiprocessing.get_start_method()`` fixes the global
    context as a side effect, after which the
    ``multiprocessing.set_start_method('spawn')`` call recommended by the
    warning would itself fail with ``RuntimeError``.

    Returns:
        None
    """
    global _fork_warning_emitted
    if _fork_warning_emitted:
        return

    start_method = multiprocessing.get_start_method(allow_none=True)
    if start_method == "fork":
        prefix = "multiprocessing start method is 'fork', "
    elif start_method is None and multiprocessing.get_all_start_methods()[0] == "fork":
        # Nothing selected yet: get_all_start_methods() lists the platform
        # default first, which avoids hard-coding per-OS / per-version defaults.
        prefix = "multiprocessing start method is not set and defaults to 'fork' on this platform, "
    else:
        return

    # Common warning message parts
    common_message = (
        "CUDA does not support. Forked subprocesses exhibit undefined behavior, "
        "including failure to initialize CUDA contexts and devices. Set the start method "
        "to 'spawn' before creating processes that use CUDA. "
        "Use: multiprocessing.set_start_method('spawn')"
    )
    message = f"{prefix}which {common_message}"
    warnings.warn(message, UserWarning, stacklevel=3)
    _fork_warning_emitted = True
Lines changed: 248 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,248 @@
1+
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
2+
# SPDX-License-Identifier: Apache-2.0
3+
4+
"""
5+
Test that warnings are emitted when multiprocessing start method is 'fork'
6+
and IPC objects are serialized.
7+
8+
These tests run in subprocesses to avoid the conftest.py session fixture that
9+
sets the start method to 'spawn'.
10+
"""
11+
12+
import subprocess
13+
import sys
14+
import textwrap
15+
16+
17+
def test_warn_on_fork_method_device_memory_resource():
    """Test that warning is emitted when DeviceMemoryResource is pickled with fork method.

    The check runs in a child interpreter so that the script can select the
    'fork' start method itself. The child exits non-zero with a message on
    stderr if the expected warning is missing or malformed.
    """
    script = textwrap.dedent("""
        import multiprocessing
        import sys
        import warnings

        # Set start method to 'fork' before importing cuda.core
        multiprocessing.set_start_method('fork', force=True)

        # Capture warnings
        with warnings.catch_warnings(record=True) as w:
            warnings.simplefilter("always")

            from cuda.core.experimental import Device, DeviceMemoryResource, DeviceMemoryResourceOptions

            # Create a DeviceMemoryResource
            device = Device(0)
            device.set_current()
            options = DeviceMemoryResourceOptions(max_size=2097152, ipc_enabled=True)
            mr = DeviceMemoryResource(device, options=options)

            # Trigger reduction by pickling with ForkingPickler (triggers multiprocessing reduction)
            from multiprocessing.reduction import ForkingPickler
            ForkingPickler.dumps(mr)

            # Only look at fork-related warnings: with the "always" filter, unrelated
            # warnings raised during import or setup are recorded as well, so w[0]
            # is not guaranteed to be the warning under test.
            fork_warnings = [item for item in w if "fork" in str(item.message).lower()]
            if not fork_warnings:
                print("ERROR: No fork warning emitted", file=sys.stderr)
                sys.exit(1)

            warning = fork_warnings[0]
            if warning.category != UserWarning:
                print(f"ERROR: Expected UserWarning, got {warning.category}", file=sys.stderr)
                sys.exit(1)

            if "spawn" not in str(warning.message).lower():
                print(f"ERROR: Warning message doesn't mention 'spawn': {warning.message}", file=sys.stderr)
                sys.exit(1)

            if "undefined behavior" not in str(warning.message).lower():
                msg = f"ERROR: Warning message doesn't mention 'undefined behavior': {warning.message}"
                print(msg, file=sys.stderr)
                sys.exit(1)

            print("SUCCESS: Warning emitted correctly")
    """)

    result = subprocess.run([sys.executable, "-c", script], capture_output=True, text=True, timeout=30)  # noqa: S603

    assert result.returncode == 0, f"Subprocess failed:\nstdout: {result.stdout}\nstderr: {result.stderr}"
    assert "SUCCESS" in result.stdout
73+
74+
75+
def test_warn_on_fork_method_allocation_handle():
    """Test that warning is emitted when IPCAllocationHandle is pickled with fork method.

    The check runs in a child interpreter so that the script can select the
    'fork' start method itself. The child exits non-zero with a message on
    stderr if the expected warning is missing or has the wrong category.
    """
    script = textwrap.dedent("""
        import multiprocessing
        import sys
        import warnings

        # Set start method to 'fork' before importing cuda.core
        multiprocessing.set_start_method('fork', force=True)

        # Capture warnings
        with warnings.catch_warnings(record=True) as w:
            warnings.simplefilter("always")

            from cuda.core.experimental import Device, DeviceMemoryResource, DeviceMemoryResourceOptions

            # Create a DeviceMemoryResource and get its allocation handle
            device = Device(0)
            device.set_current()
            options = DeviceMemoryResourceOptions(max_size=2097152, ipc_enabled=True)
            mr = DeviceMemoryResource(device, options=options)
            alloc_handle = mr.get_allocation_handle()

            # Trigger reduction by pickling with ForkingPickler (triggers multiprocessing reduction)
            from multiprocessing.reduction import ForkingPickler
            ForkingPickler.dumps(alloc_handle)

            # Only look at fork-related warnings: with the "always" filter, unrelated
            # warnings raised during import or setup are recorded as well, so w[0]
            # is not guaranteed to be the warning under test.
            fork_warnings = [item for item in w if "fork" in str(item.message).lower()]
            if not fork_warnings:
                print("ERROR: No fork warning emitted", file=sys.stderr)
                sys.exit(1)

            warning = fork_warnings[0]
            if warning.category != UserWarning:
                print(f"ERROR: Expected UserWarning, got {warning.category}", file=sys.stderr)
                sys.exit(1)

            print("SUCCESS: Warning emitted correctly")
    """)

    result = subprocess.run([sys.executable, "-c", script], capture_output=True, text=True, timeout=30)  # noqa: S603

    assert result.returncode == 0, f"Subprocess failed:\nstdout: {result.stdout}\nstderr: {result.stderr}"
    assert "SUCCESS" in result.stdout
119+
120+
121+
def test_warn_on_fork_method_event():
    """Test that warning is emitted when Event is pickled with fork method.

    The check runs in a child interpreter so that the script can select the
    'fork' start method itself. The child exits non-zero with a message on
    stderr if the expected warning is missing or has the wrong category.
    """
    script = textwrap.dedent("""
        import multiprocessing
        import sys
        import warnings

        # Set start method to 'fork' before importing cuda.core
        multiprocessing.set_start_method('fork', force=True)

        # Capture warnings
        with warnings.catch_warnings(record=True) as w:
            warnings.simplefilter("always")

            from cuda.core.experimental import Device, EventOptions

            # Create an Event via stream.record()
            device = Device(0)
            device.set_current()
            stream = device.create_stream()
            ipc_event_options = EventOptions(ipc_enabled=True)
            event = stream.record(options=ipc_event_options)

            # Trigger reduction by pickling with ForkingPickler (triggers multiprocessing reduction)
            from multiprocessing.reduction import ForkingPickler
            ForkingPickler.dumps(event)

            # Only look at fork-related warnings: with the "always" filter, unrelated
            # warnings raised during import or setup are recorded as well, so w[0]
            # is not guaranteed to be the warning under test.
            fork_warnings = [item for item in w if "fork" in str(item.message).lower()]
            if not fork_warnings:
                print("ERROR: No fork warning emitted", file=sys.stderr)
                sys.exit(1)

            warning = fork_warnings[0]
            if warning.category != UserWarning:
                print(f"ERROR: Expected UserWarning, got {warning.category}", file=sys.stderr)
                sys.exit(1)

            print("SUCCESS: Warning emitted correctly")
    """)

    result = subprocess.run([sys.executable, "-c", script], capture_output=True, text=True, timeout=30)  # noqa: S603

    assert result.returncode == 0, f"Subprocess failed:\nstdout: {result.stdout}\nstderr: {result.stderr}"
    assert "SUCCESS" in result.stdout
165+
166+
167+
def test_no_warning_with_spawn_method():
    """Verify that pickling under the 'spawn' start method produces no fork warning.

    A child interpreter selects 'spawn', pickles a DeviceMemoryResource with
    ForkingPickler and reports failure through a non-zero exit code.
    """
    child_source = textwrap.dedent("""
        import multiprocessing
        import sys
        import warnings

        # Set start method to 'spawn' before importing cuda.core
        multiprocessing.set_start_method('spawn', force=True)

        # Capture warnings
        with warnings.catch_warnings(record=True) as w:
            warnings.simplefilter("always")

            from cuda.core.experimental import Device, DeviceMemoryResource, DeviceMemoryResourceOptions

            # Create a DeviceMemoryResource
            device = Device(0)
            device.set_current()
            options = DeviceMemoryResourceOptions(max_size=2097152, ipc_enabled=True)
            mr = DeviceMemoryResource(device, options=options)

            # Trigger reduction by pickling with ForkingPickler (triggers multiprocessing reduction)
            from multiprocessing.reduction import ForkingPickler
            ForkingPickler.dumps(mr)

            # Check that no warning was emitted
            fork_warnings = [warning for warning in w if "fork" in str(warning.message).lower()]
            if len(fork_warnings) > 0:
                print(f"ERROR: Unexpected warning emitted: {fork_warnings[0].message}", file=sys.stderr)
                sys.exit(1)

            print("SUCCESS: No warning emitted with spawn method")
    """)

    command = [sys.executable, "-c", child_source]
    completed = subprocess.run(command, capture_output=True, text=True, timeout=30)  # noqa: S603

    failure_report = f"Subprocess failed:\nstdout: {completed.stdout}\nstderr: {completed.stderr}"
    assert completed.returncode == 0, failure_report
    assert "SUCCESS" in completed.stdout
206+
207+
208+
def test_warning_emitted_only_once():
    """Verify that pickling several objects under 'fork' yields exactly one fork warning.

    A child interpreter selects 'fork', pickles two DeviceMemoryResource
    objects with ForkingPickler and reports failure through a non-zero exit code.
    """
    child_source = textwrap.dedent("""
        import multiprocessing
        import sys
        import warnings

        # Set start method to 'fork' before importing cuda.core
        multiprocessing.set_start_method('fork', force=True)

        # Capture warnings
        with warnings.catch_warnings(record=True) as w:
            warnings.simplefilter("always")

            from cuda.core.experimental import Device, DeviceMemoryResource, DeviceMemoryResourceOptions

            # Create multiple DeviceMemoryResources
            device = Device(0)
            device.set_current()
            options = DeviceMemoryResourceOptions(max_size=2097152, ipc_enabled=True)
            mr1 = DeviceMemoryResource(device, options=options)
            mr2 = DeviceMemoryResource(device, options=options)

            # Trigger reduction by pickling multiple times with ForkingPickler
            from multiprocessing.reduction import ForkingPickler
            ForkingPickler.dumps(mr1)
            ForkingPickler.dumps(mr2)

            # Check that warning was emitted only once
            fork_warnings = [warning for warning in w if "fork" in str(warning.message).lower()]
            if len(fork_warnings) != 1:
                print(f"ERROR: Expected 1 warning, got {len(fork_warnings)}", file=sys.stderr)
                sys.exit(1)

            print("SUCCESS: Warning emitted only once")
    """)

    command = [sys.executable, "-c", child_source]
    completed = subprocess.run(command, capture_output=True, text=True, timeout=30)  # noqa: S603

    failure_report = f"Subprocess failed:\nstdout: {completed.stdout}\nstderr: {completed.stderr}"
    assert completed.returncode == 0, failure_report
    assert "SUCCESS" in completed.stdout

0 commit comments

Comments
 (0)