diff --git a/tests/test_in_out.py b/tests/test_in_out.py index 717a015..f9c7efd 100644 --- a/tests/test_in_out.py +++ b/tests/test_in_out.py @@ -120,7 +120,13 @@ def test_hide_show(monkeypatch, capsys, text, request, isatty_fixture): # Ensure that sp.stop() will be executed def teardown(): - sp.stop() + try: + sp.stop() + except ValueError: + # Raised .stop() fails due to an already closed file + # Since this is a teardown function, it seems inappropriate to allow this + # to cause the test to fail + pass request.addfinalizer(teardown) diff --git a/yaspin/core.py b/yaspin/core.py index 798b5be..ec031c7 100644 --- a/yaspin/core.py +++ b/yaspin/core.py @@ -28,6 +28,7 @@ Iterator, Optional, Sequence, + TextIO, Type, TypeVar, Union, @@ -89,8 +90,8 @@ def fancy_handler(signum: int, frame: Any, spinner: Yaspin) -> None: # pylint: class Yaspin: # pylint: disable=too-many-instance-attributes """Implements a context manager that spawns a thread - to write spinner frames into a tty (stdout) during - context execution. + to write spinner frames into a tty (stdout by default) + during context execution. """ # When Python finds its output attached to a terminal, @@ -108,13 +109,17 @@ def __init__( # pylint: disable=too-many-arguments side: str = "left", sigmap: Optional[dict[signal.Signals, SignalHandlers]] = None, timer: bool = False, + stream: Optional[TextIO] = None, ) -> None: + # Stream + self._stream = stream or sys.stdout + self._stream_lock = threading.Lock() + # Spinner self._spinner = self._set_spinner(spinner) self._frames = self._set_frames(self._spinner, reversal) self._interval = self._set_interval(self._spinner) self._cycle = self._set_cycle(self._frames) - # Color Specification self._color = self._set_color(color) if color else color self._on_color = self._set_on_color(on_color) if on_color else on_color @@ -134,7 +139,6 @@ def __init__( # pylint: disable=too-many-arguments self._hide_spin: Optional[threading.Event] = None self._spin_thread: Optional[threading.Thread] = None self._last_frame: Optional[str] = None - self._stdout_lock = threading.Lock() self._hidden_level = 0 self._cur_line_len = 0 @@ -316,14 +320,14 @@ def hide(self) -> None: raise RuntimeError("hide_spin is None") if thr_is_alive and not self._hide_spin.is_set(): - with self._stdout_lock: + with self._stream_lock: # set the hidden spinner flag self._hide_spin.set() self._clear_line() - # flush the stdout buffer so the current line + # flush the stream buffer so the current line # can be rewritten to - sys.stdout.flush() + self._stream.flush() @contextmanager def hidden(self) -> Generator[None, None, None]: @@ -345,7 +349,7 @@ def show(self) -> None: raise RuntimeError("hide_spin is None") if thr_is_alive and self._hide_spin.is_set(): - with self._stdout_lock: + with self._stream_lock: # clear the hidden spinner flag self._hide_spin.clear() # clear the current line so the spinner is not appended to it @@ -355,13 +359,13 @@ def write(self, text: str) -> None: """Write text in the terminal without breaking the spinner.""" # similar to tqdm.write() # https://pypi.python.org/pypi/tqdm#writing-messages - with self._stdout_lock: + with self._stream_lock: self._clear_line() if isinstance(text, (str, bytes)): _text = to_unicode(text) else: _text = str(text) - sys.stdout.write(f"{_text}\n") + self._stream.write(f"{_text}\n") self._cur_line_len = 0 def ok(self, text: str = "OK") -> None: @@ -374,15 +378,11 @@ def fail(self, text: str = "FAIL") -> None: _text = text if text else "FAIL" self._freeze(_text) + def is_jupyter(self) -> bool: + return not self._stream.isatty() + # Protected # - @staticmethod - def _warn_color_disabled() -> None: - warnings.warn( - "color, on_color and attrs are not supported when running in jupyter", - stacklevel=3, - ) - def _freeze(self, final_text: str) -> None: """Stop spinner, compose last frame and 'freeze' it.""" text = to_unicode(final_text) @@ -391,10 +391,10 @@ def _freeze(self, final_text: str) -> None: # Should be stopped here, otherwise prints after # self._freeze call will mess up the spinner self.stop() - with self._stdout_lock: + with self._stream_lock: if self._last_frame is None: raise RuntimeError("last_frame is None") - sys.stdout.write(self._last_frame) + self._stream.write(self._last_frame) self._cur_line_len = 0 def _spin(self) -> None: @@ -412,10 +412,10 @@ def _spin(self) -> None: out = self._compose_out(spin_phase) # Write - with self._stdout_lock: + with self._stream_lock: self._clear_line() - sys.stdout.write(out) - sys.stdout.flush() + self._stream.write(out) + self._stream.flush() self._cur_line_len = max(self._cur_line_len, len(out)) # Wait @@ -489,15 +489,28 @@ def _reset_signal_handlers(self) -> None: for sig, sig_handler in self._dfl_sigmap.items(): signal.signal(sig, sig_handler) - # Static - # - @staticmethod - def is_jupyter() -> bool: - return not sys.stdout.isatty() + def _hide_cursor(self) -> None: + if self._stream.isatty(): + # ANSI Control Sequence DECTCEM 1 does not work in Jupyter + self._stream.write("\033[?25l") + self._stream.flush() - @staticmethod - def _set_color(value: str) -> str: - if Yaspin.is_jupyter(): + def _show_cursor(self) -> None: + if self._stream.isatty(): + # ANSI Control Sequence DECTCEM 2 does not work in Jupyter + self._stream.write("\033[?25h") + self._stream.flush() + + def _clear_line(self) -> None: + if self._stream.isatty(): + # ANSI Control Sequence EL does not work in Jupyter + self._stream.write("\r\033[K") + else: + fill = " " * self._cur_line_len + self._stream.write(f"\r{fill}\r") + + def _set_color(self, value: str) -> str: + if self.is_jupyter(): Yaspin._warn_color_disabled() if value not in COLORS: @@ -508,9 +521,8 @@ def _set_color(value: str) -> str: ) return value - @staticmethod - def _set_on_color(value: str) -> str: - if Yaspin.is_jupyter(): + def _set_on_color(self, value: str) -> str: + if self.is_jupyter(): Yaspin._warn_color_disabled() if value not in HIGHLIGHTS: @@ -520,9 +532,8 @@ def _set_on_color(value: str) -> str: ) return value - @staticmethod - def _set_attrs(attrs: Sequence[str]) -> set[str]: - if Yaspin.is_jupyter(): + def _set_attrs(self, attrs: Sequence[str]) -> set[str]: + if self.is_jupyter(): Yaspin._warn_color_disabled() for attr in attrs: @@ -533,6 +544,15 @@ def _set_attrs(attrs: Sequence[str]) -> set[str]: ) return set(attrs) + # Static + # + @staticmethod + def _warn_color_disabled() -> None: + warnings.warn( + "color, on_color and attrs are not supported when running in jupyter", + stacklevel=3, + ) + @staticmethod def _set_spinner(spinner: Spinner) -> Spinner: if hasattr(spinner, "frames") and hasattr(spinner, "interval"): @@ -590,25 +610,3 @@ def _set_interval(spinner: Spinner) -> float: @staticmethod def _set_cycle(frames: Union[str, Sequence[str]]) -> Iterator[str]: return itertools.cycle(frames) - - @staticmethod - def _hide_cursor() -> None: - if sys.stdout.isatty(): - # ANSI Control Sequence DECTCEM 1 does not work in Jupyter - sys.stdout.write("\033[?25l") - sys.stdout.flush() - - @staticmethod - def _show_cursor() -> None: - if sys.stdout.isatty(): - # ANSI Control Sequence DECTCEM 2 does not work in Jupyter - sys.stdout.write("\033[?25h") - sys.stdout.flush() - - def _clear_line(self) -> None: - if sys.stdout.isatty(): - # ANSI Control Sequence EL does not work in Jupyter - sys.stdout.write("\r\033[K") - else: - fill = " " * self._cur_line_len - sys.stdout.write(f"\r{fill}\r")