Skip to content

Commit

Permalink
Merge pull request #63 from esm-tools/feat/cleanup-parallel-init
Browse files Browse the repository at this point in the history
Parallelization Constructor
  • Loading branch information
pgierz authored Nov 12, 2024
2 parents 3b19b09 + 37189a6 commit f891f4d
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 7 deletions.
18 changes: 12 additions & 6 deletions src/pymorize/cmorizer.py
Original file line number Diff line number Diff line change
@@ -41,8 +41,11 @@ def __init__(
self.rules = rules_cfg or []
self.pipelines = pipelines_cfg or []

self._post_init_configure_dask()
self._post_init_create_dask_cluster()
self._cluster = None # Dask Cluster, might be set up later
if self._pymorize_cfg.get("parallel", True):
if pymorize_cfg.get("parallel_backend") == "dask":
self._post_init_configure_dask()
self._post_init_create_dask_cluster()
self._post_init_create_pipelines()
self._post_init_create_rules()
self._post_init_read_bare_tables()
@@ -200,7 +203,8 @@ def _post_init_create_pipelines(self):
pipelines.append(p)
elif isinstance(p, dict):
pl = Pipeline.from_dict(p)
pl.assign_cluster(self._cluster)
if self._cluster is not None:
pl.assign_cluster(self._cluster)
pipelines.append(Pipeline.from_dict(p))
else:
raise ValueError(f"Invalid pipeline configuration for {p}")
@@ -291,8 +295,9 @@ def add_rule(self, rule):
def add_pipeline(self, pipeline):
if not isinstance(pipeline, Pipeline):
raise TypeError("pipeline must be an instance of Pipeline")
# Assign the cluster to this pipeline:
pipeline.assign_cluster(self._cluster)
if self._cluster is not None:
# Assign the cluster to this pipeline:
pipeline.assign_cluster(self._cluster)
self.pipelines.append(pipeline)

def _rule_for_filepath(self, filepath):
@@ -349,7 +354,8 @@ def process(self, parallel=None):
if parallel is None:
parallel = self._pymorize_cfg.get("parallel", True)
if parallel:
return self.parallel_process()
parallel_backend = self._pymorize_cfg.get("parallel_backend", "prefect")
return self.parallel_process(backend=parallel_backend)
else:
return self.serial_process()

Expand Down
9 changes: 8 additions & 1 deletion src/pymorize/pipeline.py
Original file line number Diff line number Diff line change
@@ -95,11 +95,18 @@ def _run_prefect(self, data, rule_spec):
)
cmor_name = rule_spec.get("cmor_name")
rule_name = rule_spec.get("name", cmor_name)
if self._cluster is None:
logger.warning(
"No cluster assigned to this pipeline. Using local Dask cluster."
)
dask_scheduler_address = None
else:
dask_scheduler_address = self._cluster.scheduler

@flow(
flow_run_name=f"{self.name} - {rule_name}",
description=f"{rule_spec.get('description', '')}",
task_runner=DaskTaskRunner(address=self._cluster.scheduler_address),
task_runner=DaskTaskRunner(address=dask_scheduler_address),
on_completion=[self.on_completion],
on_failure=[self.on_failure],
)
Expand Down

0 comments on commit f891f4d

Please sign in to comment.