Skip to content

Commit

Permalink
Issue #35 and Pruning Patch (#36)
Browse files Browse the repository at this point in the history
Resolves issue #35 and fixes a file-pruning bug discovered during weekend
user time.
  • Loading branch information
davramov authored Oct 20, 2024
1 parent 1f86235 commit 72bb7e1
Show file tree
Hide file tree
Showing 7 changed files with 204 additions and 142 deletions.
3 changes: 2 additions & 1 deletion orchestration/_tests/test_globus_flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -368,7 +368,8 @@ def test_process_new_832_ALCF_flow(mocker: MockFixture):
data832_raw_path=alcf_raw_path,
data832_scratch_path_tiff=f"{scratch_path_tiff}",
data832_scratch_path_zarr=f"{scratch_path_zarr}",
one_minute=True
one_minute=True,
config=mock_config
)
assert isinstance(result, list), "Result should be a list"
assert result == [True, True, True, True, True], "Result does not match expected values"
Expand Down
149 changes: 85 additions & 64 deletions orchestration/flows/bl832/alcf.py
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,11 @@ def transfer_data_to_data832(


@task(name="schedule_prune_task")
def schedule_prune_task(path: str, location: str, schedule_days: datetime.timedelta) -> bool:
def schedule_prune_task(path: str,
location: str,
schedule_days: datetime.timedelta,
source_endpoint=None,
check_endpoint=None) -> bool:
"""
Schedules a Prefect flow to prune files from a specified location.
Expand All @@ -192,9 +196,13 @@ def schedule_prune_task(path: str, location: str, schedule_days: datetime.timede
try:
flow_name = f"delete {location}: {Path(path).name}"
schedule_prefect_flow(
deploymnent_name=f"prune_{location}/prune_{location}",
deployment_name=f"prune_{location}/prune_{location}",
flow_run_name=flow_name,
parameters={"relative_path": path},
parameters={
"relative_path": path,
"source_endpoint": source_endpoint,
"check_endpoint": check_endpoint
},
duration_from_now=schedule_days
)
return True
Expand All @@ -214,7 +222,8 @@ def schedule_pruning(
data832_raw_path: str = None,
data832_scratch_path_tiff: str = None,
data832_scratch_path_zarr: str = None,
one_minute: bool = False) -> bool:
one_minute: bool = False,
config=None) -> bool:
"""
This function schedules the deletion of files from specified locations on ALCF, NERSC, and data832.
Expand All @@ -240,21 +249,21 @@ def schedule_pruning(
nersc_delay = datetime.timedelta(days=pruning_config["delete_nersc832_files_after_days"])
data832_delay = datetime.timedelta(days=pruning_config["delete_data832_files_after_days"])

# (path, location, days)
# (path, location, days, source_endpoint, check_endpoint)
delete_schedules = [
(alcf_raw_path, "alcf832_raw", alcf_delay),
(alcf_scratch_path_tiff, "alcf832_scratch", alcf_delay),
(alcf_scratch_path_zarr, "alcf832_scratch", alcf_delay),
(nersc_scratch_path_tiff, "nersc832_alsdev_scratch", nersc_delay),
(nersc_scratch_path_zarr, "nersc832_alsdev_scratch", nersc_delay),
(data832_raw_path, "data832_raw", data832_delay),
(data832_scratch_path_tiff, "data832_scratch", data832_delay),
(data832_scratch_path_zarr, "data832_scratch", data832_delay)
(alcf_raw_path, "alcf832_raw", alcf_delay, config.alcf832_raw, config.data832_raw),
(alcf_scratch_path_tiff, "alcf832_scratch", alcf_delay, config.alcf832_scratch, config.data832_scratch),
(alcf_scratch_path_zarr, "alcf832_scratch", alcf_delay, config.alcf832_scratch, config.data832_scratch),
(nersc_scratch_path_tiff, "nersc832_alsdev_scratch", nersc_delay, config.nersc832_alsdev_scratch, None),
(nersc_scratch_path_zarr, "nersc832_alsdev_scratch", nersc_delay, config.nersc832_alsdev_scratch, None),
(data832_raw_path, "data832_raw", data832_delay, config.data832_raw, None),
(data832_scratch_path_tiff, "data832_scratch", data832_delay, config.data832_scratch, None),
(data832_scratch_path_zarr, "data832_scratch", data832_delay, config.data832_scratch, None)
]

for path, location, days in delete_schedules:
for path, location, days, source_endpoint, check_endpoint in delete_schedules:
if path:
schedule_prune_task(path, location, days)
schedule_prune_task(path, location, days, source_endpoint, check_endpoint)
logger.info(f"Scheduled delete from {location} at {days} days")
else:
logger.info(f"Path not provided for {location}, skipping scheduling of deletion task.")
Expand Down Expand Up @@ -570,55 +579,66 @@ def process_new_832_ALCF_flow(folder_name: str,
logger.info(f"Transfer status: {alcf_transfer_success}")
if not alcf_transfer_success:
logger.error("Transfer failed due to configuration or authorization issues.")
raise ValueError("Transfer to ALCF Failed")
else:
logger.info("Transfer successful.")

# Step 2A: Run the Tomopy Reconstruction Globus Flow
logger.info(f"Running Tomopy reconstruction on {file_name} at ALCF")
alcf_reconstruction_success = alcf_tomopy_reconstruction_flow(raw_path=alcf_raw_path,
scratch_path=alcf_scratch_path,
folder_name=folder_name,
file_name=h5_file_name)
if not alcf_reconstruction_success:
logger.error("Reconstruction Failed.")
else:
logger.info("Reconstruction Successful.")

# Step 2B: Run the Tiff to Zarr Globus Flow
logger.info(f"Running Tiff to Zarr on {file_name} at ALCF")
raw_path = f"/eagle/IRIBeta/als/{alcf_raw_path}/{h5_file_name}"
tiff_scratch_path = f"/eagle/IRIBeta/als/bl832/scratch/{folder_name}/rec{file_name}/"
alcf_tiff_to_zarr_success = alcf_tiff_to_zarr_flow(raw_path=raw_path,
tiff_scratch_path=tiff_scratch_path)
if not alcf_tiff_to_zarr_success:
logger.error("Tiff to Zarr Failed.")
else:
logger.info("Tiff to Zarr Successful.")

# Step 3: Send reconstructed data (tiffs and zarr) to data832
# Transfer A: Send reconstructed data (tiff) to data832
logger.info(f"Transferring {file_name} from {alcf_raw_path} at ALCF to {data832_scratch_path} at data832")
logger.info(f"Reconstructed file path: {scratch_path_tiff}")
data832_tiff_transfer_success = transfer_data_to_data832(scratch_path_tiff,
config.tc,
config.alcf832_scratch,
config.data832_scratch)
if not data832_tiff_transfer_success:
logger.error("Transfer failed due to configuration or authorization issues.")
else:
logger.info("Transfer successful.")

# Transfer B: Send reconstructed data (zarr) to data832
logger.info(f"Transferring {file_name} from {alcf_raw_path} at ALCF to {data832_scratch_path} at data832")
logger.info(f"Reconstructed file path: {scratch_path_zarr}")
data832_zarr_transfer_success = transfer_data_to_data832(scratch_path_zarr,
config.tc,
config.alcf832_scratch,
config.data832_scratch)
if not data832_zarr_transfer_success:
logger.error("Transfer failed due to configuration or authorization issues.")
else:
logger.info("Transfer successful.")
logger.info("Transfer to ALCF Successful.")

# Step 2A: Run the Tomopy Reconstruction Globus Flow
logger.info(f"Running Tomopy reconstruction on {file_name} at ALCF")
alcf_reconstruction_success = alcf_tomopy_reconstruction_flow(
raw_path=alcf_raw_path,
scratch_path=alcf_scratch_path,
folder_name=folder_name,
file_name=h5_file_name)
if not alcf_reconstruction_success:
logger.error("Reconstruction Failed.")
raise ValueError("Reconstruction at ALCF Failed")
else:
logger.info("Reconstruction Successful.")

# Step 2B: Run the Tiff to Zarr Globus Flow
logger.info(f"Running Tiff to Zarr on {file_name} at ALCF")
raw_path = f"/eagle/IRIBeta/als/{alcf_raw_path}/{h5_file_name}"
tiff_scratch_path = f"/eagle/IRIBeta/als/bl832/scratch/{folder_name}/rec{file_name}/"
alcf_tiff_to_zarr_success = alcf_tiff_to_zarr_flow(
raw_path=raw_path,
tiff_scratch_path=tiff_scratch_path)
if not alcf_tiff_to_zarr_success:
logger.error("Tiff to Zarr Failed.")
raise ValueError("Tiff to Zarr at ALCF Failed")
else:
logger.info("Tiff to Zarr Successful.")

if alcf_reconstruction_success:
# Step 3: Send reconstructed data (tiffs and zarr) to data832
# Transfer A: Send reconstructed data (tiff) to data832
logger.info(f"Transferring {file_name} from {alcf_raw_path} "
f"at ALCF to {data832_scratch_path} at data832")
logger.info(f"Reconstructed file path: {scratch_path_tiff}")
data832_tiff_transfer_success = transfer_data_to_data832(
scratch_path_tiff,
config.tc,
config.alcf832_scratch,
config.data832_scratch)
if not data832_tiff_transfer_success:
logger.error("Transfer failed due to configuration or authorization issues.")
else:
logger.info("Transfer successful.")

if alcf_tiff_to_zarr_success:
# Transfer B: Send reconstructed data (zarr) to data832
logger.info(f"Transferring {file_name} from {alcf_raw_path} "
f"at ALCF to {data832_scratch_path} at data832")
logger.info(f"Reconstructed file path: {scratch_path_zarr}")
data832_zarr_transfer_success = transfer_data_to_data832(
scratch_path_zarr,
config.tc,
config.alcf832_scratch,
config.data832_scratch)
if not data832_zarr_transfer_success:
logger.error("Transfer failed due to configuration or authorization issues.")
else:
logger.info("Transfer successful.")

# Step 4: Schedule deletion of files from ALCF, NERSC, and data832
logger.info("Scheduling deletion of files from ALCF, NERSC, and data832")
Expand All @@ -633,7 +653,8 @@ def process_new_832_ALCF_flow(folder_name: str,
data832_raw_path=f"{folder_name}/{h5_file_name}" if alcf_transfer_success else None,
data832_scratch_path_tiff=f"{scratch_path_tiff}" if data832_tiff_transfer_success else None,
data832_scratch_path_zarr=f"{scratch_path_zarr}" if data832_zarr_transfer_success else None,
one_minute=True # Set to False for production durations
one_minute=True, # Set to False for production durations
config=config
)

# Step 5: ingest into scicat ... todo
Expand Down
13 changes: 11 additions & 2 deletions orchestration/flows/bl832/move.py
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,12 @@ def process_new_832_file(file_path: str,
schedule_prefect_flow(
"prune_spot832/prune_spot832",
flow_name,
{"relative_path": relative_path},
{
"relative_path": relative_path,
"source_endpoint": config.spot832,
"check_endpoint": config.data832,
},

datetime.timedelta(days=schedule_spot832_delete_days),
)
logger.info(
Expand All @@ -151,7 +156,11 @@ def process_new_832_file(file_path: str,
schedule_prefect_flow(
"prune_data832/prune_data832",
flow_name,
{"relative_path": relative_path},
{
"relative_path": relative_path,
"source_endpoint": config.data832,
"check_endpoint": config.nersc832,
},
datetime.timedelta(days=schedule_data832_delete_days),
)
logger.info(
Expand Down
111 changes: 82 additions & 29 deletions orchestration/flows/bl832/prune.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ def prune_files(
relative_path: str,
source_endpoint: GlobusEndpoint,
check_endpoint: GlobusEndpoint = None,
config=None
):
"""
Prune files from a source endpoint.
Expand All @@ -23,7 +24,9 @@ def prune_files(
check_endpoint (GlobusEndpoint, optional): The Globus target endpoint to check. Defaults to None.
"""
p_logger = get_run_logger()
config = Config832()
if config is None:
config = Config832()

globus_settings = JSON.load("globus-settings").value
max_wait_seconds = globus_settings["max_wait_seconds"]
flow_name = f"prune_from_{source_endpoint.name}"
Expand All @@ -41,52 +44,102 @@ def prune_files(


@flow(name="prune_spot832")
def prune_spot832(relative_path: str):
prune_files(relative_path=relative_path,
source_endpoint=Config832().spot832,
check_endpoint=Config832().data832)
def prune_spot832(
relative_path: str,
source_endpoint: GlobusEndpoint,
check_endpoint: GlobusEndpoint,
config=None,
):
prune_files(
relative_path=relative_path,
source_endpoint=source_endpoint,
check_endpoint=check_endpoint,
config=config
)


@flow(name="prune_data832")
def prune_data832(relative_path: str):
prune_files(relative_path=relative_path,
source_endpoint=Config832().data832,
check_endpoint=Config832().nersc832)
def prune_data832(
relative_path: str,
source_endpoint: GlobusEndpoint,
check_endpoint: GlobusEndpoint,
config=None,
):
prune_files(
relative_path=relative_path,
source_endpoint=source_endpoint,
check_endpoint=check_endpoint,
config=config)


@flow(name="prune_data832_raw")
def prune_data832_raw(relative_path: str):
prune_files(relative_path=relative_path,
source_endpoint=Config832().data832_raw,
check_endpoint=None)
def prune_data832_raw(
relative_path: str,
source_endpoint: GlobusEndpoint,
check_endpoint: GlobusEndpoint,
config=None,
):
prune_files(
relative_path=relative_path,
source_endpoint=source_endpoint,
check_endpoint=check_endpoint,
config=config)


@flow(name="prune_data832_scratch")
def prune_data832_scratch(relative_path: str):
prune_files(relative_path=relative_path,
source_endpoint=Config832().data832_scratch,
check_endpoint=None)
def prune_data832_scratch(
relative_path: str,
source_endpoint: GlobusEndpoint,
check_endpoint: GlobusEndpoint,
config=None,
):
prune_files(
relative_path=relative_path,
source_endpoint=source_endpoint,
check_endpoint=check_endpoint,
config=config)


@flow(name="prune_alcf832_raw")
def prune_alcf832_raw(relative_path: str):
prune_files(relative_path=relative_path,
source_endpoint=Config832().alcf832_raw,
check_endpoint=Config832().data832_raw)
def prune_alcf832_raw(
relative_path: str,
source_endpoint: GlobusEndpoint,
check_endpoint: GlobusEndpoint,
config=None,
):
prune_files(
relative_path=relative_path,
source_endpoint=source_endpoint,
check_endpoint=check_endpoint,
config=config)


@flow(name="prune_alcf832_scratch")
def prune_alcf832_scratch(relative_path: str):
prune_files(relative_path=relative_path,
source_endpoint=Config832().alcf832_scratch,
check_endpoint=Config832().data832_scratch)
def prune_alcf832_scratch(
relative_path: str,
source_endpoint: GlobusEndpoint,
check_endpoint: GlobusEndpoint,
config=None,
):
prune_files(
relative_path=relative_path,
source_endpoint=source_endpoint,
check_endpoint=check_endpoint,
config=config)


@flow(name="prune_nersc832_alsdev_scratch")
def prune_nersc832_alsdev_scratch(relative_path: str):
prune_files(relative_path=relative_path,
source_endpoint=Config832().nersc832_alsdev_scratch,
check_endpoint=None)
def prune_nersc832_alsdev_scratch(
relative_path: str,
source_endpoint: GlobusEndpoint,
check_endpoint: GlobusEndpoint,
config=None,
):
prune_files(
relative_path=relative_path,
source_endpoint=source_endpoint,
check_endpoint=check_endpoint,
config=config)


if __name__ == "__main__":
Expand Down
Loading

0 comments on commit 72bb7e1

Please sign in to comment.