Skip to content

Commit 3c4ef28

Browse files
committed
Resolve comments
1 parent 0e8456e commit 3c4ef28

File tree

2 files changed

+58
-36
lines changed

2 files changed

+58
-36
lines changed

sdks/python/apache_beam/utils/multi_process_shared.py

Lines changed: 17 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -186,7 +186,6 @@ def release_singleton(self, tag, obj):
186186

187187
def unsafe_hard_delete_singleton(self, tag):
188188
self.entries[tag].unsafe_hard_delete()
189-
self._hard_delete_callback()
190189

191190

192191
_process_level_singleton_manager = _SingletonManager()
@@ -236,13 +235,7 @@ def get_auto_proxy_object(self):
236235
return self._proxyObject
237236

238237
def unsafe_hard_delete(self):
239-
try:
240-
self._proxyObject.unsafe_hard_delete()
241-
except (EOFError, ConnectionResetError, BrokenPipeError):
242-
pass
243-
except Exception as e:
244-
logging.warning(
245-
"Exception %s when trying to hard delete shared object proxy", e)
238+
self._proxyObject.unsafe_hard_delete()
246239

247240

248241
def _run_server_process(address_file, tag, constructor, authkey):
@@ -259,8 +252,8 @@ def cleanup_files():
259252
os.remove(address_file)
260253
if os.path.exists(address_file + ".error"):
261254
os.remove(address_file + ".error")
262-
except Exception:
263-
pass
255+
except Exception as e:
256+
logging.warning('Failed to cleanup files for tag %s: %s', tag, e)
264257

265258
def handle_unsafe_hard_delete():
266259
cleanup_files()
@@ -270,6 +263,9 @@ def _monitor_parent():
270263
"""Checks if parent is alive every second."""
271264
while True:
272265
try:
266+
# Signal 0 only probes whether parent_pid is still alive:
267+
# the call raises OSError if the parent has died
268+
# and is a no-op if the parent is alive.
273269
os.kill(parent_pid, 0)
274270
except OSError:
275271
logging.warning(
@@ -284,7 +280,6 @@ def _monitor_parent():
284280

285281
try:
286282
t = threading.Thread(target=_monitor_parent, daemon=True)
287-
t.start()
288283

289284
logging.getLogger().setLevel(logging.INFO)
290285
multiprocessing.current_process().authkey = authkey
@@ -298,6 +293,9 @@ def _monitor_parent():
298293
tag,
299294
initialize_eagerly=True,
300295
hard_delete_callback=handle_unsafe_hard_delete)
296+
# Start monitoring parent after initialisation is done to avoid
297+
# potential race conditions.
298+
t.start()
301299

302300
server = serving_manager.get_server()
303301
logging.info(
@@ -422,7 +420,8 @@ def acquire(self):
422420
# Trigger a sweep of zombie processes.
423421
# calling active_children() has the side-effect of joining any finished
424422
# processes, effectively reaping zombies from previous unsafe_hard_deletes.
425-
if self._spawn_process: multiprocessing.active_children()
423+
if self._spawn_process:
424+
multiprocessing.active_children()
426425
return _AutoProxyWrapper(singleton)
427426

428427
def release(self, obj):
@@ -437,15 +436,7 @@ def unsafe_hard_delete(self):
437436
to this object exist, or (b) you are ok with all existing references to
438437
this object throwing strange errors when dereferenced.
439438
"""
440-
try:
441-
self._get_manager().unsafe_hard_delete_singleton(self._tag)
442-
except (EOFError, ConnectionResetError, BrokenPipeError):
443-
pass
444-
except Exception as e:
445-
logging.warning(
446-
"Exception %s when trying to hard delete shared object %s",
447-
e,
448-
self._tag)
439+
self._get_manager().unsafe_hard_delete_singleton(self._tag)
449440

450441
def _create_server(self, address_file):
451442
if self._spawn_process:
@@ -477,8 +468,11 @@ def cleanup_process():
477468
os.remove(address_file)
478469
if os.path.exists(error_file):
479470
os.remove(error_file)
480-
except Exception:
481-
pass
471+
except Exception as e:
472+
logging.warning(
473+
'Failed to cleanup files for tag %s in atexit handler: %s',
474+
self._tag,
475+
e)
482476

483477
atexit.register(cleanup_process)
484478

sdks/python/apache_beam/utils/multi_process_shared_test.py

Lines changed: 41 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -189,8 +189,11 @@ def test_unsafe_hard_delete(self):
189189
self.assertEqual(counter1.increment(), 1)
190190
self.assertEqual(counter2.increment(), 2)
191191

192-
multi_process_shared.MultiProcessShared(
193-
Counter, tag='test_unsafe_hard_delete').unsafe_hard_delete()
192+
try:
193+
multi_process_shared.MultiProcessShared(
194+
Counter, tag='test_unsafe_hard_delete').unsafe_hard_delete()
195+
except Exception:
196+
pass
194197

195198
with self.assertRaises(Exception):
196199
counter1.get()
@@ -219,7 +222,10 @@ def test_unsafe_hard_delete_autoproxywrapper(self):
219222
self.assertEqual(counter1.increment(), 1)
220223
self.assertEqual(counter2.increment(), 2)
221224

222-
counter2.unsafe_hard_delete()
225+
try:
226+
counter2.unsafe_hard_delete()
227+
except Exception:
228+
pass
223229

224230
with self.assertRaises(Exception):
225231
counter1.get()
@@ -243,8 +249,11 @@ def test_unsafe_hard_delete_no_op(self):
243249
self.assertEqual(counter1.increment(), 1)
244250
self.assertEqual(counter2.increment(), 2)
245251

246-
multi_process_shared.MultiProcessShared(
247-
Counter, tag='no_tag_to_delete').unsafe_hard_delete()
252+
try:
253+
multi_process_shared.MultiProcessShared(
254+
Counter, tag='no_tag_to_delete').unsafe_hard_delete()
255+
except Exception:
256+
pass
248257

249258
self.assertEqual(counter1.increment(), 3)
250259
self.assertEqual(counter2.increment(), 4)
@@ -298,8 +307,9 @@ def setUp(self):
298307
'main',
299308
'to_delete',
300309
'mix1',
301-
'mix2'
302-
'test_process_exit']:
310+
'mix2',
311+
'test_process_exit',
312+
'thundering_herd_test']:
303313
for ext in ['', '.address', '.address.error']:
304314
try:
305315
os.remove(os.path.join(tempdir, tag + ext))
@@ -326,7 +336,10 @@ def test_proxy_on_proxy(self):
326336
instance = shared1.acquire()
327337
proxy_instance = instance.make_proxy(spawn_process=True)
328338
self.assertEqual(proxy_instance.increment(), 1)
329-
proxy_instance.unsafe_hard_delete()
339+
try:
340+
proxy_instance.unsafe_hard_delete()
341+
except Exception:
342+
pass
330343

331344
proxy_instance2 = instance.make_proxy(tag='proxy_2', spawn_process=True)
332345
self.assertEqual(proxy_instance2.increment(), 1)
@@ -344,7 +357,10 @@ def test_unsafe_hard_delete_autoproxywrapper(self):
344357
self.assertEqual(counter1.increment(), 1)
345358
self.assertEqual(counter2.increment(), 2)
346359

347-
counter2.unsafe_hard_delete()
360+
try:
361+
counter2.unsafe_hard_delete()
362+
except Exception:
363+
pass
348364

349365
with self.assertRaises(Exception):
350366
counter1.get()
@@ -385,7 +401,10 @@ def test_process_exits_on_unsafe_hard_delete(self):
385401

386402
self.assertIsNotNone(
387403
server_process, "Could not find spawned server process")
388-
obj.unsafe_hard_delete()
404+
try:
405+
obj.unsafe_hard_delete()
406+
except Exception:
407+
pass
389408
server_process.join(timeout=5)
390409

391410
self.assertFalse(
@@ -413,7 +432,10 @@ def test_process_exits_on_unsafe_hard_delete_with_manager(self):
413432

414433
self.assertIsNotNone(
415434
server_process, "Could not find spawned server process")
416-
shared.unsafe_hard_delete()
435+
try:
436+
shared.unsafe_hard_delete()
437+
except Exception:
438+
pass
417439
server_process.join(timeout=5)
418440

419441
self.assertFalse(
@@ -434,7 +456,10 @@ def test_zombie_reaping_on_acquire(self):
434456
server_pid = next(
435457
p.pid for p in children if p.is_alive() and p.pid != os.getpid())
436458

437-
obj.unsafe_hard_delete()
459+
try:
460+
obj.unsafe_hard_delete()
461+
except Exception:
462+
pass
438463

439464
try:
440465
os.kill(server_pid, 0)
@@ -458,7 +483,10 @@ def test_zombie_reaping_on_acquire(self):
458483
self.assertFalse(
459484
pid_exists,
460485
f"Old server process {server_pid} was not reaped by acquire() sweep")
461-
shared2.unsafe_hard_delete()
486+
try:
487+
shared2.unsafe_hard_delete()
488+
except Exception:
489+
pass
462490

463491

464492
if __name__ == '__main__':

0 commit comments

Comments
 (0)