Skip to content

Fix - freeze when reading stdout from external process #268

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Oct 25, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
109 changes: 109 additions & 0 deletions tests/test_process_spawning.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
import asyncio
import ctypes.util
import logging
from concurrent.futures import ThreadPoolExecutor
from threading import Thread
from unittest import TestCase

import uvloop


class ProcessSpawningTestCollection(TestCase):

def test_spawning_external_process(self):
"""Test spawning external process (using `popen` system call) that
cause loop freeze."""

async def run(loop):
event = asyncio.Event(loop=loop)

dummy_workers = [simulate_loop_activity(loop, event)
for _ in range(5)]
spawn_worker = spawn_external_process(loop, event)
done, pending = await asyncio.wait([spawn_worker] + dummy_workers,
loop=loop)
exceptions = [result.exception()
for result in done if result.exception()]
if exceptions:
raise exceptions[0]

return True

async def simulate_loop_activity(loop, done_event):
"""Simulate loop activity by busy waiting for event."""
while True:
try:
await asyncio.wait_for(done_event.wait(),
timeout=0.1, loop=loop)
except asyncio.TimeoutError:
pass

if done_event.is_set():
return None

async def spawn_external_process(loop, event):
executor = ThreadPoolExecutor()
try:
call = loop.run_in_executor(executor, spawn_process)
await asyncio.wait_for(call, loop=loop, timeout=3600)
finally:
event.set()
executor.shutdown(wait=False)
return True

BUFFER_LENGTH = 1025
BufferType = ctypes.c_char * (BUFFER_LENGTH - 1)

def run_echo(popen, fread, pclose):
fd = popen('echo test'.encode('ASCII'), 'r'.encode('ASCII'))
try:
while True:
buffer = BufferType()
data = ctypes.c_void_p(ctypes.addressof(buffer))

# -> this call will freeze whole loop in case of bug
read = fread(data, 1, BUFFER_LENGTH, fd)
if not read:
break
except Exception:
logging.getLogger().exception('read error')
raise
finally:
pclose(fd)

def spawn_process():
"""Spawn external process via `popen` system call."""

stdio = ctypes.CDLL(ctypes.util.find_library('c'))

# popen system call
popen = stdio.popen
popen.argtypes = (ctypes.c_char_p, ctypes.c_char_p)
popen.restype = ctypes.c_void_p

# pclose system call
pclose = stdio.pclose
pclose.argtypes = (ctypes.c_void_p,)
pclose.restype = ctypes.c_int

# fread system call
fread = stdio.fread
fread.argtypes = (ctypes.c_void_p, ctypes.c_size_t,
ctypes.c_size_t, ctypes.c_void_p)
fread.restype = ctypes.c_size_t

for iteration in range(1000):
t = Thread(target=run_echo,
args=(popen, fread, pclose),
daemon=True)
t.start()
t.join(timeout=10.0)
if t.is_alive():
raise Exception('process freeze detected at {}'
.format(iteration))

return True

loop = uvloop.new_event_loop()
proc = loop.run_until_complete(run(loop))
self.assertTrue(proc)
3 changes: 3 additions & 0 deletions uvloop/handles/process.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ cdef class UVProcess(UVHandle):

global __forking
global __forking_loop
global __forkHandler

cdef int err

Expand Down Expand Up @@ -76,6 +77,7 @@ cdef class UVProcess(UVHandle):
loop.active_process_handler = self
__forking = 1
__forking_loop = loop
__forkHandler = <OnForkHandler>&__get_fork_handler

PyOS_BeforeFork()

Expand All @@ -85,6 +87,7 @@ cdef class UVProcess(UVHandle):

__forking = 0
__forking_loop = None
__forkHandler = NULL
loop.active_process_handler = None

PyOS_AfterFork_Parent()
Expand Down
15 changes: 15 additions & 0 deletions uvloop/includes/fork_handler.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@

typedef void (*OnForkHandler)();

OnForkHandler __forkHandler = NULL;

/* Auxiliary function to call global fork handler if defined.

Note: Fork handler needs to be in C (not cython) otherwise it would require
GIL to be present, but some forks can exec non-python processes.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure I follow... The old handler had a nogil decoration, i.e. cdef void __atfork_child() nogil:. How's this indirection makes that different?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is small but significant difference in case you compile it with cython and nogil traces enabled (-DCYTHON_TRACE=1 -DCYTHON_TRACE_NOGIL=1).
In this mode cython will insert trace calls during compilation and resulting compiled __atfork_child function will contain __Pyx_TraceCall. It is not a problem when fork contains python runtime, but it will freeze at this trace call in case forking child does not contains python runtime.

Traces are enabled in make debug, so I decide to write it in pure C where no such traces are inserted by cython.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just so that I understand the scope: this PR is only needed to unbreak make debug?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, on the contrary, this PR should fix freezing uvloop (under some conditions) and freezing occurs even when compiled with pure make (without debug), but for a different reason.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Alright, I'll review the PR soon.

In the meantime, could you please squash all commits into 1 and write a commit message that explains the bug/problem in a detailed way? It would help me / future maintainers a lot.

Huge props for coming up with a functional test case.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, I squash it. Give me a note if there is something not clear enough.

*/
void handleAtFork() {
if (__forkHandler != NULL) {
__forkHandler();
}
}
8 changes: 4 additions & 4 deletions uvloop/includes/system.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -59,12 +59,12 @@ cdef extern from "unistd.h" nogil:
void _exit(int status)


cdef extern from "pthread.h" nogil:
cdef extern from "pthread.h":

int pthread_atfork(
void (*prepare)() nogil,
void (*parent)() nogil,
void (*child)() nogil)
void (*prepare)(),
void (*parent)(),
void (*child)())


cdef extern from "includes/compat.h" nogil:
Expand Down
16 changes: 9 additions & 7 deletions uvloop/loop.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -3117,19 +3117,21 @@ cdef vint __atfork_installed = 0
cdef vint __forking = 0
cdef Loop __forking_loop = None

cdef extern from "includes/fork_handler.h":

cdef void __atfork_child() nogil:
# See CPython/posixmodule.c for details
ctypedef void (*OnForkHandler)()
cdef OnForkHandler __forkHandler
void handleAtFork()

cdef void __get_fork_handler() nogil:
global __forking
global __forking_loop

with gil:
if (__forking and
__forking_loop is not None and
if (__forking and __forking_loop is not None and
__forking_loop.active_process_handler is not None):

__forking_loop.active_process_handler._after_fork()


cdef __install_atfork():
global __atfork_installed
if __atfork_installed:
Expand All @@ -3138,7 +3140,7 @@ cdef __install_atfork():

cdef int err

err = system.pthread_atfork(NULL, NULL, &__atfork_child)
err = system.pthread_atfork(NULL, NULL, &handleAtFork)
if err:
__atfork_installed = 0
raise convert_error(-err)
Expand Down