Skip to content

Commit

Permalink
properly close OutStream and various fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
limwz01 committed Dec 12, 2024
1 parent 188f39c commit 32cfde7
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 14 deletions.
1 change: 1 addition & 0 deletions ipykernel/embed.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,3 +55,4 @@ def embed_kernel(module=None, local_ns=None, **kwargs):
app.kernel.user_ns = local_ns
app.shell.set_completer_frame() # type:ignore[union-attr]
app.start()
app.close()
4 changes: 2 additions & 2 deletions ipykernel/iostream.py
Original file line number Diff line number Diff line change
Expand Up @@ -595,8 +595,8 @@ def close(self):
self._should_watch = False
# thread won't wake unless there's something to read
# writing something after _should_watch will not be echoed
os.write(self._original_stdstream_fd, b"\0")
if self.watch_fd_thread is not None:
if self.watch_fd_thread is not None and self.watch_fd_thread.is_alive():
os.write(self._original_stdstream_fd, b"\0")
self.watch_fd_thread.join()
# restore original FDs
os.dup2(self._original_stdstream_copy, self._original_stdstream_fd)
Expand Down
65 changes: 53 additions & 12 deletions ipykernel/kernelapp.py
Original file line number Diff line number Diff line change
Expand Up @@ -397,6 +397,9 @@ def init_heartbeat(self):
self.heartbeat.start()

def close(self):
if self.closed:
return
self.closed = True
"""Close zmq sockets in an orderly fashion"""
# un-capture IO before we start closing channels
self.reset_io()
Expand Down Expand Up @@ -471,33 +474,45 @@ def log_connection_info(self):
def init_blackhole(self):
"""redirects stdout/stderr to devnull if necessary"""
if self.no_stdout or self.no_stderr:
blackhole = open(os.devnull, "w") # noqa: SIM115
# keep reference around so that it would not accidentally close the pipe fds
self._blackhole = open(os.devnull, "w") # noqa: SIM115
if self.no_stdout:
sys.stdout = sys.__stdout__ = blackhole # type:ignore[misc]
if sys.stdout is not None:
sys.stdout.flush()
sys.stdout = self._blackhole # type:ignore[misc]
if self.no_stderr:
sys.stderr = sys.__stderr__ = blackhole # type:ignore[misc]
if sys.stderr is not None:
sys.stderr.flush()
sys.stderr = self._blackhole # type:ignore[misc]

def init_io(self):
"""Redirect input streams and set a display hook."""
if self.outstream_class:
outstream_factory = import_item(str(self.outstream_class))
if sys.stdout is not None:
sys.stdout.flush()

e_stdout = None if self.quiet else sys.__stdout__
e_stderr = None if self.quiet else sys.__stderr__
e_stdout = None if self.quiet else sys.stdout
e_stderr = None if self.quiet else sys.stderr

if not self.capture_fd_output:
outstream_factory = partial(outstream_factory, watchfd=False)

if sys.stdout is not None:
sys.stdout.flush()
sys.stdout = outstream_factory(self.session, self.iopub_thread, "stdout", echo=e_stdout)

if sys.stderr is not None:
sys.stderr.flush()
sys.stderr = outstream_factory(self.session, self.iopub_thread, "stderr", echo=e_stderr)

if hasattr(sys.stderr, "_original_stdstream_copy"):
for handler in self.log.handlers:
if isinstance(handler, StreamHandler) and (handler.stream.buffer.fileno() == 2):
self.log.debug("Seeing logger to stderr, rerouting to raw filedescriptor.")
if (isinstance(handler, StreamHandler)
and (buffer := getattr(handler.stream, "buffer"))
and (fileno := getattr(buffer, "fileno"))
and fileno() == sys.stderr._original_stdstream_fd):
self.log.debug(
"Seeing logger to stderr, rerouting to raw filedescriptor."
)

handler.stream = TextIOWrapper(
FileIO(
Expand All @@ -517,9 +532,32 @@ def reset_io(self):
restores state after init_io
"""
sys.stdout = sys.__stdout__
sys.stderr = sys.__stderr__
sys.displayhook = sys.__displayhook__
stdout, stderr, displayhook = sys.stdout, sys.stderr, sys.displayhook
sys.stdout, sys.stderr, sys.displayhook = self._original_io
if (finish_displayhook := getattr(displayhook, "finish_displayhook",
None)):
finish_displayhook()
if hasattr(sys.stderr, "_original_stdstream_copy"):
for handler in self.log.handlers:
if (isinstance(handler, StreamHandler)
and (buffer := getattr(handler.stream, "buffer"))
and (fileno := getattr(buffer, "fileno"))
and fileno() == sys.stderr._original_stdstream_copy):
self.log.debug(
"Seeing logger to raw filedescriptor, rerouting back to stderr"
)

handler.stream = TextIOWrapper(
FileIO(
sys.stderr._original_stdstream_fd,
"w",
)
)
stderr.close()
stdout.close()
if self._blackhole:
# already closed by above but no harm calling again
self._blackhole.close()

def patch_io(self):
"""Patch important libraries that can't handle sys.stdout forwarding"""
Expand Down Expand Up @@ -693,6 +731,9 @@ def init_pdb(self):
@catch_config_error
def initialize(self, argv=None):
"""Initialize the application."""
self.closed = False
self._blackhole = None
self._original_io = sys.stdout, sys.stderr, sys.displayhook
self._init_asyncio_patch()
super().initialize(argv)
if self.subapp is not None:
Expand Down

0 comments on commit 32cfde7

Please sign in to comment.