Skip to content

Commit

Permalink
Merge pull request #39 from davramov/issue_37
Browse files Browse the repository at this point in the history
Issue 37: Kill Prefect/Globus prune tasks when encountering "PERMISSION_DENIED" error.
  • Loading branch information
davramov authored Oct 23, 2024
2 parents f0044d1 + 59eb8fd commit 816bd74
Show file tree
Hide file tree
Showing 4 changed files with 27 additions and 12 deletions.
5 changes: 5 additions & 0 deletions orchestration/flows/bl832/alcf.py
Original file line number Diff line number Diff line change
Expand Up @@ -643,6 +643,11 @@ def process_new_832_ALCF_flow(folder_name: str,
# Step 4: Schedule deletion of files from ALCF, NERSC, and data832
logger.info("Scheduling deletion of files from ALCF, NERSC, and data832")
nersc_transfer_success = False
# alcf_transfer_success = True
# alcf_reconstruction_success = True
# alcf_tiff_to_zarr_success = True
# data832_tiff_transfer_success = True
# data832_zarr_transfer_success = True

schedule_pruning(
alcf_raw_path=f"{folder_name}/{h5_file_name}" if alcf_transfer_success else None,
Expand Down
17 changes: 9 additions & 8 deletions orchestration/flows/bl832/prune.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import logging
from prefect import flow, get_run_logger
from prefect.blocks.system import JSON
from typing import Union

from orchestration.flows.bl832.config import Config832
from orchestration.globus.transfer import GlobusEndpoint, prune_one_safe
Expand All @@ -12,7 +13,7 @@
def prune_files(
relative_path: str,
source_endpoint: GlobusEndpoint,
check_endpoint: GlobusEndpoint = None,
check_endpoint: Union[GlobusEndpoint, None] = None,
config=None
):
"""
Expand Down Expand Up @@ -47,7 +48,7 @@ def prune_files(
def prune_spot832(
relative_path: str,
source_endpoint: GlobusEndpoint,
check_endpoint: GlobusEndpoint,
check_endpoint: Union[GlobusEndpoint, None] = None,
config=None,
):
prune_files(
Expand All @@ -62,7 +63,7 @@ def prune_spot832(
def prune_data832(
relative_path: str,
source_endpoint: GlobusEndpoint,
check_endpoint: GlobusEndpoint,
check_endpoint: Union[GlobusEndpoint, None] = None,
config=None,
):
prune_files(
Expand All @@ -76,7 +77,7 @@ def prune_data832(
def prune_data832_raw(
relative_path: str,
source_endpoint: GlobusEndpoint,
check_endpoint: GlobusEndpoint,
check_endpoint: Union[GlobusEndpoint, None] = None,
config=None,
):
prune_files(
Expand All @@ -90,7 +91,7 @@ def prune_data832_raw(
def prune_data832_scratch(
relative_path: str,
source_endpoint: GlobusEndpoint,
check_endpoint: GlobusEndpoint,
check_endpoint: Union[GlobusEndpoint, None] = None,
config=None,
):
prune_files(
Expand All @@ -104,7 +105,7 @@ def prune_data832_scratch(
def prune_alcf832_raw(
relative_path: str,
source_endpoint: GlobusEndpoint,
check_endpoint: GlobusEndpoint,
check_endpoint: Union[GlobusEndpoint, None] = None,
config=None,
):
prune_files(
Expand All @@ -118,7 +119,7 @@ def prune_alcf832_raw(
def prune_alcf832_scratch(
relative_path: str,
source_endpoint: GlobusEndpoint,
check_endpoint: GlobusEndpoint,
check_endpoint: Union[GlobusEndpoint, None] = None,
config=None,
):
prune_files(
Expand All @@ -132,7 +133,7 @@ def prune_alcf832_scratch(
def prune_nersc832_alsdev_scratch(
relative_path: str,
source_endpoint: GlobusEndpoint,
check_endpoint: GlobusEndpoint,
check_endpoint: Union[GlobusEndpoint, None] = None,
config=None,
):
prune_files(
Expand Down
11 changes: 8 additions & 3 deletions orchestration/globus/transfer.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import os
from pathlib import Path
from time import time
from typing import Dict, List
from typing import Dict, List, Union
from dotenv import load_dotenv
from globus_sdk import (
ClientCredentialsAuthorizer,
Expand Down Expand Up @@ -243,7 +243,12 @@ def task_wait(

if task["nice_status"] in ["FILE_NOT_FOUND"]:
transfer_client.cancel_task(task_id)
raise TransferError("Received FILE_NOT_FOUND, cancelling task")
raise TransferError(f"Received FILE_NOT_FOUND, cancelling Globus task {task_id}")

if task["nice_status"] in ["PERMISSION_DENIED"]:
transfer_client.cancel_task(task_id)
raise TransferError(f"Received PERMISSION_DENIED, cancelling Globus task {task_id}")

return True


Expand All @@ -252,7 +257,7 @@ def prune_one_safe(
if_older_than_days: int,
tranfer_client: TransferClient,
source_endpoint: GlobusEndpoint,
check_endpoint: GlobusEndpoint,
check_endpoint: Union[GlobusEndpoint, None],
max_wait_seconds: int = 120,
logger=logger,
):
Expand Down
6 changes: 5 additions & 1 deletion scripts/polaris/tiff_to_zarr.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ def main():
last_part = os.path.basename(os.path.normpath(tiff_dir))
zarr_dir = os.path.abspath(os.path.join(tiff_dir, '..', last_part + '.zarr'))
if not os.path.exists(zarr_dir):
os.makedirs(zarr_dir)
os.makedirs(zarr_dir, mode=0o2775, exist_ok=True)

print('Output directory: ' + zarr_dir)

Expand All @@ -80,6 +80,10 @@ def main():
# Set permissions for the output directory and its contents
set_permissions_recursive(zarr_dir)

# Extract and set permissions for the parent directory (folder_name)
parent_dir = os.path.abspath(os.path.join(tiff_dir, '../')) # Extract parent directory
set_permissions_recursive(parent_dir) # Set permissions for parent directory


if __name__ == "__main__":
main()

0 comments on commit 816bd74

Please sign in to comment.