Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor the BasePipeline, move out all Project related logic #1351 #1358

Merged
merged 5 commits into from
Aug 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 53 additions & 0 deletions aboutcode/pipeline/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
# `aboutcode.pipeline`

Define and run pipelines.

### Install

```bash
pip install aboutcode_pipeline
Copy link
Member

@keshav-space keshav-space Aug 8, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As per PEP8 https://peps.python.org/pep-0008/#package-and-module-names aboutcode-pipeline would be a much more Pythonic package name.

Actually, going by @pombredanne suggestion here #1332 (comment), we may want to name this package aboutcode.pipeline.

Suggested change
pip install aboutcode_pipeline
pip install aboutcode.pipeline

Copy link
Contributor Author

@tdruez tdruez Aug 8, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I initially went with aboutcode_pipeline in the README as the filenames of the flot build were aboutcode_pipeline-0.1.whl, even though the [project] name was set to "aboutcode.pipeline".

```

### Define and execute a pipeline

```python
from aboutcode.pipeline import BasePipeline

class PrintMessages(BasePipeline):
@classmethod
def steps(cls):
return (cls.step1,)

def step1(self):
print("Message from step1")

PrintMessages().execute()
```

### Groups and steps selection

```python
from aboutcode.pipeline import BasePipeline
from aboutcode.pipeline import group

class PrintMessages(BasePipeline):
@classmethod
def steps(cls):
return (cls.step1, cls.step2)

def step1(self):
print("Message from step1")

@group("foo")
def step2(self):
print("Message from step2")


# Execute pipeline with group selection
run = PrintMessages(selected_groups=["foo"])
exitcode, error = run.execute()

# Execute pipeline with steps selection
run = PrintMessages(selected_steps=["step1"])
exitcode, error = run.execute()
```
200 changes: 151 additions & 49 deletions aboutcode/pipeline/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,49 +28,20 @@
from pydoc import splitdoc
from timeit import default_timer as timer

logger = logging.getLogger(__name__)
module_logger = logging.getLogger(__name__)


def group(*groups):
"""Mark a function as part of a particular group."""

def decorator(obj):
if hasattr(obj, "groups"):
obj.groups = obj.groups.union(groups)
else:
setattr(obj, "groups", set(groups))
return obj

return decorator


def humanize_time(seconds):
"""Convert the provided ``seconds`` number into human-readable time."""
message = f"{seconds:.0f} seconds"

if seconds > 86400:
message += f" ({seconds / 86400:.1f} days)"
if seconds > 3600:
message += f" ({seconds / 3600:.1f} hours)"
elif seconds > 60:
message += f" ({seconds / 60:.1f} minutes)"

return message


class BasePipeline:
"""Base class for all pipeline implementations."""
class PipelineDefinition:
"""
Encapsulate the code related to a Pipeline definition:
- Steps
- Attributes
- Documentation
"""

# Flag indicating if the Pipeline is an add-on, meaning it cannot be run first.
is_addon = False

def __init__(self, run):
"""Load the Run and Project instances."""
self.run = run
self.project = run.project
self.pipeline_name = run.pipeline_name
self.env = self.project.get_env()

@classmethod
def steps(cls):
raise NotImplementedError
Expand All @@ -89,6 +60,9 @@ def get_steps(cls, groups=None):

steps = cls.steps()

if initial_steps := cls.get_initial_steps():
steps = (*initial_steps, *steps)

if groups is not None:
steps = tuple(
step
Expand Down Expand Up @@ -152,13 +126,40 @@ def get_available_groups(cls):
)
)


class PipelineRun:
"""
Encapsulate the code related to a Pipeline run (execution):
- Execution context: groups, steps
- Execution logic
- Logging
- Results
"""

def __init__(self, selected_groups=None, selected_steps=None):
"""Load the Pipeline class."""
self.pipeline_class = self.__class__
self.pipeline_name = self.__class__.__name__

self.selected_groups = selected_groups
self.selected_steps = selected_steps or []

self.execution_log = []
self.current_step = ""

def append_to_log(self, message):
self.execution_log.append(message)

def set_current_step(self, message):
self.current_step = message

def log(self, message):
"""Log the given `message` to the current module logger and Run instance."""
"""Log the given `message` to the current module logger and execution_log."""
now_local = datetime.now(timezone.utc).astimezone()
timestamp = now_local.strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]
message = f"{timestamp} {message}"
logger.info(message)
self.run.append_to_log(message)
module_logger.info(message)
self.append_to_log(message)

@staticmethod
def output_from_exception(exception):
Expand All @@ -177,23 +178,18 @@ def execute(self):
"""Execute each steps in the order defined on this pipeline class."""
self.log(f"Pipeline [{self.pipeline_name}] starting")

steps = self.get_steps(groups=self.run.selected_groups)
selected_steps = self.run.selected_steps

if initial_steps := self.get_initial_steps():
steps = initial_steps + steps

steps = self.pipeline_class.get_steps(groups=self.selected_groups)
steps_count = len(steps)
pipeline_start_time = timer()

for current_index, step in enumerate(steps, start=1):
step_name = step.__name__

if selected_steps and step_name not in selected_steps:
if self.selected_steps and step_name not in self.selected_steps:
self.log(f"Step [{step_name}] skipped")
continue

self.run.set_current_step(f"{current_index}/{steps_count} {step_name}")
self.set_current_step(f"{current_index}/{steps_count} {step_name}")
self.log(f"Step [{step_name}] starting")
step_start_time = timer()

Expand All @@ -206,8 +202,114 @@ def execute(self):
step_run_time = timer() - step_start_time
self.log(f"Step [{step_name}] completed in {humanize_time(step_run_time)}")

self.run.set_current_step("") # Reset the `current_step` field on completion
self.set_current_step("") # Reset the `current_step` field on completion
pipeline_run_time = timer() - pipeline_start_time
self.log(f"Pipeline completed in {humanize_time(pipeline_run_time)}")

return 0, ""


class BasePipeline(PipelineDefinition, PipelineRun):
"""
Base class for all pipeline implementations.
It combines the pipeline definition and execution logics.
"""


def group(*groups):
"""Mark a function as part of a particular group."""

def decorator(obj):
if hasattr(obj, "groups"):
obj.groups = obj.groups.union(groups)
else:
setattr(obj, "groups", set(groups))
return obj

return decorator


def humanize_time(seconds):
"""Convert the provided ``seconds`` number into human-readable time."""
message = f"{seconds:.0f} seconds"

if seconds > 86400:
message += f" ({seconds / 86400:.1f} days)"
if seconds > 3600:
message += f" ({seconds / 3600:.1f} hours)"
elif seconds > 60:
message += f" ({seconds / 60:.1f} minutes)"

return message


class LoopProgress:
"""
A context manager for logging progress in loops.

Usage::
total_iterations = 100
logger = print # Replace with your actual logger function

progress = LoopProgress(total_iterations, logger, progress_step=10)
for item in progress.iter(iterator):
"Your processing logic here"

# As a context manager
with LoopProgress(total_iterations, logger, progress_step=10) as progress:
for item in progress.iter(iterator):
"Your processing logic here"
"""

def __init__(self, total_iterations, logger, progress_step=10):
self.total_iterations = total_iterations
self.logger = logger
self.progress_step = progress_step
self.start_time = timer()
self.last_logged_progress = 0
self.current_iteration = 0

def get_eta(self, current_progress):
run_time = timer() - self.start_time
return round(run_time / current_progress * (100 - current_progress))

@property
def current_progress(self):
return int((self.current_iteration / self.total_iterations) * 100)

@property
def eta(self):
run_time = timer() - self.start_time
return round(run_time / self.current_progress * (100 - self.current_progress))

def log_progress(self):
reasons_to_skip = [
not self.logger,
not self.current_iteration > 0,
self.total_iterations <= self.progress_step,
]
if any(reasons_to_skip):
return

if self.current_progress >= self.last_logged_progress + self.progress_step:
msg = (
f"Progress: {self.current_progress}% "
f"({self.current_iteration}/{self.total_iterations})"
)
if eta := self.eta:
msg += f" ETA: {humanize_time(eta)}"

self.logger(msg)
self.last_logged_progress = self.current_progress

def __enter__(self):
return self

def __exit__(self, exc_type, exc_value, traceback):
pass

def iter(self, iterator):
for item in iterator:
self.current_iteration += 1
self.log_progress()
yield item
Loading