diff --git a/CHANGELOG.rst b/CHANGELOG.rst index 0914aee97..db3d20329 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -41,6 +41,9 @@ v34.7.2 (unreleased) - Fix an issue with conflicting groups checkbox id in the Add pipeline modal. https://github.com/nexB/scancode.io/issues/1353 +- Move the BasePipeline class to a new `aboutcode.pipeline` module. + https://github.com/nexB/scancode.io/issues/1351 + v34.7.1 (2024-07-15) -------------------- diff --git a/aboutcode/pipeline/__init__.py b/aboutcode/pipeline/__init__.py new file mode 100644 index 000000000..264969ee3 --- /dev/null +++ b/aboutcode/pipeline/__init__.py @@ -0,0 +1,213 @@ +# SPDX-License-Identifier: Apache-2.0 +# +# http://nexb.com and https://github.com/nexB/scancode.io +# The ScanCode.io software is licensed under the Apache License version 2.0. +# Data generated with ScanCode.io is provided as-is without warranties. +# ScanCode is a trademark of nexB Inc. +# +# You may not use this software except in compliance with the License. +# You may obtain a copy of the License at: http://apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software distributed +# under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR +# CONDITIONS OF ANY KIND, either express or implied. See the License for the +# specific language governing permissions and limitations under the License. +# +# Data Generated with ScanCode.io is provided on an "AS IS" BASIS, WITHOUT WARRANTIES +# OR CONDITIONS OF ANY KIND, either express or implied. No content created from +# ScanCode.io should be considered or used as legal advice. Consult an Attorney +# for any legal advice. +# +# ScanCode.io is a free software code scanning tool from nexB Inc. and others. +# Visit https://github.com/nexB/scancode.io for support and download. + +import logging +import traceback +from datetime import datetime +from datetime import timezone +from pydoc import getdoc +from pydoc import splitdoc +from timeit import default_timer as timer + +logger = logging.getLogger(__name__) + + +def group(*groups): + """Mark a function as part of a particular group.""" + + def decorator(obj): + if hasattr(obj, "groups"): + obj.groups = obj.groups.union(groups) + else: + setattr(obj, "groups", set(groups)) + return obj + + return decorator + + +def humanize_time(seconds): + """Convert the provided ``seconds`` number into human-readable time.""" + message = f"{seconds:.0f} seconds" + + if seconds > 86400: + message += f" ({seconds / 86400:.1f} days)" + if seconds > 3600: + message += f" ({seconds / 3600:.1f} hours)" + elif seconds > 60: + message += f" ({seconds / 60:.1f} minutes)" + + return message + + +class BasePipeline: + """Base class for all pipeline implementations.""" + + # Flag indicating if the Pipeline is an add-on, meaning it cannot be run first. + is_addon = False + + def __init__(self, run): + """Load the Run and Project instances.""" + self.run = run + self.project = run.project + self.pipeline_name = run.pipeline_name + self.env = self.project.get_env() + + @classmethod + def steps(cls): + raise NotImplementedError + + @classmethod + def get_steps(cls, groups=None): + """ + Return the list of steps defined in the ``steps`` class method. + + If the optional ``groups`` parameter is provided, only include steps labeled + with groups that intersect with the provided list. If a step has no groups or + if ``groups`` is not specified, include the step in the result. + """ + if not callable(cls.steps): + raise TypeError("Use a ``steps(cls)`` classmethod to declare the steps.") + + steps = cls.steps() + + if groups is not None: + steps = tuple( + step + for step in steps + if not getattr(step, "groups", []) + or set(getattr(step, "groups")).intersection(groups) + ) + + return steps + + @classmethod + def get_initial_steps(cls): + """ + Return a tuple of extra initial steps to be run at the start of the pipeline + execution. + """ + return + + @classmethod + def get_doc(cls): + """Get the doc string of this pipeline.""" + return getdoc(cls) + + @classmethod + def get_graph(cls): + """Return a graph of steps.""" + return [ + { + "name": step.__name__, + "doc": getdoc(step), + "groups": getattr(step, "groups", []), + } + for step in cls.get_steps() + ] + + @classmethod + def get_info(cls): + """Get a dictionary of combined information data about this pipeline.""" + summary, description = splitdoc(cls.get_doc()) + steps = cls.get_graph() + + return { + "summary": summary, + "description": description, + "steps": steps, + "available_groups": cls.get_available_groups(), + } + + @classmethod + def get_summary(cls): + """Get the doc string summary.""" + return cls.get_info()["summary"] + + @classmethod + def get_available_groups(cls): + return sorted( + set( + group_name + for step in cls.get_steps() + for group_name in getattr(step, "groups", []) + ) + ) + + def log(self, message): + """Log the given `message` to the current module logger and Run instance.""" + now_local = datetime.now(timezone.utc).astimezone() + timestamp = now_local.strftime("%Y-%m-%d %H:%M:%S.%f")[:-3] + message = f"{timestamp} {message}" + logger.info(message) + self.run.append_to_log(message) + + @staticmethod + def output_from_exception(exception): + """Return a formatted error message including the traceback.""" + output = f"{exception}\n\n" + + if exception.__cause__ and str(exception.__cause__) != str(exception): + output += f"Cause: {exception.__cause__}\n\n" + + traceback_formatted = "".join(traceback.format_tb(exception.__traceback__)) + output += f"Traceback:\n{traceback_formatted}" + + return output + + def execute(self): + """Execute each steps in the order defined on this pipeline class.""" + self.log(f"Pipeline [{self.pipeline_name}] starting") + + steps = self.get_steps(groups=self.run.selected_groups) + selected_steps = self.run.selected_steps + + if initial_steps := self.get_initial_steps(): + steps = initial_steps + steps + + steps_count = len(steps) + pipeline_start_time = timer() + + for current_index, step in enumerate(steps, start=1): + step_name = step.__name__ + + if selected_steps and step_name not in selected_steps: + self.log(f"Step [{step_name}] skipped") + continue + + self.run.set_current_step(f"{current_index}/{steps_count} {step_name}") + self.log(f"Step [{step_name}] starting") + step_start_time = timer() + + try: + step(self) + except Exception as exception: + self.log("Pipeline failed") + return 1, self.output_from_exception(exception) + + step_run_time = timer() - step_start_time + self.log(f"Step [{step_name}] completed in {humanize_time(step_run_time)}") + + self.run.set_current_step("") # Reset the `current_step` field on completion + pipeline_run_time = timer() - pipeline_start_time + self.log(f"Pipeline completed in {humanize_time(pipeline_run_time)}") + + return 0, "" diff --git a/scanpipe/pipelines/__init__.py b/scanpipe/pipelines/__init__.py index 44b20cf8b..12f1b6ae8 100644 --- a/scanpipe/pipelines/__init__.py +++ b/scanpipe/pipelines/__init__.py @@ -26,17 +26,12 @@ from contextlib import contextmanager from functools import wraps from pathlib import Path -from pydoc import getdoc -from pydoc import splitdoc -from timeit import default_timer as timer - -from django.utils import timezone import bleach from markdown_it import MarkdownIt from pyinstrument import Profiler -from scanpipe import humanize_time +from aboutcode.pipeline import BasePipeline logger = logging.getLogger(__name__) @@ -55,19 +50,6 @@ def _generate_message(self): return message -def group(*groups): - """Mark a function as part of a particular group.""" - - def decorator(obj): - if hasattr(obj, "groups"): - obj.groups = obj.groups.union(groups) - else: - setattr(obj, "groups", set(groups)) - return obj - - return decorator - - def convert_markdown_to_html(markdown_text): """Convert Markdown text to sanitized HTML.""" # Using the "js-default" for safety. @@ -77,13 +59,11 @@ def convert_markdown_to_html(markdown_text): return sanitized_html -class BasePipeline: - """Base class for all pipelines.""" +class Pipeline(BasePipeline): + """Main class for all Project pipelines including common step methods.""" # Flag specifying whether to download missing inputs as an initial step. download_inputs = True - # Flag indicating if the Pipeline is an add-on, meaning it cannot be run first. - is_addon = False # Optional URL that targets a view of the results relative to this Pipeline. # This URL may contain dictionary-style string formatting, which will be # interpolated against the project's field attributes. @@ -91,151 +71,24 @@ class BasePipeline: # to target the Package list view with an active filtering. results_url = "" - def __init__(self, run): - """Load the Run and Project instances.""" - self.run = run - self.project = run.project - self.pipeline_name = run.pipeline_name - self.env = self.project.get_env() - - @classmethod - def steps(cls): - raise NotImplementedError - - @classmethod - def get_steps(cls, groups=None): - """ - Return the list of steps defined in the ``steps`` class method. - - If the optional ``groups`` parameter is provided, only include steps labeled - with groups that intersect with the provided list. If a step has no groups or - if ``groups`` is not specified, include the step in the result. - """ - if not callable(cls.steps): - raise TypeError("Use a ``steps(cls)`` classmethod to declare the steps.") - - steps = cls.steps() - - if groups is not None: - steps = tuple( - step - for step in steps - if not getattr(step, "groups", []) - or set(getattr(step, "groups")).intersection(groups) - ) - - return steps - - @classmethod - def get_doc(cls): - """Get the doc string of this pipeline.""" - return getdoc(cls) - @classmethod - def get_graph(cls): - """Return a graph of steps.""" - return [ - { - "name": step.__name__, - "doc": getdoc(step), - "groups": getattr(step, "groups", []), - } - for step in cls.get_steps() - ] + def get_initial_steps(cls): + """Add the ``download_inputs`` step as an initial step if enabled.""" + if cls.download_inputs: + return (cls.download_missing_inputs,) @classmethod def get_info(cls, as_html=False): - """Get a dictionary of combined information data about this pipeline.""" - summary, description = splitdoc(cls.get_doc()) - steps = cls.get_graph() + """Add the option to render the values as HTML.""" + info = super().get_info() if as_html: - summary = convert_markdown_to_html(summary) - description = convert_markdown_to_html(description) - for step in steps: + info["summary"] = convert_markdown_to_html(info["summary"]) + info["description"] = convert_markdown_to_html(info["description"]) + for step in info["steps"]: step["doc"] = convert_markdown_to_html(step["doc"]) - return { - "summary": summary, - "description": description, - "steps": steps, - "available_groups": cls.get_available_groups(), - } - - @classmethod - def get_summary(cls): - """Get the doc string summary.""" - return cls.get_info()["summary"] - - @classmethod - def get_available_groups(cls): - return sorted( - set( - group_name - for step in cls.get_steps() - for group_name in getattr(step, "groups", []) - ) - ) - - def log(self, message): - """Log the given `message` to the current module logger and Run instance.""" - now_as_localtime = timezone.localtime(timezone.now()) - timestamp = now_as_localtime.strftime("%Y-%m-%d %H:%M:%S.%f")[:-4] - message = f"{timestamp} {message}" - logger.info(message) - self.run.append_to_log(message) - - @staticmethod - def output_from_exception(exception): - """Return a formatted error message including the traceback.""" - output = f"{exception}\n\n" - - if exception.__cause__ and str(exception.__cause__) != str(exception): - output += f"Cause: {exception.__cause__}\n\n" - - traceback_formatted = "".join(traceback.format_tb(exception.__traceback__)) - output += f"Traceback:\n{traceback_formatted}" - - return output - - def execute(self): - """Execute each steps in the order defined on this pipeline class.""" - self.log(f"Pipeline [{self.pipeline_name}] starting") - - steps = self.get_steps(groups=self.run.selected_groups) - selected_steps = self.run.selected_steps - - if self.download_inputs: - steps = (self.__class__.download_missing_inputs,) + steps - - steps_count = len(steps) - pipeline_start_time = timer() - - for current_index, step in enumerate(steps, start=1): - step_name = step.__name__ - - if selected_steps and step_name not in selected_steps: - self.log(f"Step [{step_name}] skipped") - continue - - self.run.set_current_step(f"{current_index}/{steps_count} {step_name}") - self.log(f"Step [{step_name}] starting") - step_start_time = timer() - - try: - step(self) - except Exception as exception: - self.log("Pipeline failed") - return 1, self.output_from_exception(exception) - - step_run_time = timer() - step_start_time - self.log(f"Step [{step_name}] completed in {humanize_time(step_run_time)}") - - self.run.set_current_step("") # Reset the `current_step` field on completion - pipeline_run_time = timer() - pipeline_start_time - self.log(f"Pipeline completed in {humanize_time(pipeline_run_time)}") - - return 0, "" + return info def download_missing_inputs(self): """ @@ -296,10 +149,6 @@ def save_errors(self, *exceptions, **kwargs): except exceptions as error: self.add_error(exception=error, **kwargs) - -class Pipeline(BasePipeline): - """Main class for all pipelines including common step methods.""" - def flag_empty_files(self): """Flag empty files.""" from scanpipe.pipes import flag diff --git a/scanpipe/pipelines/deploy_to_develop.py b/scanpipe/pipelines/deploy_to_develop.py index db39fea3a..e7fa8c6bf 100644 --- a/scanpipe/pipelines/deploy_to_develop.py +++ b/scanpipe/pipelines/deploy_to_develop.py @@ -20,9 +20,9 @@ # ScanCode.io is a free software code scanning tool from nexB Inc. and others. # Visit https://github.com/nexB/scancode.io for support and download. +from aboutcode.pipeline import group from scanpipe import pipes from scanpipe.pipelines import Pipeline -from scanpipe.pipelines import group from scanpipe.pipes import d2d from scanpipe.pipes import flag from scanpipe.pipes import matchcode diff --git a/scanpipe/pipelines/inspect_packages.py b/scanpipe/pipelines/inspect_packages.py index e89d9fdea..1ea78072f 100644 --- a/scanpipe/pipelines/inspect_packages.py +++ b/scanpipe/pipelines/inspect_packages.py @@ -20,7 +20,7 @@ # ScanCode.io is a free software code scanning tool from nexB Inc. and others. # Visit https://github.com/nexB/scancode.io for support and download. -from scanpipe.pipelines import group +from aboutcode.pipeline import group from scanpipe.pipelines.scan_codebase import ScanCodebase from scanpipe.pipes import scancode diff --git a/scanpipe/pipelines/resolve_dependencies.py b/scanpipe/pipelines/resolve_dependencies.py index 706558771..30c6468b9 100644 --- a/scanpipe/pipelines/resolve_dependencies.py +++ b/scanpipe/pipelines/resolve_dependencies.py @@ -20,7 +20,7 @@ # ScanCode.io is a free software code scanning tool from nexB Inc. and others. # Visit https://github.com/nexB/scancode.io for support and download. -from scanpipe.pipelines import group +from aboutcode.pipeline import group from scanpipe.pipelines.scan_codebase import ScanCodebase from scanpipe.pipes import resolve from scanpipe.pipes import scancode diff --git a/scanpipe/tests/pipelines/profile_step.py b/scanpipe/tests/pipelines/profile_step.py index 42135677b..06022d1e1 100644 --- a/scanpipe/tests/pipelines/profile_step.py +++ b/scanpipe/tests/pipelines/profile_step.py @@ -27,6 +27,8 @@ class ProfileStep(Pipeline): """Profile a step using the @profile decorator.""" + download_inputs = False + @classmethod def steps(cls): return (cls.step,) diff --git a/scanpipe/tests/pipelines/with_groups.py b/scanpipe/tests/pipelines/with_groups.py index 1b00e3f16..1a955ea6b 100644 --- a/scanpipe/tests/pipelines/with_groups.py +++ b/scanpipe/tests/pipelines/with_groups.py @@ -20,8 +20,8 @@ # ScanCode.io is a free software code scanning tool from nexB Inc. and others. # Visit https://github.com/nexB/scancode.io for support and download. +from aboutcode.pipeline import group from scanpipe.pipelines import Pipeline -from scanpipe.pipelines import group class WithGroups(Pipeline): diff --git a/scanpipe/tests/test_pipelines.py b/scanpipe/tests/test_pipelines.py index 1d61407b9..e69591f8a 100644 --- a/scanpipe/tests/test_pipelines.py +++ b/scanpipe/tests/test_pipelines.py @@ -179,9 +179,9 @@ def test_scanpipe_pipeline_class_download_inputs_attribute(self): pipeline.execute() self.assertIn("Step [download_missing_inputs]", run.log) - run = project1.add_pipeline("do_nothing") + run = project1.add_pipeline("profile_step") pipeline = run.make_pipeline_instance() - pipeline.download_inputs = False + self.assertFalse(pipeline.download_inputs) pipeline.execute() self.assertNotIn("Step [download_missing_inputs]", run.log)