From eaba2d710b9c2a31e05f08dae3fa68acf36171a8 Mon Sep 17 00:00:00 2001 From: Sorin Sbarnea Date: Tue, 11 Oct 2022 14:08:48 +0100 Subject: [PATCH] Enable black (#75) --- .pre-commit-config.yaml | 5 + demo/demo_colorer.py | 4 +- docs/conf.py | 47 ++++---- docs/make.py | 7 +- setup.cfg | 4 +- src/tendo/__init__.py | 11 +- src/tendo/ansiterm.py | 192 +++++++++++++++++------------- src/tendo/colorer.py | 47 +++++--- src/tendo/execfile2.py | 7 +- src/tendo/singleton.py | 30 ++--- src/tendo/tee.py | 117 +++++++++++------- src/tendo/tests/test_colorer.py | 11 +- src/tendo/tests/test_execfile2.py | 5 +- src/tendo/tests/test_singleton.py | 2 +- src/tendo/tests/test_tee.py | 29 +++-- src/tendo/tests/test_unicode.py | 6 +- src/tendo/unicode.py | 19 +-- 17 files changed, 320 insertions(+), 223 deletions(-) diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index a015d3b..9df992a 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -19,6 +19,11 @@ repos: - id: requirements-txt-fixer - id: check-yaml files: .*\.(yaml|yml)$ + - repo: https://github.com/psf/black + rev: 22.10.0 + hooks: + - id: black + language_version: python3 - repo: https://github.com/adrienverge/yamllint.git rev: v1.28.0 hooks: diff --git a/demo/demo_colorer.py b/demo/demo_colorer.py index 75bab59..690824c 100755 --- a/demo/demo_colorer.py +++ b/demo/demo_colorer.py @@ -1,9 +1,9 @@ #!/usr/bin/env python # encoding: utf-8 -from tendo import colorer # noqa +from tendo import colorer # noqa -if __name__ == '__main__': +if __name__ == "__main__": import logging logging.getLogger().setLevel(logging.NOTSET) diff --git a/docs/conf.py b/docs/conf.py index 8d9230c..18cd77b 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -14,7 +14,12 @@ import inspect import os import sys -cmd_folder = os.path.realpath(os.path.join(os.path.abspath(os.path.split(inspect.getfile(inspect.currentframe()))[0]), "..")) + +cmd_folder = os.path.realpath( + os.path.join( + os.path.abspath(os.path.split(inspect.getfile(inspect.currentframe()))[0]), ".." + ) +) if cmd_folder not in sys.path: sys.path.insert(0, cmd_folder) from tendo import __version__ # noqa:E402 @@ -22,7 +27,7 @@ # If extensions (or modules to document with autodoc) are in another directory, # add these directories to sys.path here. If the directory is relative to the # documentation root, use os.path.abspath to make it absolute, like shown here. -sys.path.insert(0, os.path.abspath('..')) +sys.path.insert(0, os.path.abspath("..")) # -- General configuration ----------------------------------------------------- @@ -31,23 +36,29 @@ # Add any Sphinx extension module names here, as strings. They can be extensions # coming with Sphinx (named 'sphinx.ext.*') or your custom ones. -extensions = ['sphinx.ext.autodoc', 'sphinx.ext.intersphinx', 'sphinx.ext.todo', 'sphinx.ext.coverage', 'sphinx.ext.viewcode'] +extensions = [ + "sphinx.ext.autodoc", + "sphinx.ext.intersphinx", + "sphinx.ext.todo", + "sphinx.ext.coverage", + "sphinx.ext.viewcode", +] # Add any paths that contain templates here, relative to this directory. -templates_path = ['templates'] +templates_path = ["templates"] # The suffix of source filenames. -source_suffix = '.rst' +source_suffix = ".rst" # The encoding of source files. # source_encoding = 'utf-8-sig' # The master toctree document. -master_doc = 'index' +master_doc = "index" # General information about the project. -project = 'tendo' -copyright = '2010-2013, Sorin Sbarnea' +project = "tendo" +copyright = "2010-2013, Sorin Sbarnea" # The version info for the project you're documenting, acts as replacement for # |version| and |release|, also used in various other places throughout the @@ -72,7 +83,7 @@ # List of patterns, relative to source directory, that match files and # directories to igno`re when looking for source files. -exclude_patterns = ['build'] +exclude_patterns = ["build"] # The reST default role (used for this markup: `text`) to use for all documents. # default_role = None @@ -89,7 +100,7 @@ # show_authors = False # The name of the Pygments (syntax highlighting) style to use. -pygments_style = 'sphinx' +pygments_style = "sphinx" # A list of ignored prefixes for module index sorting. # modindex_common_prefix = [] @@ -99,7 +110,7 @@ # The theme to use for HTML and HTML Help pages. See the documentation for # a list of builtin themes. -html_theme = 'default' +html_theme = "default" # Theme options are theme-specific and customize the look and feel of a theme # further. For a list of options available for each theme, see the @@ -128,7 +139,7 @@ # Add any paths that contain custom static files (such as style sheets) here, # relative to this directory. They are copied after the builtin static files, # so a file named "default.css" will overwrite the builtin "default.css". -html_static_path = ['static'] +html_static_path = ["static"] # If not '', a 'Last updated on:' timestamp is inserted at every page bottom, # using the given strftime format. @@ -172,7 +183,7 @@ # html_file_suffix = None # Output file base name for HTML help builder. -htmlhelp_basename = 'tendodoc' +htmlhelp_basename = "tendodoc" # -- Options for LaTeX output -------------------------------------------------- @@ -186,8 +197,7 @@ # Grouping the document tree into LaTeX files. List of tuples # (source start file, target name, title, author, documentclass [howto/manual]). latex_documents = [ - ('index', 'tendo.tex', 'tendo Documentation', - 'Sorin Sbarnea', 'manual'), + ("index", "tendo.tex", "tendo Documentation", "Sorin Sbarnea", "manual"), ] # The name of an image file (relative to this directory) to place at the top of @@ -218,11 +228,8 @@ # One entry per manual page. List of tuples # (source start file, name, description, authors, manual section). -man_pages = [ - ('index', 'tendo', 'tendo Documentation', - ['Sorin Sbârnea'], 1) -] +man_pages = [("index", "tendo", "tendo Documentation", ["Sorin Sbârnea"], 1)] # Example configuration for intersphinx: refer to the Python standard library. -intersphinx_mapping = {'http://docs.python.org/': None} +intersphinx_mapping = {"http://docs.python.org/": None} diff --git a/docs/make.py b/docs/make.py index 05fcb2b..98a3e60 100755 --- a/docs/make.py +++ b/docs/make.py @@ -1,7 +1,8 @@ #!/usr/bin/env python import os -if 'PYTHONPATH' in os.environ: - os.environ['PYTHONPATH'] = "..:" + os.environ['PYTHONPATH'] + +if "PYTHONPATH" in os.environ: + os.environ["PYTHONPATH"] = "..:" + os.environ["PYTHONPATH"] else: - os.environ['PYTHONPATH'] = ".." + os.environ["PYTHONPATH"] = ".." os.system("make html") diff --git a/setup.cfg b/setup.cfg index 3bed842..8d2db4f 100644 --- a/setup.cfg +++ b/setup.cfg @@ -83,5 +83,7 @@ upload-dir = docs/build/html [flake8] enable-extensions = H106,H203,H204,H205,H210,H904 exclude = __pycache__,build,src,.tox -ignore = D +ignore = + D + W503 # black conflict max-line-length=1024 diff --git a/src/tendo/__init__.py b/src/tendo/__init__.py index 28423d4..bc5791e 100644 --- a/src/tendo/__init__.py +++ b/src/tendo/__init__.py @@ -10,8 +10,15 @@ __copyright__ = "Copyright 2010-2022, Sorin Sbarnea" __email__ = "sorin.sbarnea@gmail.com" __status__ = "Production" -__all__ = ('tee', 'colorer', 'unicode', - 'execfile2', 'singleton', 'ansiterm', '__version__') +__all__ = ( + "tee", + "colorer", + "unicode", + "execfile2", + "singleton", + "ansiterm", + "__version__", +) if sys.hexversion < 0x03080000: diff --git a/src/tendo/ansiterm.py b/src/tendo/ansiterm.py index 29e3008..d94d234 100755 --- a/src/tendo/ansiterm.py +++ b/src/tendo/ansiterm.py @@ -6,7 +6,7 @@ try: if (not sys.stderr.isatty()) or (not sys.stdout.isatty()): - raise ValueError('not a tty') + raise ValueError("not a tty") import ctypes from ctypes import byref @@ -19,22 +19,31 @@ class COORD(ctypes.Structure): _fields_ = [("X", c_short), ("Y", c_short)] class SMALL_RECT(ctypes.Structure): - _fields_ = [("Left", c_short), ("Top", c_short), - ("Right", c_short), ("Bottom", c_short)] + _fields_ = [ + ("Left", c_short), + ("Top", c_short), + ("Right", c_short), + ("Bottom", c_short), + ] class CONSOLE_SCREEN_BUFFER_INFO(ctypes.Structure): - _fields_ = [("Size", COORD), ("CursorPosition", COORD), ("Attributes", - c_short), ("Window", SMALL_RECT), ("MaximumWindowSize", COORD)] + _fields_ = [ + ("Size", COORD), + ("CursorPosition", COORD), + ("Attributes", c_short), + ("Window", SMALL_RECT), + ("MaximumWindowSize", COORD), + ] class CONSOLE_CURSOR_INFO(ctypes.Structure): - _fields_ = [('dwSize', ctypes.c_ulong), ('bVisible', c_int)] + _fields_ = [("dwSize", ctypes.c_ulong), ("bVisible", c_int)] sbinfo = CONSOLE_SCREEN_BUFFER_INFO() csinfo = CONSOLE_CURSOR_INFO() hconsole = windll.kernel32.GetStdHandle(-11) windll.kernel32.GetConsoleScreenBufferInfo(hconsole, byref(sbinfo)) if sbinfo.Size.X < 10 or sbinfo.Size.Y < 10: - raise Exception('small console') + raise Exception("small console") windll.kernel32.GetConsoleCursorInfo(hconsole, byref(csinfo)) except Exception: pass @@ -44,13 +53,13 @@ class CONSOLE_CURSOR_INFO(ctypes.Structure): def to_int(number, default): return number and int(number) or default + wlock = threading.Lock() STD_OUTPUT_HANDLE = -11 STD_ERROR_HANDLE = -12 class AnsiTerm: - def __init__(self): self.encoding = sys.stdout.encoding self.hconsole = windll.kernel32.GetStdHandle(STD_OUTPUT_HANDLE) @@ -58,14 +67,13 @@ def __init__(self): self.orig_sbinfo = CONSOLE_SCREEN_BUFFER_INFO() self.orig_csinfo = CONSOLE_CURSOR_INFO() windll.kernel32.GetConsoleScreenBufferInfo( - self.hconsole, byref(self.orig_sbinfo)) - windll.kernel32.GetConsoleCursorInfo( - hconsole, byref(self.orig_csinfo)) + self.hconsole, byref(self.orig_sbinfo) + ) + windll.kernel32.GetConsoleCursorInfo(hconsole, byref(self.orig_csinfo)) def screen_buffer_info(self): sbinfo = CONSOLE_SCREEN_BUFFER_INFO() - windll.kernel32.GetConsoleScreenBufferInfo( - self.hconsole, byref(sbinfo)) + windll.kernel32.GetConsoleScreenBufferInfo(self.hconsole, byref(sbinfo)) return sbinfo def clear_line(self, param): @@ -75,39 +83,59 @@ def clear_line(self, param): line_start = COORD(0, sbinfo.CursorPosition.Y) line_length = sbinfo.Size.X elif mode == 2: # Clear entire line - line_start = COORD( - sbinfo.CursorPosition.X, sbinfo.CursorPosition.Y) + line_start = COORD(sbinfo.CursorPosition.X, sbinfo.CursorPosition.Y) line_length = sbinfo.Size.X - sbinfo.CursorPosition.X else: # Clear from cursor position to end of line line_start = sbinfo.CursorPosition line_length = sbinfo.Size.X - sbinfo.CursorPosition.X chars_written = c_int() windll.kernel32.FillConsoleOutputCharacterA( - self.hconsole, c_char(' '), line_length, line_start, byref(chars_written)) + self.hconsole, + c_char(" "), + line_length, + line_start, + byref(chars_written), + ) windll.kernel32.FillConsoleOutputAttribute( - self.hconsole, sbinfo.Attributes, line_length, line_start, byref(chars_written)) + self.hconsole, + sbinfo.Attributes, + line_length, + line_start, + byref(chars_written), + ) def clear_screen(self, param): mode = to_int(param, 0) sbinfo = self.screen_buffer_info() if mode == 1: # Clear from begining of screen to cursor position clear_start = COORD(0, 0) - clear_length = sbinfo.CursorPosition.X * \ - sbinfo.CursorPosition.Y + clear_length = sbinfo.CursorPosition.X * sbinfo.CursorPosition.Y elif mode == 2: # Clear entire screen and return cursor to home clear_start = COORD(0, 0) clear_length = sbinfo.Size.X * sbinfo.Size.Y - windll.kernel32.SetConsoleCursorPosition( - self.hconsole, clear_start) + windll.kernel32.SetConsoleCursorPosition(self.hconsole, clear_start) else: # Clear from cursor position to end of screen clear_start = sbinfo.CursorPosition - clear_length = sbinfo.Size.X - sbinfo.CursorPosition.X + \ - sbinfo.Size.X * (sbinfo.Size.Y - sbinfo.CursorPosition.Y) + clear_length = ( + sbinfo.Size.X + - sbinfo.CursorPosition.X + + sbinfo.Size.X * (sbinfo.Size.Y - sbinfo.CursorPosition.Y) + ) chars_written = c_int() windll.kernel32.FillConsoleOutputCharacterA( - self.hconsole, c_char(' '), clear_length, clear_start, byref(chars_written)) + self.hconsole, + c_char(" "), + clear_length, + clear_start, + byref(chars_written), + ) windll.kernel32.FillConsoleOutputAttribute( - self.hconsole, sbinfo.Attributes, clear_length, clear_start, byref(chars_written)) + self.hconsole, + sbinfo.Attributes, + clear_length, + clear_start, + byref(chars_written), + ) def push_cursor(self, param): sbinfo = self.screen_buffer_info() @@ -116,34 +144,29 @@ def push_cursor(self, param): def pop_cursor(self, param): if self.cursor_history: old_pos = self.cursor_history.pop() - windll.kernel32.SetConsoleCursorPosition( - self.hconsole, old_pos) + windll.kernel32.SetConsoleCursorPosition(self.hconsole, old_pos) def set_cursor(self, param): - x, sep, y = param.partition(';') + x, sep, y = param.partition(";") x = to_int(x, 1) - 1 y = to_int(y, 1) - 1 sbinfo = self.screen_buffer_info() new_pos = COORD( - min(max(0, x), sbinfo.Size.X), - min(max(0, y), sbinfo.Size.Y) + min(max(0, x), sbinfo.Size.X), min(max(0, y), sbinfo.Size.Y) ) windll.kernel32.SetConsoleCursorPosition(self.hconsole, new_pos) def set_column(self, param): x = to_int(param, 1) - 1 sbinfo = self.screen_buffer_info() - new_pos = COORD( - min(max(0, x), sbinfo.Size.X), - sbinfo.CursorPosition.Y - ) + new_pos = COORD(min(max(0, x), sbinfo.Size.X), sbinfo.CursorPosition.Y) windll.kernel32.SetConsoleCursorPosition(self.hconsole, new_pos) def move_cursor(self, x_offset=0, y_offset=0): sbinfo = self.screen_buffer_info() new_pos = COORD( min(max(0, sbinfo.CursorPosition.X + x_offset), sbinfo.Size.X), - min(max(0, sbinfo.CursorPosition.Y + y_offset), sbinfo.Size.Y) + min(max(0, sbinfo.CursorPosition.Y + y_offset), sbinfo.Size.Y), ) windll.kernel32.SetConsoleCursorPosition(self.hconsole, new_pos) @@ -162,52 +185,49 @@ def move_right(self, param): def next_line(self, param): sbinfo = self.screen_buffer_info() self.move_cursor( - x_offset=-sbinfo.CursorPosition.X, - y_offset=to_int(param, 1) + x_offset=-sbinfo.CursorPosition.X, y_offset=to_int(param, 1) ) def prev_line(self, param): sbinfo = self.screen_buffer_info() self.move_cursor( - x_offset=-sbinfo.CursorPosition.X, - y_offset=-to_int(param, 1) + x_offset=-sbinfo.CursorPosition.X, y_offset=-to_int(param, 1) ) - escape_to_color = {(0, 30): 0x0, # black - (0, 31): 0x4, # red - (0, 32): 0x2, # green - (0, 33): 0x4 + 0x2, # dark yellow - (0, 34): 0x1, # blue - (0, 35): 0x1 + 0x4, # purple - (0, 36): 0x2 + 0x4, # cyan - (0, 37): 0x1 + 0x2 + 0x4, # grey - (1, 30): 0x1 + 0x2 + 0x4, # dark gray - (1, 31): 0x4 + 0x8, # red - (1, 32): 0x2 + 0x8, # light green - (1, 33): 0x4 + 0x2 + 0x8, # yellow - (1, 34): 0x1 + 0x8, # light blue - (1, 35): 0x1 + 0x4 + 0x8, # light purple - (1, 36): 0x1 + 0x2 + 0x8, # light cyan - (1, 37): 0x1 + 0x2 + 0x4 + 0x8, # white - } + escape_to_color = { + (0, 30): 0x0, # black + (0, 31): 0x4, # red + (0, 32): 0x2, # green + (0, 33): 0x4 + 0x2, # dark yellow + (0, 34): 0x1, # blue + (0, 35): 0x1 + 0x4, # purple + (0, 36): 0x2 + 0x4, # cyan + (0, 37): 0x1 + 0x2 + 0x4, # grey + (1, 30): 0x1 + 0x2 + 0x4, # dark gray + (1, 31): 0x4 + 0x8, # red + (1, 32): 0x2 + 0x8, # light green + (1, 33): 0x4 + 0x2 + 0x8, # yellow + (1, 34): 0x1 + 0x8, # light blue + (1, 35): 0x1 + 0x4 + 0x8, # light purple + (1, 36): 0x1 + 0x2 + 0x8, # light cyan + (1, 37): 0x1 + 0x2 + 0x4 + 0x8, # white + } def set_color(self, param): - cols = param.split(';') + cols = param.split(";") attr = self.orig_sbinfo.Attributes for c in cols: c = to_int(c, 0) if c in range(30, 38): - attr = (attr & 0xf0) | ( - self.escape_to_color.get((0, c), 0x7)) + attr = (attr & 0xF0) | (self.escape_to_color.get((0, c), 0x7)) elif c in range(40, 48): - attr = (attr & 0x0f) | ( - self.escape_to_color.get((0, c), 0x7) << 8) + attr = (attr & 0x0F) | (self.escape_to_color.get((0, c), 0x7) << 8) elif c in range(90, 98): - attr = (attr & 0xf0) | ( - self.escape_to_color.get((1, c - 60), 0x7)) + attr = (attr & 0xF0) | (self.escape_to_color.get((1, c - 60), 0x7)) elif c in range(100, 108): - attr = (attr & 0x0f) | ( - self.escape_to_color.get((1, c - 60), 0x7) << 8) + attr = (attr & 0x0F) | ( + self.escape_to_color.get((1, c - 60), 0x7) << 8 + ) elif c == 1: attr |= 0x08 windll.kernel32.SetConsoleTextAttribute(self.hconsole, attr) @@ -221,26 +241,26 @@ def hide_cursor(self, param): windll.kernel32.SetConsoleCursorInfo(self.hconsole, byref(csinfo)) ansi_command_table = { - 'A': move_up, - 'B': move_down, - 'C': move_right, - 'D': move_left, - 'E': next_line, - 'F': prev_line, - 'G': set_column, - 'H': set_cursor, - 'f': set_cursor, - 'J': clear_screen, - 'K': clear_line, - 'h': show_cursor, - 'l': hide_cursor, - 'm': set_color, - 's': push_cursor, - 'u': pop_cursor, + "A": move_up, + "B": move_down, + "C": move_right, + "D": move_left, + "E": next_line, + "F": prev_line, + "G": set_column, + "H": set_cursor, + "f": set_cursor, + "J": clear_screen, + "K": clear_line, + "h": show_cursor, + "l": hide_cursor, + "m": set_color, + "s": push_cursor, + "u": pop_cursor, } # Match either the escape sequence or text not containing escape # sequence - ansi_tokans = re.compile(r'(?:\x1b\[([0-9?;]*)([a-zA-Z])|([^\x1b]+))') + ansi_tokans = re.compile(r"(?:\x1b\[([0-9?;]*)([a-zA-Z])|([^\x1b]+))") def write(self, text): try: @@ -254,10 +274,12 @@ def write(self, text): chars_written = c_int() if isinstance(txt, str): windll.kernel32.WriteConsoleW( - self.hconsole, txt, len(txt), byref(chars_written), None) + self.hconsole, txt, len(txt), byref(chars_written), None + ) else: windll.kernel32.WriteConsoleA( - self.hconsole, txt, len(txt), byref(chars_written), None) + self.hconsole, txt, len(txt), byref(chars_written), None + ) finally: wlock.release() @@ -268,4 +290,4 @@ def isatty(self): return True sys.stderr = sys.stdout = AnsiTerm() - os.environ['TERM'] = 'vt100' + os.environ["TERM"] = "vt100" diff --git a/src/tendo/colorer.py b/src/tendo/colorer.py index 94c4e39..6056d44 100755 --- a/src/tendo/colorer.py +++ b/src/tendo/colorer.py @@ -20,9 +20,11 @@ import sys -if (hasattr(sys.stderr, "isatty") and sys.stderr.isatty()) or \ - ('TERM' in os.environ.keys() and os.environ['TERM'] in ['linux']) or \ - ('PYCHARM_HOSTED' in os.environ.keys()): +if ( + (hasattr(sys.stderr, "isatty") and sys.stderr.isatty()) + or ("TERM" in os.environ.keys() and os.environ["TERM"] in ["linux"]) + or ("PYCHARM_HOSTED" in os.environ.keys()) +): # Why stderr and not stdout? - because python logging module does output to stderr by default and not stdout. # now we patch Python code to add color support to logging.StreamHandler @@ -30,16 +32,18 @@ def add_coloring_to_emit_windows(fn): # add methods we need to the class def _out_handle(self): import ctypes + return ctypes.windll.kernel32.GetStdHandle(self.STD_OUTPUT_HANDLE) def _set_color(self, code): import ctypes + # Constants from the Windows API self.STD_OUTPUT_HANDLE = -11 hdl = ctypes.windll.kernel32.GetStdHandle(self.STD_OUTPUT_HANDLE) ctypes.windll.kernel32.SetConsoleTextAttribute(hdl, code) - setattr(logging.StreamHandler, '_set_color', _set_color) + setattr(logging.StreamHandler, "_set_color", _set_color) def new(*args): FOREGROUND_BLUE = 0x0001 # text color contains blue. @@ -75,7 +79,12 @@ def new(*args): levelno = args[1].levelno if levelno >= 50: - color = BACKGROUND_YELLOW | FOREGROUND_RED | FOREGROUND_INTENSITY | BACKGROUND_INTENSITY + color = ( + BACKGROUND_YELLOW + | FOREGROUND_RED + | FOREGROUND_INTENSITY + | BACKGROUND_INTENSITY + ) elif levelno >= 40: color = FOREGROUND_RED | FOREGROUND_INTENSITY elif levelno >= 30: @@ -92,6 +101,7 @@ def new(*args): args[0]._set_color(FOREGROUND_WHITE) # print "after" return ret + return new def add_coloring_to_emit_ansi(fn): @@ -102,39 +112,42 @@ def new(*args): new_args = (args[0], copy.copy(args[1])) else: new_args = (args[0], copy.copy(args[1]), args[2:]) - if hasattr(args[0], 'baseFilename'): + if hasattr(args[0], "baseFilename"): return fn(*args) levelno = new_args[1].levelno if levelno >= 50: - color = '\x1b[31m' # red + color = "\x1b[31m" # red elif levelno >= 40: - color = '\x1b[31m' # red + color = "\x1b[31m" # red elif levelno >= 30: - color = '\x1b[33m' # yellow + color = "\x1b[33m" # yellow elif levelno >= 20: - color = '\x1b[32m' # green + color = "\x1b[32m" # green elif levelno >= 10: - color = '\x1b[35m' # pink + color = "\x1b[35m" # pink else: - color = '\x1b[0m' # normal + color = "\x1b[0m" # normal try: - new_args[ - 1].msg = color + str(new_args[1].msg) + '\x1b[0m' # normal + new_args[1].msg = color + str(new_args[1].msg) + "\x1b[0m" # normal except Exception as e: raise e return fn(*new_args) + return new import platform - if platform.system() == 'Windows': + + if platform.system() == "Windows": # Windows does not support ANSI escapes and we are using API calls to # set the console color logging.StreamHandler.emit = add_coloring_to_emit_windows( - logging.StreamHandler.emit) + logging.StreamHandler.emit + ) else: # all non-Windows platforms are supporting ANSI escapes so we use them logging.StreamHandler.emit = add_coloring_to_emit_ansi( - logging.StreamHandler.emit) + logging.StreamHandler.emit + ) # log = logging.getLogger() # log.addFilter(log_filter()) # //hdlr = logging.StreamHandler() diff --git a/src/tendo/execfile2.py b/src/tendo/execfile2.py index bd413fa..e99ffc1 100755 --- a/src/tendo/execfile2.py +++ b/src/tendo/execfile2.py @@ -3,6 +3,7 @@ import sys if sys.hexversion > 0x03000000: + def execfile(file, globals=globals(), locals=locals()): fh = open(file, "r") if not fh: @@ -26,7 +27,7 @@ def execfile2(filename, _globals=dict(), _locals=dict(), cmd=None, quiet=False): - 1 - if SystemExit does not contain an error code or if other Exception is received. - x - the SystemExit error code (if present) """ - _globals['__name__'] = '__main__' + _globals["__name__"] = "__main__" saved_argv = sys.argv # we save sys.argv if cmd: sys.argv = list([filename]) @@ -36,8 +37,7 @@ def execfile2(filename, _globals=dict(), _locals=dict(), cmd=None, quiet=False): sys.argv.extend(shlex.split(cmd)) exit_code = 0 try: - exec( - compile(open(filename).read(), filename, 'exec'), _globals, _locals) + exec(compile(open(filename).read(), filename, "exec"), _globals, _locals) except SystemExit: type, e, tb = sys.exc_info() @@ -48,6 +48,7 @@ def execfile2(filename, _globals=dict(), _locals=dict(), cmd=None, quiet=False): except Exception: if not quiet: import traceback + traceback.print_exc(file=sys.stderr) exit_code = 1 finally: diff --git a/src/tendo/singleton.py b/src/tendo/singleton.py index 68376b4..7151241 100755 --- a/src/tendo/singleton.py +++ b/src/tendo/singleton.py @@ -35,38 +35,40 @@ def __init__(self, flavor_id="", lockfile=""): if lockfile: self.lockfile = lockfile else: - basename = os.path.splitext(os.path.abspath(sys.argv[0]))[0].replace( - "/", "-").replace(":", "").replace("\\", "-") + '-%s' % flavor_id + '.lock' - self.lockfile = os.path.normpath( - tempfile.gettempdir() + '/' + basename) + basename = ( + os.path.splitext(os.path.abspath(sys.argv[0]))[0] + .replace("/", "-") + .replace(":", "") + .replace("\\", "-") + + "-%s" % flavor_id + + ".lock" + ) + self.lockfile = os.path.normpath(tempfile.gettempdir() + "/" + basename) logger.debug(f"SingleInstance lockfile: {self.lockfile}") def __enter__(self): - if sys.platform == 'win32': + if sys.platform == "win32": try: # file already exists, we try to remove (in case previous # execution was interrupted) if os.path.exists(self.lockfile): os.unlink(self.lockfile) - self.fd = os.open( - self.lockfile, os.O_CREAT | os.O_EXCL | os.O_RDWR) + self.fd = os.open(self.lockfile, os.O_CREAT | os.O_EXCL | os.O_RDWR) except OSError: type, e, tb = sys.exc_info() if e.errno == 13: - logger.error( - "Another instance is already running, quitting.") + logger.error("Another instance is already running, quitting.") raise SingleInstanceException() print(e.errno) raise else: # non Windows - self.fp = open(self.lockfile, 'w') + self.fp = open(self.lockfile, "w") self.fp.flush() try: fcntl.lockf(self.fp, fcntl.LOCK_EX | fcntl.LOCK_NB) except IOError: - logger.warning( - "Another instance is already running, quitting.") + logger.warning("Another instance is already running, quitting.") raise SingleInstanceException() self.initialized = True return self @@ -77,8 +79,8 @@ def __exit__(self, exc_type, exc_value, exc_tb): if exc_value is not None: logger.warning("Error: %s" % exc_value, exc_info=True) try: - if sys.platform == 'win32': - if hasattr(self, 'fd'): + if sys.platform == "win32": + if hasattr(self, "fd"): os.close(self.fd) os.unlink(self.lockfile) else: diff --git a/src/tendo/tee.py b/src/tendo/tee.py index a1b1f89..d847cc9 100755 --- a/src/tendo/tee.py +++ b/src/tendo/tee.py @@ -40,7 +40,14 @@ def quote_command(cmd): return cmd -def system2(cmd, cwd=None, logger=_sentinel, stdout=_sentinel, log_command=_sentinel, timing=_sentinel): +def system2( + cmd, + cwd=None, + logger=_sentinel, + stdout=_sentinel, + log_command=_sentinel, + timing=_sentinel, +): # def tee(cmd, cwd=None, logger=tee_logger, console=tee_console): """Works exactly like :func:`system` but it returns both the exit code and the output as a list of lines. @@ -49,35 +56,35 @@ def system2(cmd, cwd=None, logger=_sentinel, stdout=_sentinel, log_command=_sent # if isinstance(cmd, collections.Iterable): # -- this line was replaced # because collections.Iterable seems to be missing on Debian Python 2.5.5 # (but not on OS X 10.8 with Python 2.5.6) - if hasattr(cmd, '__iter__'): + if hasattr(cmd, "__iter__"): cmd = " ".join(pipes.quote(s) for s in cmd) t = time.process_time() output = [] if log_command is _sentinel: - log_command = globals().get('log_command') + log_command = globals().get("log_command") if timing is _sentinel: - timing = globals().get('timing') + timing = globals().get("timing") # default to python native logger if logger parameter is not used if logger is _sentinel: - logger = globals().get('logger') + logger = globals().get("logger") if stdout is _sentinel: - stdout = globals().get('stdout') + stdout = globals().get("stdout") # logging.debug("logger=%s stdout=%s" % (logger, stdout)) f = sys.stdout - if not f.encoding or f.encoding == 'ascii': + if not f.encoding or f.encoding == "ascii": # `ascii` is not a valid encoding by our standards, it's better to output to UTF-8 because it can encoding any Unicode text - encoding = 'utf_8' + encoding = "utf_8" else: encoding = f.encoding def filelogger(msg): try: # we'll use the same endline on all platforms, you like it or not - msg += '\n' + msg += "\n" try: f.write(msg) except TypeError: @@ -85,8 +92,8 @@ def filelogger(msg): except Exception: sys.exc_info()[1] import traceback - print(' ****** ERROR: Exception: %s\nencoding = %s' % - (e, encoding)) + + print(" ****** ERROR: Exception: %s\nencoding = %s" % (e, encoding)) traceback.print_exc(file=sys.stderr) sys.exit(-1) pass @@ -97,27 +104,29 @@ def nop(msg): if not logger: mylogger = nop elif isinstance(logger, str): - f = codecs.open(logger, "a+b", 'utf_8') + f = codecs.open(logger, "a+b", "utf_8") mylogger = filelogger - elif isinstance(logger, (types.FunctionType, types.MethodType, types.BuiltinFunctionType)): + elif isinstance( + logger, (types.FunctionType, types.MethodType, types.BuiltinFunctionType) + ): mylogger = logger else: method_write = getattr(logger, "write", None) # if we can call write() we'll aceppt it :D # this should work for filehandles - if hasattr(method_write, '__call__'): + if hasattr(method_write, "__call__"): f = logger mylogger = filelogger else: - sys.exit("tee() does not support this type of logger=%s" % - type(logger)) + sys.exit("tee() does not support this type of logger=%s" % type(logger)) if cwd is not None and not os.path.isdir(cwd): os.makedirs(cwd) # this throws exception if fails cmd = quote_command(cmd) # to prevent _popen() bug p = subprocess.Popen( - cmd, cwd=cwd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT) + cmd, cwd=cwd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT + ) if log_command: mylogger("Running: %s" % cmd) while True: @@ -128,36 +137,45 @@ def nop(msg): except Exception: e = sys.exc_info()[1] logging.error(e) - logging.error("The output of the command could not be decoded as %s\ncmd: %s\n line ignored: %s" % - (encoding, cmd, repr(line))) + logging.error( + "The output of the command could not be decoded as %s\ncmd: %s\n line ignored: %s" + % (encoding, cmd, repr(line)) + ) pass output.append(line) if not line: break - line = line.rstrip('\n\r') + line = line.rstrip("\n\r") mylogger(line) # they are added by logging anyway if stdout: print(line) returncode = p.wait() if log_command: if timing: + def secondsToStr(t): - return time.strftime('%H:%M:%S', time.gmtime(t)) - mylogger("Returned: %d (execution time %s)\n" % - (returncode, secondsToStr(time.process_time() - t))) + return time.strftime("%H:%M:%S", time.gmtime(t)) + + mylogger( + "Returned: %d (execution time %s)\n" + % (returncode, secondsToStr(time.process_time() - t)) + ) else: mylogger("Returned: %d\n" % returncode) # running a tool that returns non-zero? this deserves a warning if not returncode == 0: - logging.warning("Returned: %d from: %s\nOutput %s" % - (returncode, cmd, '\n'.join(output))) + logging.warning( + "Returned: %d from: %s\nOutput %s" % (returncode, cmd, "\n".join(output)) + ) return returncode, output -def system(cmd, cwd=None, logger=None, stdout=None, log_command=_sentinel, timing=_sentinel): +def system( + cmd, cwd=None, logger=None, stdout=None, log_command=_sentinel, timing=_sentinel +): """This works similar to :py:func:`os.system` but add some useful optional parameters. * ``cmd`` - command to be executed @@ -174,60 +192,69 @@ def system(cmd, cwd=None, logger=None, stdout=None, log_command=_sentinel, timin ... tee.system("echo test", logger=f) # output to a filehandle ... tee.system("echo test", logger=print) # use the print() function for output """ - (returncode, output) = system2(cmd, cwd=cwd, logger=logger, - stdout=stdout, log_command=log_command, timing=timing) + (returncode, output) = system2( + cmd, + cwd=cwd, + logger=logger, + stdout=stdout, + log_command=log_command, + timing=timing, + ) return returncode class testTee(unittest.TestCase): - def test_1(self): """No CMD os.system() - 1 sort /? ok ok - 2 "sort" /? ok ok - 3 sort "/?" ok ok - 4 "sort" "/?" ok [bad] - 5 ""sort /?"" ok [bad] - 6 "sort /?" [bad] ok - 7 "sort "/?"" [bad] ok - 8 ""sort" "/?"" [bad] ok + 1 sort /? ok ok + 2 "sort" /? ok ok + 3 sort "/?" ok ok + 4 "sort" "/?" ok [bad] + 5 ""sort /?"" ok [bad] + 6 "sort /?" [bad] ok + 7 "sort "/?"" [bad] ok + 8 ""sort" "/?"" [bad] ok """ quotes = { - 'dir >nul': 'dir >nul', + "dir >nul": "dir >nul", 'cd /D "C:\\Program Files\\"': '"cd /D "C:\\Program Files\\""', 'python -c "import os" dummy': '"python -c "import os" dummy"', - 'sort': 'sort', + "sort": "sort", } # we fake the os name because we want to run the test on any platform save = os.name - os.name = 'nt' + os.name = "nt" for key, value in quotes.items(): resulted_value = quote_command(key) self.assertEqual( - value, resulted_value, "Returned <%s>, expected <%s>" % (resulted_value, value)) + value, + resulted_value, + "Returned <%s>, expected <%s>" % (resulted_value, value), + ) # ret = os.system(resulted_value) # if not ret==0: # print("failed") os.name = save def test_2(self): - self.assertEqual(system(['python', '-V']), 0) + self.assertEqual(system(["python", "-V"]), 0) def test_3(self): - self.assertEqual(system2(['python', '-V'])[0], 0) + self.assertEqual(system2(["python", "-V"])[0], 0) def test_4(self): - self.assertEqual(system(['python', '-c', "print('c c')"]), 0) + self.assertEqual(system(["python", "-c", "print('c c')"]), 0) -if __name__ == '__main__': +if __name__ == "__main__": # unittest.main() import pytest + pytest.main([__file__]) # import pytest diff --git a/src/tendo/tests/test_colorer.py b/src/tendo/tests/test_colorer.py index 554b85d..a2e2e73 100644 --- a/src/tendo/tests/test_colorer.py +++ b/src/tendo/tests/test_colorer.py @@ -15,12 +15,12 @@ def test_colorer(): print("sys.stdout.isatty = %s" % isatty) logging.getLogger().setLevel(logging.NOTSET) - tmp_file = tempfile.NamedTemporaryFile(suffix='_colorer.log').name + tmp_file = tempfile.NamedTemporaryFile(suffix="_colorer.log").name fh = logging.FileHandler(tmp_file) fh.setLevel(logging.NOTSET) ch = logging.StreamHandler() ch.setLevel(logging.NOTSET) - formatter = logging.Formatter('%(levelname)s: %(message)s') + formatter = logging.Formatter("%(levelname)s: %(message)s") fh.setFormatter(formatter) ch.setFormatter(formatter) logging.getLogger().addHandler(ch) @@ -30,7 +30,12 @@ def test_colorer(): logging.error("some error") logging.info("some info") logging.debug("some info") - expected_lines = ['WARNING: a warning\n', 'ERROR: some error\n', 'INFO: some info\n', 'DEBUG: some info\n'] + expected_lines = [ + "WARNING: a warning\n", + "ERROR: some error\n", + "INFO: some info\n", + "DEBUG: some info\n", + ] line_no = 0 for line in open(tmp_file).readlines(): assert line == expected_lines[line_no] diff --git a/src/tendo/tests/test_execfile2.py b/src/tendo/tests/test_execfile2.py index 20e6412..7642f2f 100644 --- a/src/tendo/tests/test_execfile2.py +++ b/src/tendo/tests/test_execfile2.py @@ -1,4 +1,3 @@ - import os import tempfile @@ -48,5 +47,7 @@ def test_raised_exception(): def test_command_line(): exit_code = exec_py_code( - "import sys\nif len(sys.argv)==2 and sys.argv[1]=='doh!': sys.exit(-1)", cmd="doh!") + "import sys\nif len(sys.argv)==2 and sys.argv[1]=='doh!': sys.exit(-1)", + cmd="doh!", + ) assert exit_code == -1 diff --git a/src/tendo/tests/test_singleton.py b/src/tendo/tests/test_singleton.py index 8053925..5b7218f 100644 --- a/src/tendo/tests/test_singleton.py +++ b/src/tendo/tests/test_singleton.py @@ -54,7 +54,7 @@ def test_multiple_singletons_from_process(): def test_singleton_lock_file(): - lockfile = '/tmp/foo.lock' + lockfile = "/tmp/foo.lock" with SingleInstance(lockfile=lockfile) as me: print(me) assert me.lockfile == lockfile diff --git a/src/tendo/tests/test_tee.py b/src/tendo/tests/test_tee.py index f1ad9de..776537c 100644 --- a/src/tendo/tests/test_tee.py +++ b/src/tendo/tests/test_tee.py @@ -1,4 +1,3 @@ - import os from tendo.tee import quote_command, system, system2 @@ -6,26 +5,26 @@ def test_1(): """No CMD os.system() - 1 sort /? ok ok - 2 "sort" /? ok ok - 3 sort "/?" ok ok - 4 "sort" "/?" ok [bad] - 5 ""sort /?"" ok [bad] - 6 "sort /?" [bad] ok - 7 "sort "/?"" [bad] ok - 8 ""sort" "/?"" [bad] ok + 1 sort /? ok ok + 2 "sort" /? ok ok + 3 sort "/?" ok ok + 4 "sort" "/?" ok [bad] + 5 ""sort /?"" ok [bad] + 6 "sort /?" [bad] ok + 7 "sort "/?"" [bad] ok + 8 ""sort" "/?"" [bad] ok """ quotes = { - 'dir >nul': 'dir >nul', + "dir >nul": "dir >nul", 'cd /D "C:\\Program Files\\"': '"cd /D "C:\\Program Files\\""', 'python -c "import os" dummy': '"python -c "import os" dummy"', - 'sort': 'sort', + "sort": "sort", } # we fake the os name because we want to run the test on any platform save = os.name - os.name = 'nt' + os.name = "nt" for key, value in quotes.items(): resulted_value = quote_command(key) @@ -37,12 +36,12 @@ def test_1(): def test_2(): - assert system(['python', '-V']) == 0 + assert system(["python", "-V"]) == 0 def test_3(): - assert system2(['python', '-V'])[0] == 0 + assert system2(["python", "-V"])[0] == 0 def test_4(): - assert system(['python', '-c', "print('c c')"]) == 0 + assert system(["python", "-c", "print('c c')"]) == 0 diff --git a/src/tendo/tests/test_unicode.py b/src/tendo/tests/test_unicode.py index 1935aca..25beec1 100644 --- a/src/tendo/tests/test_unicode.py +++ b/src/tendo/tests/test_unicode.py @@ -34,10 +34,12 @@ def test_write_on_existing_utf8(dir): shutil.copyfile(os.path.join(dir, "assets/utf8.txt"), fname_tmp) f = open(fname_tmp, "a") # encoding not specified, should use utf-8 f.write( - "\u0061\u0062\u0063\u0219\u021B\u005F\u1E69\u0073\u0323\u0307\u0073\u0307\u0323\u005F\u0431\u0434\u0436\u005F\u03B1\u03B2\u03CE\u005F\u0648\u062A\u005F\u05D0\u05E1\u05DC\u005F\u6C38\U0002A6A5\u9EB5\U00020000") + "\u0061\u0062\u0063\u0219\u021B\u005F\u1E69\u0073\u0323\u0307\u0073\u0307\u0323\u005F\u0431\u0434\u0436\u005F\u03B1\u03B2\u03CE\u005F\u0648\u062A\u005F\u05D0\u05E1\u05DC\u005F\u6C38\U0002A6A5\u9EB5\U00020000" + ) f.close() passed = filecmp.cmp( - os.path.join(dir, "assets/utf8-after-append.txt"), fname_tmp, shallow=False) + os.path.join(dir, "assets/utf8-after-append.txt"), fname_tmp, shallow=False + ) assert passed is True os.close(ftmp) os.unlink(fname_tmp) diff --git a/src/tendo/unicode.py b/src/tendo/unicode.py index b8dc780..bbad92d 100755 --- a/src/tendo/unicode.py +++ b/src/tendo/unicode.py @@ -18,7 +18,7 @@ def b(s): return s.encode("latin-1") -def open(filename, mode='r', bufsize=-1, fallback_encoding='utf_8'): +def open(filename, mode="r", bufsize=-1, fallback_encoding="utf_8"): """This replaces Python original function with an improved version that is Unicode aware. The new `open()` does change behaviour only for text files, not binary. @@ -49,24 +49,24 @@ def open(filename, mode='r', bufsize=-1, fallback_encoding='utf_8'): aBuf = bytes(f.read(4)) f.close() except Exception: - aBuf = b('') - if bytes(aBuf[:3]) == b('\xEF\xBB\xBF'): + aBuf = b("") + if bytes(aBuf[:3]) == b("\xEF\xBB\xBF"): f = codecs.open(filename, mode, "utf_8") f.seek(3, 0) f.BOM = codecs.BOM_UTF8 - elif bytes(aBuf[:2]) == b('\xFF\xFE'): + elif bytes(aBuf[:2]) == b("\xFF\xFE"): f = codecs.open(filename, mode, "utf_16_le") f.seek(2, 0) f.BOM = codecs.BOM_UTF16_LE - elif bytes(aBuf[:2]) == b('\xFE\xFF'): + elif bytes(aBuf[:2]) == b("\xFE\xFF"): f = codecs.open(filename, mode, "utf_16_be") f.seek(2, 0) f.BOM = codecs.BOM_UTF16_BE - elif bytes(aBuf[:4]) == b('\xFF\xFE\x00\x00'): + elif bytes(aBuf[:4]) == b("\xFF\xFE\x00\x00"): f = codecs.open(filename, mode, "utf_32_le") f.seek(4, 0) f.BOM = codecs.BOM_UTF32_LE - elif bytes(aBuf[:4]) == b('\x00\x00\xFE\xFF'): + elif bytes(aBuf[:4]) == b("\x00\x00\xFE\xFF"): f = codecs.open(filename, mode, "utf_32_be") f.seek(4, 0) f.BOM = codecs.BOM_UTF32_BE @@ -77,8 +77,11 @@ def open(filename, mode='r', bufsize=-1, fallback_encoding='utf_8'): return f else: import traceback + logging.warning( - "Calling unicode.open(%s,%s,%s) that may be wrong." % (filename, mode, bufsize)) + "Calling unicode.open(%s,%s,%s) that may be wrong." + % (filename, mode, bufsize) + ) traceback.print_exc(file=sys.stderr) return open_old(filename, mode, bufsize)