Skip to content

Commit

Permalink
fix: remove trailing spaces where supported (#210)
Browse files Browse the repository at this point in the history
Fixes #136
  • Loading branch information
lengau authored Sep 12, 2024
1 parent 73e896e commit c62ce3a
Show file tree
Hide file tree
Showing 3 changed files with 168 additions and 53 deletions.
89 changes: 66 additions & 23 deletions craft_cli/printer.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@

import itertools
import math
import os
import platform
import queue
import shutil
import threading
Expand All @@ -44,6 +46,8 @@
# craft_cli/pytest_plugin.py )
TESTMODE = False

ANSI_CLEAR_LINE_TO_END = "\033[K" # ANSI escape code to clear the rest of the line.


@dataclass
class _MessageInfo:
Expand Down Expand Up @@ -71,7 +75,25 @@ def _get_terminal_width() -> int:
return shutil.get_terminal_size().columns


def _format_term_line(prefix: str, text: str, spintext: str, *, ephemeral: bool) -> str:
@lru_cache
def _supports_ansi_escape_sequences() -> bool:
"""Whether the current environment supports ANSI escape sequences."""
if platform.system() != "Windows":
return True
return "WT_SESSION" in os.environ # Windows Terminal supports ANSI escape sequences.


def _fill_line(text: str) -> str:
"""Turn the input text into a line that will fill the terminal."""
if _supports_ansi_escape_sequences():
return text + ANSI_CLEAR_LINE_TO_END
width = _get_terminal_width()
# Fill the line but leave one character for the cursor.
n_spaces = width - len(text) % width - 1
return text + " " * n_spaces


def _format_term_line(previous_line_end: str, text: str, spintext: str, *, ephemeral: bool) -> str:
"""Format a line to print to the terminal."""
# fill with spaces until the very end, on one hand to clear a possible previous message,
# but also to always have the cursor at the very end
Expand All @@ -87,9 +109,8 @@ def _format_term_line(prefix: str, text: str, spintext: str, *, ephemeral: bool)
text = text[-remaining_for_last_line:]
if len(text) > usable:
text = text[: usable - 1] + "…"
cleaner = " " * (usable - len(text) % width)

return prefix + text + spintext + cleaner
return previous_line_end + _fill_line(text + spintext)


class _Spinner(threading.Thread):
Expand Down Expand Up @@ -226,38 +247,60 @@ def _get_prefixed_message_text(self, message: _MessageInfo) -> str:

return text

def _get_line_end(self, spintext: str) -> str:
"""Get the end of line to use when writing a line to the terminal."""
if spintext:
# forced to overwrite the previous message to present the spinner
return "\r"
if self.prv_msg is None or self.prv_msg.end_line:
# first message, or previous message completed the line: start clean
return ""
if self.prv_msg.ephemeral:
# the last one was ephemeral, overwrite it
return "\r"
# Previous line was ended; complete it.
return "\n"

def _write_line_terminal(self, message: _MessageInfo, *, spintext: str = "") -> None:
"""Write a simple line message to the screen."""
# prepare the text with (maybe) the timestamp
text = self._get_prefixed_message_text(message)
# prepare the text with (maybe) the timestamp and remove trailing spaces
text = self._get_prefixed_message_text(message).rstrip()

if message.use_timestamp:
timestamp_str = message.created_at.isoformat(sep=" ", timespec="milliseconds")
text = f"{timestamp_str} {text}"

if spintext:
# forced to overwrite the previous message to present the spinner
maybe_cr = "\r"
elif self.prv_msg is None or self.prv_msg.end_line:
# first message, or previous message completed the line: start clean
maybe_cr = ""
elif self.prv_msg.ephemeral:
# the last one was ephemeral, overwrite it
maybe_cr = "\r"
if self.prv_msg.stream != message.stream:
# If the last message's stream is different from this new one,
# send the carriage return to the original stream only.
print(maybe_cr, flush=True, file=self.prv_msg.stream, end="")
maybe_cr = ""
else:
# complete the previous line, leaving that message ok
maybe_cr = ""
previous_line_end = self._get_line_end(spintext)
if self.prv_msg and self.prv_msg.ephemeral and self.prv_msg.stream != message.stream:
# If the last message's stream is different from this new one,
# send a carriage return to the original stream only.
print("\r", flush=True, file=self.prv_msg.stream, end="")
previous_line_end = ""
if self.prv_msg and previous_line_end == "\n":
previous_line_end = ""
print(flush=True, file=self.prv_msg.stream)

# fill with spaces until the very end, on one hand to clear a possible previous message,
# but also to always have the cursor at the very end
width = _get_terminal_width()
usable = width - len(spintext) - 1 # the 1 is the cursor itself
if len(text) > usable:
if message.ephemeral:
text = text[: usable - 1] + "…"
elif spintext:
# we need to rewrite the message with the spintext, use only the last line for
# multiline messages, and ensure (again) that the last real line fits
remaining_for_last_line = len(text) % width
text = text[-remaining_for_last_line:]
if len(text) > usable:
text = text[: usable - 1] + "…"

# We don't need to rewrite the same ephemeral message repeatedly.
should_overwrite = spintext or message.end_line or not message.ephemeral
if should_overwrite or message != self.prv_msg:
line = _format_term_line(maybe_cr, text, spintext, ephemeral=message.ephemeral)
line = _format_term_line(
previous_line_end, text, spintext, ephemeral=message.ephemeral
)
print(line, end="", flush=True, file=message.stream)

if message.end_line:
Expand Down
21 changes: 12 additions & 9 deletions tests/integration/test_messages_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,21 +86,24 @@ class Line:
regex: bool = False # if "text" is a regular expression instead of an exact string


def compare_lines(expected_lines: Collection[Line], raw_stream, std_stream):
def compare_lines(expected_lines: Collection[Line], raw_stream: str, std_stream):
"""Helper to compare expected lines to what was written to the terminal."""
width = printer._get_terminal_width()
terminal = printer._stream_is_terminal(std_stream)
if expected_lines:
assert len(raw_stream) > 0

if terminal:
# when showing to the terminal, it's completed always to screen width and terminated in
# different ways, so we split lines according to that length
assert (
len(raw_stream) % width == 0
), f"Bad length {len(raw_stream)} ({width=}) {raw_stream=!r}"
args = [iter(raw_stream)] * width
lines = ["".join(x) for x in zip(*args)] # pyright: ignore[reportGeneralTypeIssues]
if printer._supports_ansi_escape_sequences():
lines = raw_stream.replace("\033[K", "").splitlines(keepends=True)
else:
# If the terminal doesn't support ANSI escape sequences, we fill the screen
# width and don't terminate lines, so we split lines according to that length
assert (
len(raw_stream) % width == 0
), f"Bad length {len(raw_stream)} ({width=}) {raw_stream=!r}"
args = [iter(raw_stream)] * width
lines = ["".join(x) for x in zip(*args)] # pyright: ignore[reportGeneralTypeIssues]
else:
# when the output is captured, each line is simple and it should end in newline, so use
# that for splitting (but don't lose the newline)
Expand Down Expand Up @@ -1528,7 +1531,7 @@ def test_streaming_brief_spinner(capsys, logger, monkeypatch, init_emitter):
Line("Begin stage", permanent=False),
Line("Begin stage :: Opening stream", permanent=False),
Line("Begin stage :: Info message", permanent=False),
Line(r"Begin stage :: Info message - \(0.[7-9]s\)", permanent=False, regex=True),
Line(r"Begin stage :: Info message - \((0.[7-9]|1.0)s\)", permanent=False, regex=True),
Line("Begin stage :: Info message", permanent=False),
Line("Done stage", permanent=True),
]
Expand Down
Loading

0 comments on commit c62ce3a

Please sign in to comment.