
Commit 0282abb

Allow multiprocessshared to spawn process and delete directly with obj (#37112)
* Allow multiprocessshared to spawn process and delete directly with obj
* Remove oom protection
* Resolve comments
* Rename unsafe_hard_delete for the proxy object to prevent collision
* Remove support for proxy on proxy to avoid complexity
* Fix import order
* Update reap test to be compatible with Windows
* Update print to logging
* Try to tearDown test in a cleaner way
* Try patching atexit call to prevent hanging on Windows
* Try weakref so Windows can GC the process
* Try GC manually to make sure p is cleaned up
* Use a different way to check if parent is alive
* Close the pipe atexit as well
1 parent 5018976 commit 0282abb
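
The change, in short: MultiProcessShared can now run the serving manager in a separately spawned process (spawn_process=True), and the acquired proxy object itself exposes unsafe_hard_delete() so the singleton, and its server process, can be torn down directly. A minimal usage sketch based on the diff below; the Counter class and the tag value are purely illustrative, while the spawn_process flag and the wrapper-level unsafe_hard_delete() are what this commit adds:

from apache_beam.utils.multi_process_shared import MultiProcessShared


class Counter(object):
  """Illustrative shared object; any picklable constructor works."""
  def __init__(self):
    self._value = 0

  def increment(self):
    self._value += 1
    return self._value


# spawn_process=True runs the serving manager in a dedicated child process
# instead of a daemon thread inside the current process.
shared = MultiProcessShared(Counter, tag='my_counter', spawn_process=True)
counter = shared.acquire()
counter.increment()  # calls are forwarded to the shared object
# New in this commit: hard-delete the singleton directly via the acquired
# object; with spawn_process=True this also shuts the server process down.
counter.unsafe_hard_delete()

The life-line pipe and the address/error-file handshake in the diff below are what keep the spawned server from outliving its parent or failing silently.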

File tree

2 files changed: +436 −28 lines changed


sdks/python/apache_beam/utils/multi_process_shared.py

Lines changed: 215 additions & 24 deletions
@@ -22,11 +22,14 @@
 """
 # pytype: skip-file
 
+import atexit
 import logging
 import multiprocessing.managers
 import os
 import tempfile
 import threading
+import time
+import traceback
 from typing import Any
 from typing import Callable
 from typing import Dict
@@ -79,6 +82,10 @@ def singletonProxy_release(self):
     assert self._SingletonProxy_valid
     self._SingletonProxy_valid = False
 
+  def singletonProxy_unsafe_hard_delete(self):
+    assert self._SingletonProxy_valid
+    self._SingletonProxy_entry.unsafe_hard_delete()
+
   def __getattr__(self, name):
     if not self._SingletonProxy_valid:
       raise RuntimeError('Entry was released.')
@@ -105,13 +112,16 @@ def __dir__(self):
     dir = self._SingletonProxy_entry.obj.__dir__()
     dir.append('singletonProxy_call__')
     dir.append('singletonProxy_release')
+    dir.append('singletonProxy_unsafe_hard_delete')
     return dir
 
 
 class _SingletonEntry:
   """Represents a single, refcounted entry in this process."""
-  def __init__(self, constructor, initialize_eagerly=True):
+  def __init__(
+      self, constructor, initialize_eagerly=True, hard_delete_callback=None):
     self.constructor = constructor
+    self._hard_delete_callback = hard_delete_callback
     self.refcount = 0
     self.lock = threading.Lock()
     if initialize_eagerly:
@@ -141,14 +151,28 @@ def unsafe_hard_delete(self):
     if self.initialied:
       del self.obj
       self.initialied = False
+      if self._hard_delete_callback:
+        self._hard_delete_callback()
 
 
 class _SingletonManager:
   entries: Dict[Any, Any] = {}
 
-  def register_singleton(self, constructor, tag, initialize_eagerly=True):
+  def __init__(self):
+    self._hard_delete_callback = None
+
+  def set_hard_delete_callback(self, callback):
+    self._hard_delete_callback = callback
+
+  def register_singleton(
+      self,
+      constructor,
+      tag,
+      initialize_eagerly=True,
+      hard_delete_callback=None):
     assert tag not in self.entries, tag
-    self.entries[tag] = _SingletonEntry(constructor, initialize_eagerly)
+    self.entries[tag] = _SingletonEntry(
+        constructor, initialize_eagerly, hard_delete_callback)
 
   def has_singleton(self, tag):
     return tag in self.entries
@@ -160,7 +184,7 @@ def release_singleton(self, tag, obj):
     return self.entries[tag].release(obj)
 
   def unsafe_hard_delete_singleton(self, tag):
-    return self.entries[tag].unsafe_hard_delete()
+    self.entries[tag].unsafe_hard_delete()
 
 
 _process_level_singleton_manager = _SingletonManager()
@@ -203,6 +227,87 @@ def __getattr__(self, name):
   def get_auto_proxy_object(self):
     return self._proxyObject
 
+  def unsafe_hard_delete(self):
+    self._proxyObject.unsafe_hard_delete()
+
+
+def _run_server_process(address_file, tag, constructor, authkey, life_line):
+  """
+  Runs in a separate process.
+  Includes a 'Suicide Pact' monitor: If parent dies, I die.
+  """
+  parent_pid = os.getppid()
+
+  def cleanup_files():
+    logging.info("Server process exiting. Deleting files for %s", tag)
+    try:
+      if os.path.exists(address_file):
+        os.remove(address_file)
+      if os.path.exists(address_file + ".error"):
+        os.remove(address_file + ".error")
+    except Exception as e:
+      logging.warning('Failed to cleanup files for tag %s: %s', tag, e)
+
+  def handle_unsafe_hard_delete():
+    cleanup_files()
+    os._exit(0)
+
+  def _monitor_parent():
+    """Checks if parent is alive every second."""
+    while True:
+      try:
+        # This will break if parent dies.
+        life_line.recv_bytes()
+      except (EOFError, OSError, BrokenPipeError):
+        logging.warning(
+            "Process %s detected Parent %s died. Self-destructing.",
+            os.getpid(),
+            parent_pid)
+        cleanup_files()
+        os._exit(0)
+      time.sleep(0.5)
+
+  atexit.register(cleanup_files)
+
+  try:
+    t = threading.Thread(target=_monitor_parent, daemon=True)
+
+    logging.getLogger().setLevel(logging.INFO)
+    multiprocessing.current_process().authkey = authkey
+
+    serving_manager = _SingletonRegistrar(
+        address=('localhost', 0), authkey=authkey)
+    _process_level_singleton_manager.set_hard_delete_callback(
+        handle_unsafe_hard_delete)
+    _process_level_singleton_manager.register_singleton(
+        constructor,
+        tag,
+        initialize_eagerly=True,
+        hard_delete_callback=handle_unsafe_hard_delete)
+    # Start monitoring parent after initialisation is done to avoid
+    # potential race conditions.
+    t.start()
+
+    server = serving_manager.get_server()
+    logging.info(
+        'Process %s: Proxy serving %s at %s', os.getpid(), tag, server.address)
+
+    with open(address_file + '.tmp', 'w') as fout:
+      fout.write('%s:%d' % server.address)
+    os.rename(address_file + '.tmp', address_file)
+
+    server.serve_forever()
+
+  except Exception:
+    tb = traceback.format_exc()
+    try:
+      with open(address_file + ".error.tmp", 'w') as fout:
+        fout.write(tb)
+      os.rename(address_file + ".error.tmp", address_file + ".error")
+    except Exception:
+      logging.error("CRITICAL ERROR IN SHARED SERVER:\n%s", tb)
+    os._exit(1)
+
 
 class MultiProcessShared(Generic[T]):
   """MultiProcessShared is used to share a single object across processes.
@@ -252,7 +357,8 @@ def __init__(
       tag: Any,
       *,
       path: str = tempfile.gettempdir(),
-      always_proxy: Optional[bool] = None):
+      always_proxy: Optional[bool] = None,
+      spawn_process: bool = False):
     self._constructor = constructor
     self._tag = tag
     self._path = path
@@ -262,6 +368,7 @@ def __init__(
     self._rpc_address = None
     self._cross_process_lock = fasteners.InterProcessLock(
         os.path.join(self._path, self._tag) + '.lock')
+    self._spawn_process = spawn_process
 
   def _get_manager(self):
     if self._manager is None:
@@ -301,6 +408,11 @@ def acquire(self):
     # Caveat: They must always agree, as they will be ignored if the object
     # is already constructed.
     singleton = self._get_manager().acquire_singleton(self._tag)
+    # Trigger a sweep of zombie processes.
+    # calling active_children() has the side-effect of joining any finished
+    # processes, effectively reaping zombies from previous unsafe_hard_deletes.
+    if self._spawn_process:
+      multiprocessing.active_children()
     return _AutoProxyWrapper(singleton)
 
   def release(self, obj):
@@ -318,22 +430,101 @@ def unsafe_hard_delete(self):
     self._get_manager().unsafe_hard_delete_singleton(self._tag)
 
   def _create_server(self, address_file):
-    # We need to be able to authenticate with both the manager and the process.
-    self._serving_manager = _SingletonRegistrar(
-        address=('localhost', 0), authkey=AUTH_KEY)
-    multiprocessing.current_process().authkey = AUTH_KEY
-    # Initialize eagerly to avoid acting as the server if there are issues.
-    # Note, however, that _create_server itself is called lazily.
-    _process_level_singleton_manager.register_singleton(
-        self._constructor, self._tag, initialize_eagerly=True)
-    self._server = self._serving_manager.get_server()
-    logging.info(
-        'Starting proxy server at %s for shared %s',
-        self._server.address,
-        self._tag)
-    with open(address_file + '.tmp', 'w') as fout:
-      fout.write('%s:%d' % self._server.address)
-    os.rename(address_file + '.tmp', address_file)
-    t = threading.Thread(target=self._server.serve_forever, daemon=True)
-    t.start()
-    logging.info('Done starting server')
+    if self._spawn_process:
+      error_file = address_file + ".error"
+
+      if os.path.exists(error_file):
+        try:
+          os.remove(error_file)
+        except OSError:
+          pass
+
+      # Create a pipe to connect with child process
+      # used to clean up child process if parent dies
+      reader, writer = multiprocessing.Pipe(duplex=False)
+      self._life_line = writer
+
+      ctx = multiprocessing.get_context('spawn')
+      p = ctx.Process(
+          target=_run_server_process,
+          args=(address_file, self._tag, self._constructor, AUTH_KEY, reader),
+          daemon=False  # Must be False for nested proxies
+      )
+      p.start()
+      logging.info("Parent: Waiting for %s to write address file...", self._tag)
+
+      def cleanup_process():
+        if self._life_line:
+          self._life_line.close()
+        if p.is_alive():
+          logging.info(
+              "Parent: Terminating server process %s for %s", p.pid, self._tag)
+          p.terminate()
+          p.join()
+        try:
+          if os.path.exists(address_file):
+            os.remove(address_file)
+          if os.path.exists(error_file):
+            os.remove(error_file)
+        except Exception as e:
+          logging.warning(
+              'Failed to cleanup files for tag %s in atexit handler: %s',
+              self._tag,
+              e)
+
+      atexit.register(cleanup_process)
+
+      start_time = time.time()
+      last_log = start_time
+      while True:
+        if os.path.exists(address_file):
+          break
+
+        if os.path.exists(error_file):
+          with open(error_file, 'r') as f:
+            error_msg = f.read()
+          try:
+            os.remove(error_file)
+          except OSError:
+            pass
+
+          if p.is_alive(): p.terminate()
+          raise RuntimeError(f"Shared Server Process crashed:\n{error_msg}")
+
+        if not p.is_alive():
+          exit_code = p.exitcode
+          raise RuntimeError(
+              "Shared Server Process died unexpectedly"
+              f" with exit code {exit_code}")
+
+        if time.time() - last_log > 300:
+          logging.warning(
+              "Still waiting for %s to initialize... %ss elapsed)",
+              self._tag,
+              int(time.time() - start_time))
+          last_log = time.time()
+
+        time.sleep(0.05)
+
+      logging.info('External process successfully started for %s', self._tag)
+    else:
+      # We need to be able to authenticate with both the manager
+      # and the process.
+      self._serving_manager = _SingletonRegistrar(
+          address=('localhost', 0), authkey=AUTH_KEY)
+      multiprocessing.current_process().authkey = AUTH_KEY
+      # Initialize eagerly to avoid acting as the server if there are issues.
+      # Note, however, that _create_server itself is called lazily.
+      _process_level_singleton_manager.register_singleton(
+          self._constructor, self._tag, initialize_eagerly=True)
+      self._server = self._serving_manager.get_server()
+      logging.info(
+          'Starting proxy server at %s for shared %s',
+          self._server.address,
+          self._tag)
+      with open(address_file + '.tmp', 'w') as fout:
+        fout.write('%s:%d' % self._server.address)
+      os.rename(address_file + '.tmp', address_file)
+      t = threading.Thread(target=self._server.serve_forever, daemon=True)
+      t.start()
+      logging.info('Done starting server')
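
Aside: the parent discovers the spawned server through the file system rather than a return value. The child writes "host:port" to a ".tmp" file and renames it into place so the address file appears atomically, and it reports startup failures by writing the traceback to an ".error" file that the parent polls for. A minimal sketch of that handshake, with illustrative names and a timeout added for the example (the diff's wait loop runs until the child exits or reports an error):

import os
import time


def publish_address(address_file, host, port):
  # Write-then-rename: a polling reader never observes a partially written
  # address file (the rename is atomic on POSIX).
  with open(address_file + '.tmp', 'w') as fout:
    fout.write('%s:%d' % (host, port))
  os.rename(address_file + '.tmp', address_file)


def wait_for_address(address_file, error_file, timeout=30.0):
  deadline = time.time() + timeout
  while time.time() < deadline:
    if os.path.exists(error_file):
      with open(error_file) as f:
        raise RuntimeError('shared server crashed:\n' + f.read())
    if os.path.exists(address_file):
      with open(address_file) as f:
        host, port = f.read().rsplit(':', 1)
      return host, int(port)
    time.sleep(0.05)
  raise TimeoutError('shared server did not start in time')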
