From ac54f98beb8ff6ffebcb5a3eb91b38856e524ceb Mon Sep 17 00:00:00 2001 From: Keshav Priyadarshi Date: Tue, 6 Aug 2024 13:03:28 +0530 Subject: [PATCH] Make pipeline work Signed-off-by: Keshav Priyadarshi --- pipeline/__init__.py | 97 ++++++++++++++++--- scanpipe/models.py | 2 +- scanpipe/pipelines/__init__.py | 25 +++-- scanpipe/pipes/__init__.py | 74 -------------- scanpipe/pipes/d2d.py | 2 +- scanpipe/pipes/purldb.py | 2 +- scanpipe/pipes/scancode.py | 3 +- scanpipe/pipes/strings.py | 2 +- scanpipe/pipes/symbols.py | 2 +- scanpipe/tests/pipelines/do_nothing.py | 2 + scanpipe/tests/pipelines/profile_step.py | 2 + scanpipe/tests/pipelines/raise_exception.py | 2 + .../tests/pipelines/register_from_file.py | 2 + .../tests/pipelines/steps_as_attribute.py | 2 + scanpipe/tests/pipelines/with_groups.py | 2 + scanpipe/tests/pipes/test_pipes.py | 5 +- scanpipe/tests/test_pipelines.py | 5 +- 17 files changed, 118 insertions(+), 113 deletions(-) diff --git a/pipeline/__init__.py b/pipeline/__init__.py index ebe3508b8..be398dbce 100644 --- a/pipeline/__init__.py +++ b/pipeline/__init__.py @@ -41,19 +41,19 @@ from pipeline import BasePipeline from pipeline import BasePipelineRun -class DoSomething(BasePipeline): +class DoSomething(BasePipeline, BasePipelineRun): @classmethod def steps(cls): return (cls.step1,) def step1(self): print("Message from step1") -# 1. From the Pipeline class (preferred) -run = DoSomething.make_run() +# 1. Run pipeline +run = DoSomething() run.execute() -# 2. From the Run class -run = BasePipelineRun(pipeline_class=DoSomething) +# 2. Run pipeline with selected groups +run = BasePipelineRun(selected_groups=["group1", "group2"]) run.execute() """ @@ -94,13 +94,85 @@ def humanize_time(seconds): return message +class LoopProgress: + """ + A context manager for logging progress in loops. + + Usage:: + + total_iterations = 100 + logger = print # Replace with your actual logger function + + progress = LoopProgress(total_iterations, logger, progress_step=10) + for item in progress.iter(iterator): + "Your processing logic here" + + with LoopProgress(total_iterations, logger, progress_step=10) as progress: + for item in progress.iter(iterator): + "Your processing logic here" + """ + + def __init__(self, total_iterations, logger, progress_step=10): + self.total_iterations = total_iterations + self.logger = logger + self.progress_step = progress_step + self.start_time = timer() + self.last_logged_progress = 0 + self.current_iteration = 0 + + def get_eta(self, current_progress): + run_time = timer() - self.start_time + return round(run_time / current_progress * (100 - current_progress)) + + @property + def current_progress(self): + return int((self.current_iteration / self.total_iterations) * 100) + + @property + def eta(self): + run_time = timer() - self.start_time + return round(run_time / self.current_progress * (100 - self.current_progress)) + + def log_progress(self): + reasons_to_skip = [ + not self.logger, + not self.current_iteration > 0, + self.total_iterations <= self.progress_step, + ] + if any(reasons_to_skip): + return + + if self.current_progress >= self.last_logged_progress + self.progress_step: + msg = ( + f"Progress: {self.current_progress}% " + f"({self.current_iteration}/{self.total_iterations})" + ) + if eta := self.eta: + msg += f" ETA: {humanize_time(eta)}" + + self.logger(msg) + self.last_logged_progress = self.current_progress + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_value, traceback): + pass + + def iter(self, iterator): + for item in iterator: + self.current_iteration += 1 + self.log_progress() + yield item + + class BasePipelineRun: """Base class for all pipeline run (execution).""" - def __init__(self, pipeline_class, selected_groups=None, selected_steps=None): + def __init__(self, selected_groups=None, selected_steps=None): """Load the Pipeline class.""" - self.pipeline_class = pipeline_class - self.pipeline_name = pipeline_class.__name__ + self.pipeline_class = self.__class__ + self.pipeline_name = self.pipeline_class.__name__ self.selected_groups = selected_groups self.selected_steps = selected_steps or [] @@ -157,7 +229,7 @@ def execute(self): step_start_time = timer() try: - step(self) # WARNING: self is a Run instance, not a Pipeline instance + step(self) except Exception as exception: self.log("Pipeline failed") return 1, self.output_from_exception(exception) @@ -175,9 +247,6 @@ def execute(self): class BasePipeline: """Base class for all pipeline implementations.""" - # Default PipelineRun class for executing the Pipeline. - run_class = BasePipelineRun - # Flag indicating if the Pipeline is an add-on, meaning it cannot be run first. is_addon = False @@ -259,7 +328,3 @@ def get_available_groups(cls): for group_name in getattr(step, "groups", []) ) ) - - @classmethod - def make_run(cls, *args, **kwargs): - return cls.run_class(cls, *args, **kwargs) diff --git a/scanpipe/models.py b/scanpipe/models.py index ca18535e0..d74a93ee0 100644 --- a/scanpipe/models.py +++ b/scanpipe/models.py @@ -1975,7 +1975,7 @@ def pipeline_class(self): def make_pipeline_instance(self): """Return a pipelines instance using this Run pipeline_class.""" - return self.pipeline_class.make_run(run_instance=self) + return self.pipeline_class(run_instance=self) def deliver_project_subscriptions(self): """Triggers related project webhook subscriptions.""" diff --git a/scanpipe/pipelines/__init__.py b/scanpipe/pipelines/__init__.py index 3c7dc87ad..61ca984aa 100644 --- a/scanpipe/pipelines/__init__.py +++ b/scanpipe/pipelines/__init__.py @@ -111,7 +111,7 @@ def extract_archives(self, location=None): class ProjectPipelineRun(BasePipelineRun): - def __init__(self, pipeline_class, run_instance): + def __init__(self, run_instance): """Load the Pipeline execution context from a Run database object.""" self.run = run_instance self.project = run_instance.project @@ -130,11 +130,11 @@ def set_current_step(self, message): self.run.set_current_step(message) -class Pipeline(CommonStepsMixin, BasePipeline): +class Pipeline(CommonStepsMixin, BasePipeline, ProjectPipelineRun): """Main class for all project related pipelines including common steps methods.""" # Project wrapper ProjectPipelineRun class - run_class = ProjectPipelineRun + # run_class = ProjectPipelineRun # Flag specifying whether to download missing inputs as an initial step. download_inputs = True @@ -146,16 +146,15 @@ class Pipeline(CommonStepsMixin, BasePipeline): # to target the Package list view with an active filtering. results_url = "" - # TODO - # @classmethod - # def get_steps(cls, groups=None): - # """Inject the ``download_inputs`` step if enabled.""" - # steps = super().get_steps(groups) - # - # if cls.download_inputs: - # steps = (cls.download_missing_inputs,) + steps - # - # return steps + @classmethod + def get_steps(cls, groups=None): + """Inject the ``download_inputs`` step if enabled.""" + steps = super().get_steps(groups) + + if cls.download_inputs: + steps = (cls.download_missing_inputs,) + steps + + return steps def download_missing_inputs(self): """ diff --git a/scanpipe/pipes/__init__.py b/scanpipe/pipes/__init__.py index 1f52270df..7dd4b1b9c 100644 --- a/scanpipe/pipes/__init__.py +++ b/scanpipe/pipes/__init__.py @@ -29,11 +29,9 @@ from datetime import datetime from itertools import islice from pathlib import Path -from timeit import default_timer as timer from django.db.models import Count -from scanpipe import humanize_time from scanpipe.models import AbstractTaskFieldsModel from scanpipe.models import CodebaseRelation from scanpipe.models import CodebaseResource @@ -402,78 +400,6 @@ def get_bin_executable(filename): return str(Path(sys.executable).parent / filename) -class LoopProgress: - """ - A context manager for logging progress in loops. - - Usage:: - - total_iterations = 100 - logger = print # Replace with your actual logger function - - progress = LoopProgress(total_iterations, logger, progress_step=10) - for item in progress.iter(iterator): - "Your processing logic here" - - with LoopProgress(total_iterations, logger, progress_step=10) as progress: - for item in progress.iter(iterator): - "Your processing logic here" - """ - - def __init__(self, total_iterations, logger, progress_step=10): - self.total_iterations = total_iterations - self.logger = logger - self.progress_step = progress_step - self.start_time = timer() - self.last_logged_progress = 0 - self.current_iteration = 0 - - def get_eta(self, current_progress): - run_time = timer() - self.start_time - return round(run_time / current_progress * (100 - current_progress)) - - @property - def current_progress(self): - return int((self.current_iteration / self.total_iterations) * 100) - - @property - def eta(self): - run_time = timer() - self.start_time - return round(run_time / self.current_progress * (100 - self.current_progress)) - - def log_progress(self): - reasons_to_skip = [ - not self.logger, - not self.current_iteration > 0, - self.total_iterations <= self.progress_step, - ] - if any(reasons_to_skip): - return - - if self.current_progress >= self.last_logged_progress + self.progress_step: - msg = ( - f"Progress: {self.current_progress}% " - f"({self.current_iteration}/{self.total_iterations})" - ) - if eta := self.eta: - msg += f" ETA: {humanize_time(eta)}" - - self.logger(msg) - self.last_logged_progress = self.current_progress - - def __enter__(self): - return self - - def __exit__(self, exc_type, exc_value, traceback): - pass - - def iter(self, iterator): - for item in iterator: - self.current_iteration += 1 - self.log_progress() - yield item - - def get_text_str_diff_ratio(str_a, str_b): """ Return a similarity ratio as a float between 0 and 1 by comparing the diff --git a/scanpipe/pipes/d2d.py b/scanpipe/pipes/d2d.py index e65690997..e2792e2b8 100644 --- a/scanpipe/pipes/d2d.py +++ b/scanpipe/pipes/d2d.py @@ -44,11 +44,11 @@ from packagedcode.npm import NpmPackageJsonHandler from summarycode.classify import LEGAL_STARTS_ENDS +from pipeline import LoopProgress from scanpipe import pipes from scanpipe.models import CodebaseRelation from scanpipe.models import CodebaseResource from scanpipe.models import convert_glob_to_django_regex -from scanpipe.pipes import LoopProgress from scanpipe.pipes import flag from scanpipe.pipes import get_resource_diff_ratio from scanpipe.pipes import js diff --git a/scanpipe/pipes/purldb.py b/scanpipe/pipes/purldb.py index 5ed1962ff..677a9be65 100644 --- a/scanpipe/pipes/purldb.py +++ b/scanpipe/pipes/purldb.py @@ -32,7 +32,7 @@ from univers.version_range import RANGE_CLASS_BY_SCHEMES from univers.version_range import InvalidVersionRange -from scanpipe.pipes import LoopProgress +from pipeline import LoopProgress from scanpipe.pipes import _clean_package_data from scanpipe.pipes import poll_until_success diff --git a/scanpipe/pipes/scancode.py b/scanpipe/pipes/scancode.py index 9cb708284..d9e93b6f3 100644 --- a/scanpipe/pipes/scancode.py +++ b/scanpipe/pipes/scancode.py @@ -45,6 +45,7 @@ from scancode import cli as scancode_cli from scancode.cli import run_scan as scancode_run_scan +from pipeline import LoopProgress from scanpipe import pipes from scanpipe.models import CodebaseResource from scanpipe.models import DiscoveredDependency @@ -308,7 +309,7 @@ def scan_resources( resource_count = resource_qs.count() logger.info(f"Scan {resource_count} codebase resources with {scan_func.__name__}") resource_iterator = resource_qs.iterator(chunk_size=2000) - progress = pipes.LoopProgress(resource_count, logger=progress_logger) + progress = LoopProgress(resource_count, logger=progress_logger) max_workers = get_max_workers(keep_available=1) if max_workers <= 0: diff --git a/scanpipe/pipes/strings.py b/scanpipe/pipes/strings.py index 2d58c616e..7da0cd762 100644 --- a/scanpipe/pipes/strings.py +++ b/scanpipe/pipes/strings.py @@ -20,7 +20,7 @@ # ScanCode.io is a free software code scanning tool from nexB Inc. and others. # Visit https://github.com/nexB/scancode.io for support and download. -from scanpipe.pipes import LoopProgress +from pipeline import LoopProgress class XgettextNotFound(Exception): diff --git a/scanpipe/pipes/symbols.py b/scanpipe/pipes/symbols.py index 4ef2724f6..32970eca0 100644 --- a/scanpipe/pipes/symbols.py +++ b/scanpipe/pipes/symbols.py @@ -22,7 +22,7 @@ from django.db.models import Q -from scanpipe.pipes import LoopProgress +from pipeline import LoopProgress class UniversalCtagsNotFound(Exception): diff --git a/scanpipe/tests/pipelines/do_nothing.py b/scanpipe/tests/pipelines/do_nothing.py index 01d8c8f91..91ed203ce 100644 --- a/scanpipe/tests/pipelines/do_nothing.py +++ b/scanpipe/tests/pipelines/do_nothing.py @@ -30,6 +30,8 @@ class DoNothing(Pipeline): Description section of the doc string. """ + download_inputs = False + @classmethod def steps(cls): return ( diff --git a/scanpipe/tests/pipelines/profile_step.py b/scanpipe/tests/pipelines/profile_step.py index 42135677b..06022d1e1 100644 --- a/scanpipe/tests/pipelines/profile_step.py +++ b/scanpipe/tests/pipelines/profile_step.py @@ -27,6 +27,8 @@ class ProfileStep(Pipeline): """Profile a step using the @profile decorator.""" + download_inputs = False + @classmethod def steps(cls): return (cls.step,) diff --git a/scanpipe/tests/pipelines/raise_exception.py b/scanpipe/tests/pipelines/raise_exception.py index b9a71c656..75cdd425c 100644 --- a/scanpipe/tests/pipelines/raise_exception.py +++ b/scanpipe/tests/pipelines/raise_exception.py @@ -26,6 +26,8 @@ class RaiseException(Pipeline): """Raise an Exception.""" + download_inputs = False + @classmethod def steps(cls): return (cls.raise_exception_step,) diff --git a/scanpipe/tests/pipelines/register_from_file.py b/scanpipe/tests/pipelines/register_from_file.py index c07914fd3..6dd84420e 100644 --- a/scanpipe/tests/pipelines/register_from_file.py +++ b/scanpipe/tests/pipelines/register_from_file.py @@ -26,6 +26,8 @@ class RegisterFromFile(DoNothing): """Register from its file path.""" + download_inputs = False + @classmethod def steps(cls): return (cls.step1,) diff --git a/scanpipe/tests/pipelines/steps_as_attribute.py b/scanpipe/tests/pipelines/steps_as_attribute.py index b9853dce2..2755c5e58 100644 --- a/scanpipe/tests/pipelines/steps_as_attribute.py +++ b/scanpipe/tests/pipelines/steps_as_attribute.py @@ -26,6 +26,8 @@ class StepsAsAttribute(Pipeline): """Declare steps as attribute.""" + download_inputs = False + def step1(self): return diff --git a/scanpipe/tests/pipelines/with_groups.py b/scanpipe/tests/pipelines/with_groups.py index 878bc942b..32b4f471d 100644 --- a/scanpipe/tests/pipelines/with_groups.py +++ b/scanpipe/tests/pipelines/with_groups.py @@ -27,6 +27,8 @@ class WithGroups(Pipeline): """Include "grouped" steps.""" + download_inputs = False + @classmethod def steps(cls): return ( diff --git a/scanpipe/tests/pipes/test_pipes.py b/scanpipe/tests/pipes/test_pipes.py index 5af6d4518..d59a1dc7f 100644 --- a/scanpipe/tests/pipes/test_pipes.py +++ b/scanpipe/tests/pipes/test_pipes.py @@ -28,6 +28,7 @@ from django.test import TestCase from django.test import TransactionTestCase +from pipeline import LoopProgress from scanpipe import pipes from scanpipe.models import CodebaseResource from scanpipe.models import DiscoveredPackage @@ -348,14 +349,14 @@ def test_scanpipe_loop_progress_as_context_manager(self): buffer = io.StringIO() logger = buffer.write - progress = pipes.LoopProgress(total_iterations, logger, progress_step=10) + progress = LoopProgress(total_iterations, logger, progress_step=10) for _ in progress.iter(range(total_iterations)): pass self.assertEqual(expected, buffer.getvalue()) buffer = io.StringIO() logger = buffer.write - with pipes.LoopProgress(total_iterations, logger, progress_step) as progress: + with LoopProgress(total_iterations, logger, progress_step) as progress: for _ in progress.iter(range(total_iterations)): pass self.assertEqual(expected, buffer.getvalue()) diff --git a/scanpipe/tests/test_pipelines.py b/scanpipe/tests/test_pipelines.py index aef621ece..5fba6104a 100644 --- a/scanpipe/tests/test_pipelines.py +++ b/scanpipe/tests/test_pipelines.py @@ -152,10 +152,10 @@ def test_scanpipe_pipeline_class_execute_with_selected_steps(self, step2, step1) project1 = Project.objects.create(name="Analysis") run = project1.add_pipeline("do_nothing") - pipeline = run.make_pipeline_instance() run.selected_steps = ["step2", "not_existing_step"] run.save() + pipeline = run.make_pipeline_instance() exitcode, out = pipeline.execute() self.assertEqual(0, exitcode) @@ -174,6 +174,7 @@ def test_scanpipe_pipeline_class_execute_with_selected_steps(self, step2, step1) def test_scanpipe_pipeline_class_download_inputs_attribute(self): project1 = Project.objects.create(name="Analysis") run = project1.add_pipeline("do_nothing") + run.pipeline_class.download_inputs = True pipeline = run.make_pipeline_instance() self.assertTrue(pipeline.download_inputs) pipeline.execute() @@ -181,7 +182,7 @@ def test_scanpipe_pipeline_class_download_inputs_attribute(self): run = project1.add_pipeline("do_nothing") pipeline = run.make_pipeline_instance() - pipeline.download_inputs = False + run.pipeline_class.download_inputs = False pipeline.execute() self.assertNotIn("Step [download_missing_inputs]", run.log)