Skip to content

Commit

Permalink
--wip--
Browse files Browse the repository at this point in the history
  • Loading branch information
pgierz committed Nov 11, 2024
1 parent 8592902 commit 90b859d
Show file tree
Hide file tree
Showing 7 changed files with 88 additions and 4 deletions.
1 change: 1 addition & 0 deletions conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
"tests.fixtures.config_files",
"tests.fixtures.CV_Dir",
"tests.fixtures.CMIP_Tables_Dir",
"tests.fixtures.example_data.pi_uxarray",
]


Expand Down
14 changes: 10 additions & 4 deletions src/pymorize/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import randomname
from prefect import flow
from prefect.cache_policies import INPUTS, TASK_SOURCE
from prefect.tasks import Task, task_input_hash
from prefect.tasks import Task
from prefect_dask import DaskTaskRunner

from .caching import generate_cache_key
Expand All @@ -21,13 +21,19 @@ def __init__(
*args,
name=None,
workflow_backend="prefect",
cache_policy=None,
dask_cluster=None,
cache_expiration=None,
):
self._steps = args
self.name = name or randomname.get_name()
self._workflow_backend = workflow_backend
self._cluster = dask_cluster
self._prefect_cache_kwargs = {}
# Fall back to the default policy (task source + inputs) only when the
# caller did not supply one; an explicit ``cache_policy`` is kept and is
# forwarded to the Prefect tasks along with the other cache settings.
if cache_policy is None:
    cache_policy = TASK_SOURCE + INPUTS
self._cache_policy = cache_policy
self._prefect_cache_kwargs["cache_policy"] = self._cache_policy


if cache_expiration is None:
self._cache_expiration = timedelta(days=1)
Expand All @@ -36,6 +42,7 @@ def __init__(
self._cache_expiration = cache_expiration
else:
raise TypeError("Cache expiration must be a timedelta!")
# Store the timedelta itself: a trailing comma here would wrap it in a
# 1-tuple, which is then splatted into ``Task(...)`` as ``cache_expiration``.
self._prefect_cache_kwargs["cache_expiration"] = self._cache_expiration

if self._workflow_backend == "prefect":
self._prefectize_steps()
Expand All @@ -51,7 +58,7 @@ def __str__(self):
return "\n".join(r_val)

def assign_cluster(self, cluster):
logger.debug("Assinging cluster to this pipeline")
logger.debug("Assigning cluster to this pipeline")
self._cluster = cluster

def _prefectize_steps(self):
Expand All @@ -64,9 +71,8 @@ def _prefectize_steps(self):
prefect_tasks.append(
Task(
fn=step,
**self._prefect_cache_kwargs,
# cache_key_fn=generate_cache_key,
cache_expiration=self._cache_expiration,
cache_policy=TASK_SOURCE + INPUTS,
)
)

Expand Down
21 changes: 21 additions & 0 deletions tests/configs/test_config_pi_uxarray.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
pymorize:
version: "unreleased"
use_xarray_backend: true
warn_on_no_rule: False
minimum_jobs: 8
maximum_jobs: 10
general:
name: "pi_uxarray"
description: "This is a test configuration using the UXArray test data on PI Mesh"
maintainer: "pgierz"
email: "[email protected]"
cmor_version: "CMIP6"
mip: "CMIP"
frequency: "mon"
CMIP_Tables_Dir: "./cmip6-cmor-tables/Tables"
rules:
- name: "temp"
inputs:
- path: "REPLACE_ME"
pattern: "temp.fesom.????.nc"
cmor_variable: "thetao"
5 changes: 5 additions & 0 deletions tests/fixtures/config_files.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,8 @@
@pytest.fixture
def test_config():
    """Return the location of ``configs/test_config.yaml`` under ``TEST_ROOT``."""
    config_dir = TEST_ROOT / "configs"
    return config_dir / "test_config.yaml"


@pytest.fixture
def test_config_pi_uxarray():
    """Return the location of ``configs/test_config_pi_uxarray.yaml`` under ``TEST_ROOT``."""
    config_dir = TEST_ROOT / "configs"
    return config_dir / "test_config_pi_uxarray.yaml"
Empty file.
38 changes: 38 additions & 0 deletions tests/fixtures/example_data/pi_uxarray.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
"""Example data for the FESOM model."""

import tarfile
from pathlib import Path

import pytest
import requests

URL = "https://nextcloud.awi.de/s/swqyFgbL2jjgjRo/download/pi_uxarray.tar"
"""str : URL to download the example data from."""


@pytest.fixture(scope="session")
def download_data(tmp_path_factory):
    """Fetch the example tarball once per test session and return its path.

    The archive is stored as ``cached_data/pi_uxarray.tar`` below the
    session's base temporary directory. If that file already exists it is
    reused and nothing is downloaded.

    Parameters
    ----------
    tmp_path_factory
        Pytest's temporary-path factory; only ``getbasetemp()`` is used.

    Returns
    -------
    pathlib.Path
        Path of the downloaded (or already present) tar archive.

    Raises
    ------
    requests.HTTPError
        If the server responds with an error status.
    requests.Timeout
        If the server does not respond within the timeout.
    """
    cache_dir = tmp_path_factory.getbasetemp() / "cached_data"
    cache_dir.mkdir(exist_ok=True)
    data_path = cache_dir / "pi_uxarray.tar"

    if data_path.exists():
        print("Using cached data.")
        return data_path

    # Download into a sibling ".part" file and rename it only once complete,
    # so an interrupted transfer is never mistaken for a valid cached archive.
    partial_path = data_path.with_name(data_path.name + ".part")
    # A timeout keeps the test session from hanging on an unresponsive
    # server; streaming avoids holding the whole archive in memory.
    with requests.get(URL, stream=True, timeout=60) as response:
        response.raise_for_status()
        with open(partial_path, "wb") as f:
            for chunk in response.iter_content(chunk_size=1024 * 1024):
                f.write(chunk)
    partial_path.replace(data_path)
    print("Data downloaded.")

    return data_path


@pytest.fixture(scope="session")
def pi_uxarray_data(download_data):
    """Unpack the downloaded archive next to itself and return the data directory.

    Parameters
    ----------
    download_data
        Path of the tar archive, as returned by the ``download_data`` fixture.

    Returns
    -------
    pathlib.Path
        ``<archive directory>/pi_uxarray``.
        NOTE(review): assumes the archive contains a top-level ``pi_uxarray``
        directory -- confirm against the actual tarball.
    """
    data_dir = Path(download_data).parent
    with tarfile.open(download_data, "r") as tar:
        # The archive comes from the network: where the interpreter supports
        # extraction filters, use the "data" filter so members cannot escape
        # ``data_dir``. ``hasattr`` is the documented feature check; older
        # interpreters fall back to the unfiltered call.
        if hasattr(tarfile, "data_filter"):
            tar.extractall(data_dir, filter="data")
        else:
            tar.extractall(data_dir)

    return data_dir / "pi_uxarray"
13 changes: 13 additions & 0 deletions tests/integration/test_uxarray_pi.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
def test_process(test_uxarray_config, pi_uxarray_data):
    """Run the CMORizer end to end on the PI-mesh example data.

    The YAML configuration is loaded, every ``REPLACE_ME`` placeholder in the
    rules' input paths is replaced by the extracted data directory, and the
    patched configuration is handed to ``CMORizer``.

    NOTE(review): the config fixture visible in ``tests/fixtures/config_files.py``
    is named ``test_config_pi_uxarray``; confirm that a fixture called
    ``test_uxarray_config`` exists, or rename this parameter.
    """
    import yaml

    from pymorize.cmorizer import CMORizer
    from pymorize.logging import logger

    logger.info(f"Processing {test_uxarray_config}")
    with open(test_uxarray_config, "r") as f:
        cfg = yaml.safe_load(f)

    # ``str.replace`` needs a string, while the fixture yields a path object.
    data_dir = str(pi_uxarray_data)
    # ``safe_load`` returns plain dicts, so the rules are reached by key.
    for rule in cfg["rules"]:
        for input_spec in rule["inputs"]:
            input_spec["path"] = input_spec["path"].replace("REPLACE_ME", data_dir)

    # Pass the patched configuration, not the path of the unpatched file.
    cmorizer = CMORizer.from_dict(cfg)
    cmorizer.process()

0 comments on commit 90b859d

Please sign in to comment.