Skip to content

Commit 79232b9

Browse files
authored
Fix multiprocessing deadlock after using prefect_test_harness on Linux (#19116)
1 parent c606d53 commit 79232b9

File tree

3 files changed

+143
-0
lines changed

3 files changed

+143
-0
lines changed

src/prefect/_internal/concurrency/services.py

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,10 @@
55
import concurrent.futures
66
import contextlib
77
import logging
8+
import os
89
import queue
910
import threading
11+
import weakref
1012
from collections.abc import AsyncGenerator, Awaitable, Coroutine, Generator, Hashable
1113
from typing import TYPE_CHECKING, Any, Generic, NoReturn, Optional, Union, cast
1214

@@ -22,6 +24,31 @@
2224
Ts = TypeVarTuple("Ts")
2325
R = TypeVar("R", infer_variance=True)
2426

27+
# Track all active services for fork handling
28+
# Weak references only: membership in this set never keeps a service alive,
# and an entry disappears once its service is garbage collected.
# NOTE(review): the annotation names _QueueServiceBase before the class is
# defined - presumably the module postpones annotation evaluation; confirm.
_active_services: weakref.WeakSet[_QueueServiceBase[Any]] = weakref.WeakSet()
29+
30+
31+
def _reset_services_after_fork() -> None:
    """
    Reset service state after fork() to prevent multiprocessing deadlocks on Linux.

    Intended to run in the child process after fork(), as the
    ``after_in_child`` callback passed to os.register_at_fork(). Every service
    still tracked in ``_active_services`` is reset first, then the class-level
    registry kept by ``_QueueServiceBase``.
    """
    # Iterate over a snapshot so the loop does not depend on the weak set
    # staying unchanged while services are being reset.
    for tracked_service in tuple(_active_services):
        tracked_service.reset_for_fork()

    # Reset the class-level instance tracking
    _QueueServiceBase.reset_instances_for_fork()
42+
43+
44+
# Register fork handler if supported (POSIX systems)
if hasattr(os, "register_at_fork"):
    # Registration is best-effort: it might fail in certain contexts (e.g., if
    # already in a child process), and such a failure is deliberately ignored.
    with contextlib.suppress(RuntimeError):
        os.register_at_fork(after_in_child=_reset_services_after_fork)
51+
2552

2653
class _QueueServiceBase(abc.ABC, Generic[T]):
2754
_instances: dict[int, Self] = {}
@@ -44,6 +71,25 @@ def __init__(self, *args: Hashable) -> None:
4471
)
4572
self._logger = logging.getLogger(f"{type(self).__name__}")
4673

74+
# Track this instance for fork handling
75+
_active_services.add(self)
76+
77+
def reset_for_fork(self) -> None:
    """
    Reset instance state after fork() to prevent deadlocks in child process.

    Marks the service as stopped and not started, drops the references to the
    loop, done event and task, and replaces the queue and the lock with brand
    new objects so nothing inherited from the parent process is reused.
    """
    # Lifecycle flags: stopped and not started.
    self._stopped = True
    self._started = False

    # Forget the handles that were set up before the fork.
    self._loop = None
    self._done_event = None
    self._task = None

    # Fresh synchronization primitives: an inherited lock could be held by a
    # thread that does not exist in the child.
    self._queue = queue.Queue()
    self._lock = threading.Lock()
86+
87+
@classmethod
def reset_instances_for_fork(cls) -> None:
    """
    Reset class-level state after fork() to prevent deadlocks in child process.

    Empties the ``_instances`` mapping in place (the dict object itself is
    kept, not rebound) and installs a new, unlocked ``_instance_lock``.
    """
    cls._instances.clear()
    # A new lock replaces the inherited one, which may have been held at the
    # moment of the fork.
    cls._instance_lock = threading.Lock()
92+
4793
def start(self) -> None:
4894
logger.debug("Starting service %r", self)
4995
loop_thread = get_global_loop()

src/prefect/_internal/concurrency/threads.py

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,10 @@
88
import atexit
99
import concurrent.futures
1010
import itertools
11+
import os
1112
import queue
1213
import threading
14+
import weakref
1315
from typing import Any, Optional
1416

1517
from typing_extensions import TypeVar
@@ -22,6 +24,32 @@
2224

2325
T = TypeVar("T", infer_variance=True)
2426

27+
# Track all active instances for fork handling
28+
# Weak references only: membership in this set never keeps a thread wrapper
# alive, and an entry disappears once its instance is garbage collected.
# NOTE(review): the annotation names WorkerThread / EventLoopThread before the
# classes are defined - presumably the module postpones annotation evaluation;
# confirm.
_active_instances: weakref.WeakSet[WorkerThread | EventLoopThread] = weakref.WeakSet()
29+
30+
31+
def _reset_after_fork_in_child() -> None:
    """
    Reset thread state after fork() to prevent multiprocessing deadlocks on Linux.

    When fork() is called, the child process inherits all thread state and locks
    from the parent, but only the calling thread continues. Locks that belonged
    to the other threads are left in inconsistent states, which causes deadlocks.

    Intended to run in the child process after fork(), as the
    ``after_in_child`` callback passed to os.register_at_fork(). Each instance
    still tracked in ``_active_instances`` gets its ``reset_for_fork()`` called.
    """
    # Iterate over a snapshot so the loop does not depend on the weak set
    # staying unchanged while instances are being reset.
    for tracked in tuple(_active_instances):
        tracked.reset_for_fork()
43+
44+
45+
# Register fork handler if supported (POSIX systems)
if hasattr(os, "register_at_fork"):
    try:
        os.register_at_fork(after_in_child=_reset_after_fork_in_child)
    except RuntimeError:
        # Registration is best-effort: it might fail in certain contexts
        # (e.g., if already in a child process), so the failure is ignored.
        pass
52+
2553

2654
class WorkerThread(Portal):
2755
"""
@@ -45,9 +73,19 @@ def __init__(
4573
self._submitted_count: int = 0
4674
self._lock = threading.Lock()
4775

76+
# Track this instance for fork handling
77+
_active_instances.add(self)
78+
4879
if not daemon:
4980
atexit.register(self.shutdown)
5081

82+
def reset_for_fork(self) -> None:
    """
    Reset state after fork() to prevent deadlocks in child process.

    Marks the worker as not started, zeroes the submitted-call counter, and
    replaces the queue and the lock with brand new objects.
    """
    # NOTE(review): only the fields below are reset; whether the underlying
    # thread object also has to be recreated before start() can succeed again
    # in the child cannot be told from this method - confirm against __init__.
    self._started = False
    self._submitted_count = 0

    # Fresh synchronization primitives: an inherited lock could be held by a
    # thread that does not exist in the child.
    self._queue = queue.Queue()
    self._lock = threading.Lock()
88+
5189
def start(self) -> None:
5290
"""
5391
Start the worker thread.
@@ -145,9 +183,21 @@ def __init__(
145183
self._on_shutdown: list[Call[Any]] = []
146184
self._lock = threading.Lock()
147185

186+
# Track this instance for fork handling
187+
_active_instances.add(self)
188+
148189
if not daemon:
149190
atexit.register(self.shutdown)
150191

192+
def reset_for_fork(self) -> None:
    """
    Reset state after fork() to prevent deadlocks in child process.

    Drops the loop reference, empties the shutdown-callback list, zeroes the
    submitted-call counter, and replaces the ready future, the shutdown event
    and the lock with brand new objects.
    """
    # Plain state goes back to its empty values.
    self._loop = None
    self._submitted_count = 0
    self._on_shutdown = []

    # Fresh synchronization objects: the inherited ones may reflect threads
    # that do not exist in the child.
    self._ready_future = concurrent.futures.Future()
    self._shutdown_event = Event()
    self._lock = threading.Lock()
200+
151201
def start(self):
152202
"""
153203
Start the worker thread; raises any exceptions encountered during startup.

tests/testing/test_utilites.py

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import sys
12
import uuid
23
import warnings
34
from unittest.mock import MagicMock
@@ -15,6 +16,20 @@
1516
from prefect.testing.utilities import assert_does_not_warn, prefect_test_harness
1617

1718

19+
def _multiprocessing_worker():
    """
    Worker function for multiprocessing test. Must be at module level for pickling.

    Terminates the child process immediately with exit status 0; it never
    returns to the caller.
    """
    import os

    # os._exit() is required here despite the underscore prefix. On Linux with fork(),
    # the child process inherits Prefect's logging/event state. Normal exit (return or
    # sys.exit) triggers Python cleanup that fails with this inherited state, causing
    # exitcode=1. os._exit() bypasses cleanup and is documented for use "in the child
    # process after os.fork()" - which is exactly this scenario.
    os._exit(0)
31+
32+
1833
def test_assert_does_not_warn_no_warning():
1934
with assert_does_not_warn():
2035
pass
@@ -101,3 +116,35 @@ def test_prefect_test_harness_timeout(monkeypatch):
101116
server().start.assert_called_once_with(
102117
timeout=PREFECT_SERVER_EPHEMERAL_STARTUP_TIMEOUT_SECONDS.value()
103118
)
119+
120+
121+
@pytest.mark.skipif(sys.platform == "win32", reason="fork() not available on Windows")
def test_multiprocessing_after_test_harness():
    """
    Test that multiprocessing works after using prefect_test_harness.

    Regression test for issue #19112 - on Linux, multiprocessing.Process() would
    deadlock after using the test harness because fork() inherited locked thread
    state from background threads.

    If the child does not finish within the join timeout it is killed and
    reaped, so a regression fails this test without leaving a hung process
    behind.
    """
    import multiprocessing

    @task
    def test_task():
        return 1

    @flow
    def test_flow():
        return test_task.submit()

    # Use test harness which starts background threads
    with prefect_test_harness():
        test_flow()

    # NOTE(review): Process() uses the platform's default start method, which is
    # not fork on every platform - confirm whether the fork context should be
    # requested explicitly for this regression test.
    process = multiprocessing.Process(target=_multiprocessing_worker)
    process.start()
    try:
        # This should not deadlock
        process.join(timeout=5)

        # Verify process completed successfully. exitcode stays None when the
        # join timed out, so a deadlocked child fails this assertion.
        assert process.exitcode == 0, "Process should complete without deadlock"
    finally:
        # Do not leak a deadlocked child: kill it and reap it.
        if process.is_alive():
            process.kill()
            process.join()

0 commit comments

Comments
 (0)