Skip to content

Commit

Permalink
Make pipeline work
Browse files Browse the repository at this point in the history
Signed-off-by: Keshav Priyadarshi <[email protected]>
  • Loading branch information
keshav-space committed Aug 6, 2024
1 parent 6f5cf02 commit 104d784
Show file tree
Hide file tree
Showing 17 changed files with 118 additions and 113 deletions.
97 changes: 81 additions & 16 deletions pipeline/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,19 +41,19 @@
from pipeline import BasePipeline
from pipeline import BasePipelineRun
class DoSomething(BasePipeline):
class DoSomething(BasePipeline, BasePipelineRun):
@classmethod
def steps(cls):
return (cls.step1,)
def step1(self):
print("Message from step1")
# 1. From the Pipeline class (preferred)
run = DoSomething.make_run()
# 1. Run pipeline
run = DoSomething()
run.execute()
# 2. From the Run class
run = BasePipelineRun(pipeline_class=DoSomething)
# 2. Run pipeline with selected groups
run = DoSomething(selected_groups=["group1", "group2"])
run.execute()
"""

Expand Down Expand Up @@ -94,13 +94,85 @@ def humanize_time(seconds):
return message


class LoopProgress:
    """
    A context manager for logging progress in loops.

    Usage::

        total_iterations = 100
        logger = print  # Replace with your actual logger function

        progress = LoopProgress(total_iterations, logger, progress_step=10)
        for item in progress.iter(iterator):
            "Your processing logic here"

        with LoopProgress(total_iterations, logger, progress_step=10) as progress:
            for item in progress.iter(iterator):
                "Your processing logic here"
    """

    def __init__(self, total_iterations, logger, progress_step=10):
        """
        Store the loop settings and start the timer.

        ``total_iterations`` is the expected number of items, ``logger`` is a
        callable receiving the progress message (a falsy value disables
        logging), and ``progress_step`` is the minimum percentage increase
        between two logged messages.
        """
        self.total_iterations = total_iterations
        self.logger = logger
        self.progress_step = progress_step
        self.start_time = timer()
        self.last_logged_progress = 0
        self.current_iteration = 0

    def get_eta(self, current_progress):
        """
        Return the estimated remaining time in seconds, extrapolated linearly
        from the time elapsed to reach ``current_progress`` percent.

        Return None when ``current_progress`` is zero, as no estimate can be
        computed before any progress has been made.
        """
        if not current_progress:
            return None

        run_time = timer() - self.start_time
        return round(run_time / current_progress * (100 - current_progress))

    @property
    def current_progress(self):
        """Return the progress as an integer percentage, 0 when there is no total."""
        if not self.total_iterations:
            # Avoid a ZeroDivisionError on an empty loop.
            return 0
        return int((self.current_iteration / self.total_iterations) * 100)

    @property
    def eta(self):
        """Return the estimated remaining seconds, or None when progress is 0%."""
        return self.get_eta(self.current_progress)

    def log_progress(self):
        """Log a progress message each time ``progress_step`` percent is gained."""
        reasons_to_skip = [
            not self.logger,
            not self.current_iteration > 0,
            self.total_iterations <= self.progress_step,
        ]
        if any(reasons_to_skip):
            return

        current_progress = self.current_progress
        if current_progress < self.last_logged_progress + self.progress_step:
            return

        msg = (
            f"Progress: {current_progress}% "
            f"({self.current_iteration}/{self.total_iterations})"
        )
        # The ETA is omitted when unknown (None) or when it rounds to zero.
        if eta := self.get_eta(current_progress):
            msg += f" ETA: {humanize_time(eta)}"

        self.logger(msg)
        self.last_logged_progress = current_progress

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        pass

    def iter(self, iterator):
        """Yield each item of ``iterator``, logging the progress along the way."""
        for item in iterator:
            self.current_iteration += 1
            self.log_progress()
            yield item


class BasePipelineRun:
"""Base class for all pipeline run (execution)."""

def __init__(self, pipeline_class, selected_groups=None, selected_steps=None):
def __init__(self, selected_groups=None, selected_steps=None):
"""Load the Pipeline class."""
self.pipeline_class = pipeline_class
self.pipeline_name = pipeline_class.__name__
self.pipeline_class = self.__class__
self.pipeline_name = self.pipeline_class.__name__

self.selected_groups = selected_groups
self.selected_steps = selected_steps or []
Expand Down Expand Up @@ -157,7 +229,7 @@ def execute(self):
step_start_time = timer()

try:
step(self) # WARNING: self is a Run instance, not a Pipeline instance
step(self)
except Exception as exception:
self.log("Pipeline failed")
return 1, self.output_from_exception(exception)
Expand All @@ -175,9 +247,6 @@ def execute(self):
class BasePipeline:
"""Base class for all pipeline implementations."""

# Default PipelineRun class for executing the Pipeline.
run_class = BasePipelineRun

# Flag indicating if the Pipeline is an add-on, meaning it cannot be run first.
is_addon = False

Expand Down Expand Up @@ -259,7 +328,3 @@ def get_available_groups(cls):
for group_name in getattr(step, "groups", [])
)
)

@classmethod
def make_run(cls, *args, **kwargs):
    """Instantiate ``cls.run_class`` with this pipeline class and the given arguments."""
    run_class = cls.run_class
    return run_class(cls, *args, **kwargs)
2 changes: 1 addition & 1 deletion scanpipe/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -1975,7 +1975,7 @@ def pipeline_class(self):

def make_pipeline_instance(self):
"""Return a pipelines instance using this Run pipeline_class."""
return self.pipeline_class.make_run(run_instance=self)
return self.pipeline_class(self)

def deliver_project_subscriptions(self):
"""Triggers related project webhook subscriptions."""
Expand Down
25 changes: 12 additions & 13 deletions scanpipe/pipelines/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ def extract_archives(self, location=None):


class ProjectPipelineRun(BasePipelineRun):
def __init__(self, pipeline_class, run_instance):
def __init__(self, run_instance):
"""Load the Pipeline execution context from a Run database object."""
self.run = run_instance
self.project = run_instance.project
Expand All @@ -130,11 +130,11 @@ def set_current_step(self, message):
self.run.set_current_step(message)


class Pipeline(CommonStepsMixin, BasePipeline):
class Pipeline(CommonStepsMixin, BasePipeline, ProjectPipelineRun):
"""Main class for all project related pipelines including common steps methods."""

# Project wrapper ProjectPipelineRun class
run_class = ProjectPipelineRun
# run_class = ProjectPipelineRun

# Flag specifying whether to download missing inputs as an initial step.
download_inputs = True
Expand All @@ -146,16 +146,15 @@ class Pipeline(CommonStepsMixin, BasePipeline):
# to target the Package list view with an active filtering.
results_url = ""

# TODO
# @classmethod
# def get_steps(cls, groups=None):
# """Inject the ``download_inputs`` step if enabled."""
# steps = super().get_steps(groups)
#
# if cls.download_inputs:
# steps = (cls.download_missing_inputs,) + steps
#
# return steps
@classmethod
def get_steps(cls, groups=None):
    """Return the steps, prefixed by ``download_missing_inputs`` when enabled."""
    base_steps = super().get_steps(groups)

    if not cls.download_inputs:
        return base_steps

    # ``download_inputs`` is a class-level flag; the download step runs first.
    return (cls.download_missing_inputs,) + base_steps

def download_missing_inputs(self):
"""
Expand Down
74 changes: 0 additions & 74 deletions scanpipe/pipes/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,9 @@
from datetime import datetime
from itertools import islice
from pathlib import Path
from timeit import default_timer as timer

from django.db.models import Count

from scanpipe import humanize_time
from scanpipe.models import AbstractTaskFieldsModel
from scanpipe.models import CodebaseRelation
from scanpipe.models import CodebaseResource
Expand Down Expand Up @@ -402,78 +400,6 @@ def get_bin_executable(filename):
return str(Path(sys.executable).parent / filename)


class LoopProgress:
    """
    A context manager for logging progress in loops.

    Usage::

        total_iterations = 100
        logger = print  # Replace with your actual logger function

        progress = LoopProgress(total_iterations, logger, progress_step=10)
        for item in progress.iter(iterator):
            "Your processing logic here"

        with LoopProgress(total_iterations, logger, progress_step=10) as progress:
            for item in progress.iter(iterator):
                "Your processing logic here"
    """

    def __init__(self, total_iterations, logger, progress_step=10):
        """
        Store the loop settings and start the timer.

        ``total_iterations`` is the expected number of items, ``logger`` is a
        callable receiving the progress message (a falsy value disables
        logging), and ``progress_step`` is the minimum percentage increase
        between two logged messages.
        """
        self.total_iterations = total_iterations
        self.logger = logger
        self.progress_step = progress_step
        self.start_time = timer()
        self.last_logged_progress = 0
        self.current_iteration = 0

    def get_eta(self, current_progress):
        """
        Return the estimated remaining time in seconds, extrapolated linearly
        from the time elapsed to reach ``current_progress`` percent.

        Return None when ``current_progress`` is zero, as no estimate can be
        computed before any progress has been made.
        """
        if not current_progress:
            return None

        run_time = timer() - self.start_time
        return round(run_time / current_progress * (100 - current_progress))

    @property
    def current_progress(self):
        """Return the progress as an integer percentage, 0 when there is no total."""
        if not self.total_iterations:
            # Avoid a ZeroDivisionError on an empty loop.
            return 0
        return int((self.current_iteration / self.total_iterations) * 100)

    @property
    def eta(self):
        """Return the estimated remaining seconds, or None when progress is 0%."""
        return self.get_eta(self.current_progress)

    def log_progress(self):
        """Log a progress message each time ``progress_step`` percent is gained."""
        reasons_to_skip = [
            not self.logger,
            not self.current_iteration > 0,
            self.total_iterations <= self.progress_step,
        ]
        if any(reasons_to_skip):
            return

        current_progress = self.current_progress
        if current_progress < self.last_logged_progress + self.progress_step:
            return

        msg = (
            f"Progress: {current_progress}% "
            f"({self.current_iteration}/{self.total_iterations})"
        )
        # The ETA is omitted when unknown (None) or when it rounds to zero.
        if eta := self.get_eta(current_progress):
            msg += f" ETA: {humanize_time(eta)}"

        self.logger(msg)
        self.last_logged_progress = current_progress

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        pass

    def iter(self, iterator):
        """Yield each item of ``iterator``, logging the progress along the way."""
        for item in iterator:
            self.current_iteration += 1
            self.log_progress()
            yield item


def get_text_str_diff_ratio(str_a, str_b):
"""
Return a similarity ratio as a float between 0 and 1 by comparing the
Expand Down
2 changes: 1 addition & 1 deletion scanpipe/pipes/d2d.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,11 @@
from packagedcode.npm import NpmPackageJsonHandler
from summarycode.classify import LEGAL_STARTS_ENDS

from pipeline import LoopProgress
from scanpipe import pipes
from scanpipe.models import CodebaseRelation
from scanpipe.models import CodebaseResource
from scanpipe.models import convert_glob_to_django_regex
from scanpipe.pipes import LoopProgress
from scanpipe.pipes import flag
from scanpipe.pipes import get_resource_diff_ratio
from scanpipe.pipes import js
Expand Down
2 changes: 1 addition & 1 deletion scanpipe/pipes/purldb.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
from univers.version_range import RANGE_CLASS_BY_SCHEMES
from univers.version_range import InvalidVersionRange

from scanpipe.pipes import LoopProgress
from pipeline import LoopProgress
from scanpipe.pipes import _clean_package_data
from scanpipe.pipes import poll_until_success

Expand Down
3 changes: 2 additions & 1 deletion scanpipe/pipes/scancode.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
from scancode import cli as scancode_cli
from scancode.cli import run_scan as scancode_run_scan

from pipeline import LoopProgress
from scanpipe import pipes
from scanpipe.models import CodebaseResource
from scanpipe.models import DiscoveredDependency
Expand Down Expand Up @@ -308,7 +309,7 @@ def scan_resources(
resource_count = resource_qs.count()
logger.info(f"Scan {resource_count} codebase resources with {scan_func.__name__}")
resource_iterator = resource_qs.iterator(chunk_size=2000)
progress = pipes.LoopProgress(resource_count, logger=progress_logger)
progress = LoopProgress(resource_count, logger=progress_logger)
max_workers = get_max_workers(keep_available=1)

if max_workers <= 0:
Expand Down
2 changes: 1 addition & 1 deletion scanpipe/pipes/strings.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
# ScanCode.io is a free software code scanning tool from nexB Inc. and others.
# Visit https://github.com/nexB/scancode.io for support and download.

from scanpipe.pipes import LoopProgress
from pipeline import LoopProgress


class XgettextNotFound(Exception):
Expand Down
2 changes: 1 addition & 1 deletion scanpipe/pipes/symbols.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@

from django.db.models import Q

from scanpipe.pipes import LoopProgress
from pipeline import LoopProgress


class UniversalCtagsNotFound(Exception):
Expand Down
2 changes: 2 additions & 0 deletions scanpipe/tests/pipelines/do_nothing.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ class DoNothing(Pipeline):
Description section of the doc string.
"""

download_inputs = False

@classmethod
def steps(cls):
return (
Expand Down
2 changes: 2 additions & 0 deletions scanpipe/tests/pipelines/profile_step.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
class ProfileStep(Pipeline):
"""Profile a step using the @profile decorator."""

download_inputs = False

@classmethod
def steps(cls):
return (cls.step,)
Expand Down
2 changes: 2 additions & 0 deletions scanpipe/tests/pipelines/raise_exception.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
class RaiseException(Pipeline):
"""Raise an Exception."""

download_inputs = False

@classmethod
def steps(cls):
return (cls.raise_exception_step,)
Expand Down
2 changes: 2 additions & 0 deletions scanpipe/tests/pipelines/register_from_file.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
class RegisterFromFile(DoNothing):
"""Register from its file path."""

download_inputs = False

@classmethod
def steps(cls):
return (cls.step1,)
Expand Down
2 changes: 2 additions & 0 deletions scanpipe/tests/pipelines/steps_as_attribute.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
class StepsAsAttribute(Pipeline):
"""Declare steps as attribute."""

download_inputs = False

def step1(self):
return

Expand Down
Loading

0 comments on commit 104d784

Please sign in to comment.