diff --git a/src/pymorize/cmorizer.py b/src/pymorize/cmorizer.py index ee720d4..835512b 100644 --- a/src/pymorize/cmorizer.py +++ b/src/pymorize/cmorizer.py @@ -41,8 +41,11 @@ def __init__( self.rules = rules_cfg or [] self.pipelines = pipelines_cfg or [] - self._post_init_configure_dask() - self._post_init_create_dask_cluster() + self._cluster = None # Dask Cluster, might be set up later + if self._pymorize_cfg.get("parallel", True): + if self._pymorize_cfg.get("parallel_backend", "prefect") == "dask": + self._post_init_configure_dask() + self._post_init_create_dask_cluster() self._post_init_create_pipelines() self._post_init_create_rules() self._post_init_read_bare_tables() @@ -200,7 +203,8 @@ def _post_init_create_pipelines(self): pipelines.append(p) elif isinstance(p, dict): pl = Pipeline.from_dict(p) - pl.assign_cluster(self._cluster) + if self._cluster is not None: + pl.assign_cluster(self._cluster) pipelines.append(pl) else: raise ValueError(f"Invalid pipeline configuration for {p}") @@ -291,8 +295,9 @@ def add_rule(self, rule): def add_pipeline(self, pipeline): if not isinstance(pipeline, Pipeline): raise TypeError("pipeline must be an instance of Pipeline") + if self._cluster is not None: + # Assign the cluster to this pipeline: + pipeline.assign_cluster(self._cluster) - # Assign the cluster to this pipeline: - pipeline.assign_cluster(self._cluster) self.pipelines.append(pipeline) def _rule_for_filepath(self, filepath): @@ -349,7 +354,8 @@ def process(self, parallel=None): if parallel is None: parallel = self._pymorize_cfg.get("parallel", True) if parallel: - return self.parallel_process() + parallel_backend = self._pymorize_cfg.get("parallel_backend", "prefect") + return self.parallel_process(backend=parallel_backend) else: return self.serial_process() diff --git a/src/pymorize/pipeline.py b/src/pymorize/pipeline.py index 879be8d..d225a57 100644 --- a/src/pymorize/pipeline.py +++ b/src/pymorize/pipeline.py @@ -95,11 +95,18 @@ def _run_prefect(self,
data, rule_spec): ) cmor_name = rule_spec.get("cmor_name") rule_name = rule_spec.get("name", cmor_name) + if self._cluster is None: + logger.warning( + "No cluster assigned to this pipeline. Using local Dask cluster." + ) + dask_scheduler_address = None + else: + dask_scheduler_address = self._cluster.scheduler_address @flow( flow_run_name=f"{self.name} - {rule_name}", description=f"{rule_spec.get('description', '')}", - task_runner=DaskTaskRunner(address=self._cluster.scheduler_address), + task_runner=DaskTaskRunner(address=dask_scheduler_address), on_completion=[self.on_completion], on_failure=[self.on_failure], )