From ac645acad844b75d0c9b6a53e92e0d382b42974e Mon Sep 17 00:00:00 2001 From: "Christopher J. Markiewicz" Date: Tue, 6 Nov 2018 18:53:32 -0500 Subject: [PATCH 1/4] FIX: Restore NonDaemonProcess to file-level --- nipype/pipeline/plugins/legacymultiproc.py | 23 ++++++++++++---------- 1 file changed, 13 insertions(+), 10 deletions(-) diff --git a/nipype/pipeline/plugins/legacymultiproc.py b/nipype/pipeline/plugins/legacymultiproc.py index dd88a55055..8c47260592 100644 --- a/nipype/pipeline/plugins/legacymultiproc.py +++ b/nipype/pipeline/plugins/legacymultiproc.py @@ -73,22 +73,25 @@ def run_node(node, updatehash, taskid): # Return the result dictionary return result +# Pythons 2.7, 3.4-3.7.0, and 3.7.1 have three different implementations of +# pool.Pool().Process(), but a common interface, so construct a Process and +# subclass its __class__ +class NonDaemonProcess(pool.Pool().Process().__class__): + @property + def daemon(self): + return False + + @daemon.setter + def daemon(self, val): + pass + class NonDaemonPool(pool.Pool): """A process pool with non-daemon processes. """ def Process(self, *args, **kwds): proc = super(NonDaemonPool, self).Process(*args, **kwds) - - class NonDaemonProcess(proc.__class__): - """Monkey-patch process to ensure it is never daemonized""" - @property - def daemon(self): - return False - - @daemon.setter - def daemon(self, val): - pass + # Monkey-patch newly created processes to ensure they are never daemonized proc.__class__ = NonDaemonProcess return proc From cc663528fe097c4e0ab761bcc36a21aa57a5034f Mon Sep 17 00:00:00 2001 From: "Christopher J. Markiewicz" Date: Wed, 7 Nov 2018 08:52:14 -0500 Subject: [PATCH 2/4] FIX: Non-daemon attribute as dynamic mixin --- nipype/pipeline/plugins/legacymultiproc.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/nipype/pipeline/plugins/legacymultiproc.py b/nipype/pipeline/plugins/legacymultiproc.py index 8c47260592..a9922ec484 100644 --- a/nipype/pipeline/plugins/legacymultiproc.py +++ b/nipype/pipeline/plugins/legacymultiproc.py @@ -74,9 +74,9 @@ def run_node(node, updatehash, taskid): return result # Pythons 2.7, 3.4-3.7.0, and 3.7.1 have three different implementations of -# pool.Pool().Process(), but a common interface, so construct a Process and -# subclass its __class__ -class NonDaemonProcess(pool.Pool().Process().__class__): +# pool.Pool().Process(), and the type of the result varies based on the default +# multiprocessing context, so we need to dynamically patch the daemon property +class NonDaemonMixin(object): @property def daemon(self): return False @@ -92,7 +92,7 @@ class NonDaemonPool(pool.Pool): def Process(self, *args, **kwds): proc = super(NonDaemonPool, self).Process(*args, **kwds) # Monkey-patch newly created processes to ensure they are never daemonized - proc.__class__ = NonDaemonProcess + proc.__class__ = type('NonDaemonProcess', (NonDaemonMixin, proc.__class__), {}) return proc From 7f4660f53abac9d3360191f31e069835d8cfa333 Mon Sep 17 00:00:00 2001 From: "Christopher J. Markiewicz" Date: Wed, 7 Nov 2018 09:24:20 -0500 Subject: [PATCH 3/4] PY2: String inputs to type() --- nipype/pipeline/plugins/legacymultiproc.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/nipype/pipeline/plugins/legacymultiproc.py b/nipype/pipeline/plugins/legacymultiproc.py index a9922ec484..75b395de48 100644 --- a/nipype/pipeline/plugins/legacymultiproc.py +++ b/nipype/pipeline/plugins/legacymultiproc.py @@ -92,7 +92,7 @@ class NonDaemonPool(pool.Pool): def Process(self, *args, **kwds): proc = super(NonDaemonPool, self).Process(*args, **kwds) # Monkey-patch newly created processes to ensure they are never daemonized - proc.__class__ = type('NonDaemonProcess', (NonDaemonMixin, proc.__class__), {}) + proc.__class__ = type(str('NonDaemonProcess'), (NonDaemonMixin, proc.__class__), {}) return proc From f1f38456adba726857917b9bcc7fd7d021673251 Mon Sep 17 00:00:00 2001 From: "Christopher J. Markiewicz" Date: Wed, 7 Nov 2018 11:37:58 -0500 Subject: [PATCH 4/4] RF: Comprehensive non-daemon subclassing of contexts --- nipype/pipeline/plugins/legacymultiproc.py | 55 ++++++++++++++++++---- 1 file changed, 47 insertions(+), 8 deletions(-) diff --git a/nipype/pipeline/plugins/legacymultiproc.py b/nipype/pipeline/plugins/legacymultiproc.py index 75b395de48..bfc1773a92 100644 --- a/nipype/pipeline/plugins/legacymultiproc.py +++ b/nipype/pipeline/plugins/legacymultiproc.py @@ -11,6 +11,7 @@ # Import packages import os +import multiprocessing as mp from multiprocessing import Pool, cpu_count, pool from traceback import format_exception import sys @@ -85,15 +86,53 @@ def daemon(self): def daemon(self, val): pass +try: + from multiprocessing import context + # Exists on all platforms + class NonDaemonSpawnProcess(NonDaemonMixin, context.SpawnProcess): + pass + class NonDaemonSpawnContext(context.SpawnContext): + Process = NonDaemonSpawnProcess + _nondaemon_context_mapper = { + 'spawn': NonDaemonSpawnContext() + } -class NonDaemonPool(pool.Pool): - """A process pool with non-daemon processes. - """ - def Process(self, *args, **kwds): - proc = super(NonDaemonPool, self).Process(*args, **kwds) - # Monkey-patch newly created processes to ensure they are never daemonized - proc.__class__ = type(str('NonDaemonProcess'), (NonDaemonMixin, proc.__class__), {}) - return proc + # POSIX only + try: + class NonDaemonForkProcess(NonDaemonMixin, context.ForkProcess): + pass + class NonDaemonForkContext(context.ForkContext): + Process = NonDaemonForkProcess + _nondaemon_context_mapper['fork'] = NonDaemonForkContext() + except AttributeError: + pass + # POSIX only + try: + class NonDaemonForkServerProcess(NonDaemonMixin, context.ForkServerProcess): + pass + class NonDaemonForkServerContext(context.ForkServerContext): + Process = NonDaemonForkServerProcess + _nondaemon_context_mapper['forkserver'] = NonDaemonForkServerContext() + except AttributeError: + pass + + class NonDaemonPool(pool.Pool): + def __init__(self, processes=None, initializer=None, initargs=(), + maxtasksperchild=None, context=None): + if context is None: + context = mp.get_context() + context = _nondaemon_context_mapper[context._name] + super(NonDaemonPool, self).__init__(processes=processes, + initializer=initializer, + initargs=initargs, + maxtasksperchild=maxtasksperchild, + context=context) + +except ImportError: + class NonDaemonProcess(NonDaemonMixin, mp.Process): + pass + class NonDaemonPool(pool.Pool): + Process = NonDaemonProcess class LegacyMultiProcPlugin(DistributedPluginBase):