From 060907e1059262b0e21cff651d02a23fa0c35aec Mon Sep 17 00:00:00 2001 From: Adam Bella Date: Mon, 16 Sep 2019 09:00:25 +0200 Subject: [PATCH] fix - uvloop freeze when something other than uvloop invoke process fork and execute non-python process Problem: Uvloop for each loop register `atFork` handler that is called after fork is executed by forked child. This handler works fine when fork was invoked by uvloop. In case fork is invoked by something else (such as external library) uvloop freeze in this handler because: - GIL is acquired inside `atFork` handler -> in case forked child does not contain python runtime `atFork` handler freeze at obtaining GIL - when compiled in debug mode (`make debug`) cython trace calls are inserted inside compiled `atFork` handler -> in case forked child does not contain python runtime `atFork` handler freeze at providing trace call Solution: This fix solve described problems by implementing `atFork` handler as C function so that forked child can call it safely whether or not contains python runtime. --- tests/test_process_spawning.py | 109 +++++++++++++++++++++++++++++++++ uvloop/handles/process.pyx | 3 + uvloop/includes/fork_handler.h | 15 +++++ uvloop/includes/system.pxd | 8 +-- uvloop/loop.pyx | 16 ++--- 5 files changed, 140 insertions(+), 11 deletions(-) create mode 100644 tests/test_process_spawning.py create mode 100644 uvloop/includes/fork_handler.h diff --git a/tests/test_process_spawning.py b/tests/test_process_spawning.py new file mode 100644 index 00000000..64226aca --- /dev/null +++ b/tests/test_process_spawning.py @@ -0,0 +1,109 @@ +import asyncio +import ctypes.util +import logging +from concurrent.futures import ThreadPoolExecutor +from threading import Thread +from unittest import TestCase + +import uvloop + + +class ProcessSpawningTestCollection(TestCase): + + def test_spawning_external_process(self): + """Test spawning external process (using `popen` system call) that + cause loop freeze.""" + + async def run(loop): + event = asyncio.Event(loop=loop) + + dummy_workers = [simulate_loop_activity(loop, event) + for _ in range(5)] + spawn_worker = spawn_external_process(loop, event) + done, pending = await asyncio.wait([spawn_worker] + dummy_workers, + loop=loop) + exceptions = [result.exception() + for result in done if result.exception()] + if exceptions: + raise exceptions[0] + + return True + + async def simulate_loop_activity(loop, done_event): + """Simulate loop activity by busy waiting for event.""" + while True: + try: + await asyncio.wait_for(done_event.wait(), + timeout=0.1, loop=loop) + except asyncio.TimeoutError: + pass + + if done_event.is_set(): + return None + + async def spawn_external_process(loop, event): + executor = ThreadPoolExecutor() + try: + call = loop.run_in_executor(executor, spawn_process) + await asyncio.wait_for(call, loop=loop, timeout=3600) + finally: + event.set() + executor.shutdown(wait=False) + return True + + BUFFER_LENGTH = 1025 + BufferType = ctypes.c_char * (BUFFER_LENGTH - 1) + + def run_echo(popen, fread, pclose): + fd = popen('echo test'.encode('ASCII'), 'r'.encode('ASCII')) + try: + while True: + buffer = BufferType() + data = ctypes.c_void_p(ctypes.addressof(buffer)) + + # -> this call will freeze whole loop in case of bug + read = fread(data, 1, BUFFER_LENGTH, fd) + if not read: + break + except Exception: + logging.getLogger().exception('read error') + raise + finally: + pclose(fd) + + def spawn_process(): + """Spawn external process via `popen` system call.""" + + stdio = ctypes.CDLL(ctypes.util.find_library('c')) + + # popen system call + popen = stdio.popen + popen.argtypes = (ctypes.c_char_p, ctypes.c_char_p) + popen.restype = ctypes.c_void_p + + # pclose system call + pclose = stdio.pclose + pclose.argtypes = (ctypes.c_void_p,) + pclose.restype = ctypes.c_int + + # fread system call + fread = stdio.fread + fread.argtypes = (ctypes.c_void_p, ctypes.c_size_t, + ctypes.c_size_t, ctypes.c_void_p) + fread.restype = ctypes.c_size_t + + for iteration in range(1000): + t = Thread(target=run_echo, + args=(popen, fread, pclose), + daemon=True) + t.start() + t.join(timeout=10.0) + if t.is_alive(): + raise Exception('process freeze detected at {}' + .format(iteration)) + + return True + + loop = uvloop.new_event_loop() + proc = loop.run_until_complete(run(loop)) + self.assertTrue(proc) diff --git a/uvloop/handles/process.pyx b/uvloop/handles/process.pyx index 08ed5b6f..495ec3ad 100644 --- a/uvloop/handles/process.pyx +++ b/uvloop/handles/process.pyx @@ -28,6 +28,7 @@ cdef class UVProcess(UVHandle): global __forking global __forking_loop + global __forkHandler cdef int err @@ -76,6 +77,7 @@ cdef class UVProcess(UVHandle): loop.active_process_handler = self __forking = 1 __forking_loop = loop + __forkHandler = &__get_fork_handler PyOS_BeforeFork() @@ -85,6 +87,7 @@ cdef class UVProcess(UVHandle): __forking = 0 __forking_loop = None + __forkHandler = NULL loop.active_process_handler = None PyOS_AfterFork_Parent() diff --git a/uvloop/includes/fork_handler.h b/uvloop/includes/fork_handler.h new file mode 100644 index 00000000..828d10fd --- /dev/null +++ b/uvloop/includes/fork_handler.h @@ -0,0 +1,15 @@ + +typedef void (*OnForkHandler)(); + +OnForkHandler __forkHandler = NULL; + +/* Auxiliary function to call global fork handler if defined. + +Note: Fork handler needs to be in C (not cython) otherwise it would require +GIL to be present, but some forks can exec non-python processes. +*/ +void handleAtFork() { + if (__forkHandler != NULL) { + __forkHandler(); + } +} diff --git a/uvloop/includes/system.pxd b/uvloop/includes/system.pxd index 337ed97a..94c90a6c 100644 --- a/uvloop/includes/system.pxd +++ b/uvloop/includes/system.pxd @@ -59,12 +59,12 @@ cdef extern from "unistd.h" nogil: void _exit(int status) -cdef extern from "pthread.h" nogil: +cdef extern from "pthread.h": int pthread_atfork( - void (*prepare)() nogil, - void (*parent)() nogil, - void (*child)() nogil) + void (*prepare)(), + void (*parent)(), + void (*child)()) cdef extern from "includes/compat.h" nogil: diff --git a/uvloop/loop.pyx b/uvloop/loop.pyx index ccd06419..824815b7 100644 --- a/uvloop/loop.pyx +++ b/uvloop/loop.pyx @@ -3117,19 +3117,21 @@ cdef vint __atfork_installed = 0 cdef vint __forking = 0 cdef Loop __forking_loop = None +cdef extern from "includes/fork_handler.h": -cdef void __atfork_child() nogil: - # See CPython/posixmodule.c for details + ctypedef void (*OnForkHandler)() + cdef OnForkHandler __forkHandler + void handleAtFork() + +cdef void __get_fork_handler() nogil: global __forking + global __forking_loop with gil: - if (__forking and - __forking_loop is not None and + if (__forking and __forking_loop is not None and __forking_loop.active_process_handler is not None): - __forking_loop.active_process_handler._after_fork() - cdef __install_atfork(): global __atfork_installed if __atfork_installed: @@ -3138,7 +3140,7 @@ cdef __install_atfork(): cdef int err - err = system.pthread_atfork(NULL, NULL, &__atfork_child) + err = system.pthread_atfork(NULL, NULL, &handleAtFork) if err: __atfork_installed = 0 raise convert_error(-err)