Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move the BasePipeline class to a new aboutcode.pipeline module #1351 #1357

Merged
merged 2 commits into from
Aug 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.rst
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@ v34.7.2 (unreleased)
- Fix an issue with conflicting groups checkbox id in the Add pipeline modal.
https://github.com/nexB/scancode.io/issues/1353

- Move the BasePipeline class to a new `aboutcode.pipeline` module.
https://github.com/nexB/scancode.io/issues/1351

v34.7.1 (2024-07-15)
--------------------

Expand Down
213 changes: 213 additions & 0 deletions aboutcode/pipeline/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,213 @@
# SPDX-License-Identifier: Apache-2.0
#
# http://nexb.com and https://github.com/nexB/scancode.io
# The ScanCode.io software is licensed under the Apache License version 2.0.
# Data generated with ScanCode.io is provided as-is without warranties.
# ScanCode is a trademark of nexB Inc.
#
# You may not use this software except in compliance with the License.
# You may obtain a copy of the License at: http://apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software distributed
# under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
# CONDITIONS OF ANY KIND, either express or implied. See the License for the
# specific language governing permissions and limitations under the License.
#
# Data Generated with ScanCode.io is provided on an "AS IS" BASIS, WITHOUT WARRANTIES
# OR CONDITIONS OF ANY KIND, either express or implied. No content created from
# ScanCode.io should be considered or used as legal advice. Consult an Attorney
# for any legal advice.
#
# ScanCode.io is a free software code scanning tool from nexB Inc. and others.
# Visit https://github.com/nexB/scancode.io for support and download.

import logging
import traceback
from datetime import datetime
from datetime import timezone
from pydoc import getdoc
from pydoc import splitdoc
from timeit import default_timer as timer

logger = logging.getLogger(__name__)


def group(*groups):
"""Mark a function as part of a particular group."""

def decorator(obj):
if hasattr(obj, "groups"):
obj.groups = obj.groups.union(groups)
else:
setattr(obj, "groups", set(groups))
return obj

return decorator


def humanize_time(seconds):
"""Convert the provided ``seconds`` number into human-readable time."""
message = f"{seconds:.0f} seconds"

if seconds > 86400:
message += f" ({seconds / 86400:.1f} days)"
if seconds > 3600:
message += f" ({seconds / 3600:.1f} hours)"
elif seconds > 60:
message += f" ({seconds / 60:.1f} minutes)"

return message


class BasePipeline:
"""Base class for all pipeline implementations."""

# Flag indicating if the Pipeline is an add-on, meaning it cannot be run first.
is_addon = False

def __init__(self, run):
"""Load the Run and Project instances."""
self.run = run
self.project = run.project
self.pipeline_name = run.pipeline_name
self.env = self.project.get_env()

@classmethod
def steps(cls):
raise NotImplementedError

@classmethod
def get_steps(cls, groups=None):
"""
Return the list of steps defined in the ``steps`` class method.
If the optional ``groups`` parameter is provided, only include steps labeled
with groups that intersect with the provided list. If a step has no groups or
if ``groups`` is not specified, include the step in the result.
"""
if not callable(cls.steps):
raise TypeError("Use a ``steps(cls)`` classmethod to declare the steps.")

steps = cls.steps()

if groups is not None:
steps = tuple(
step
for step in steps
if not getattr(step, "groups", [])
or set(getattr(step, "groups")).intersection(groups)
)

return steps

@classmethod
def get_initial_steps(cls):
"""
Return a tuple of extra initial steps to be run at the start of the pipeline
execution.
"""
return

@classmethod
def get_doc(cls):
"""Get the doc string of this pipeline."""
return getdoc(cls)

@classmethod
def get_graph(cls):
"""Return a graph of steps."""
return [
{
"name": step.__name__,
"doc": getdoc(step),
"groups": getattr(step, "groups", []),
}
for step in cls.get_steps()
]

@classmethod
def get_info(cls):
"""Get a dictionary of combined information data about this pipeline."""
summary, description = splitdoc(cls.get_doc())
steps = cls.get_graph()

return {
"summary": summary,
"description": description,
"steps": steps,
"available_groups": cls.get_available_groups(),
}

@classmethod
def get_summary(cls):
"""Get the doc string summary."""
return cls.get_info()["summary"]

@classmethod
def get_available_groups(cls):
return sorted(
set(
group_name
for step in cls.get_steps()
for group_name in getattr(step, "groups", [])
)
)

def log(self, message):
"""Log the given `message` to the current module logger and Run instance."""
now_local = datetime.now(timezone.utc).astimezone()
timestamp = now_local.strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]
message = f"{timestamp} {message}"
logger.info(message)
self.run.append_to_log(message)

@staticmethod
def output_from_exception(exception):
"""Return a formatted error message including the traceback."""
output = f"{exception}\n\n"

if exception.__cause__ and str(exception.__cause__) != str(exception):
output += f"Cause: {exception.__cause__}\n\n"

traceback_formatted = "".join(traceback.format_tb(exception.__traceback__))
output += f"Traceback:\n{traceback_formatted}"

return output

def execute(self):
"""Execute each steps in the order defined on this pipeline class."""
self.log(f"Pipeline [{self.pipeline_name}] starting")

steps = self.get_steps(groups=self.run.selected_groups)
selected_steps = self.run.selected_steps

if initial_steps := self.get_initial_steps():
steps = initial_steps + steps

steps_count = len(steps)
pipeline_start_time = timer()

for current_index, step in enumerate(steps, start=1):
step_name = step.__name__

if selected_steps and step_name not in selected_steps:
self.log(f"Step [{step_name}] skipped")
continue

self.run.set_current_step(f"{current_index}/{steps_count} {step_name}")
self.log(f"Step [{step_name}] starting")
step_start_time = timer()

try:
step(self)
except Exception as exception:
self.log("Pipeline failed")
return 1, self.output_from_exception(exception)

step_run_time = timer() - step_start_time
self.log(f"Step [{step_name}] completed in {humanize_time(step_run_time)}")

self.run.set_current_step("") # Reset the `current_step` field on completion
pipeline_run_time = timer() - pipeline_start_time
self.log(f"Pipeline completed in {humanize_time(pipeline_run_time)}")

return 0, ""
2 changes: 2 additions & 0 deletions docs/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,10 @@
# documentation root, use os.path.abspath to make it absolute, like shown here.

import os
import sys

os.environ.setdefault("DJANGO_SETTINGS_MODULE", "scancodeio.settings")
sys.path.insert(0, os.path.abspath("../."))

# -- Project information -----------------------------------------------------

Expand Down
Loading