Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Container cache download #3163

Draft
wants to merge 5 commits into
base: dev
Choose a base branch
from
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
89 changes: 57 additions & 32 deletions nf_core/pipelines/download.py
Original file line number Diff line number Diff line change
Expand Up @@ -1086,12 +1086,13 @@ def get_singularity_images(self, current_revision: str = "") -> None:

# Organise containers based on what we need to do with them
containers_exist: List[str] = []
containers_cache: List[Tuple[str, str, Optional[str]]] = []
containers_cache: List[Tuple[str, str, str]] = []
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

True, the cache is never optional, so the type List[Tuple[str, str, Optional[str]]] can be simplified to List[Tuple[str, str, str]] for containers_cache.

My rationale was that I wanted an identical type for containers_cache, containers_download and containers_pull to somewhat standardize the functions consuming them. But with the new containers_library variable, the heterogeneity is anyway given, so I have no real objections against.

Ultimately, this comment is just to explain why I initially had done it differently.

containers_library: List[Tuple[str, str, str, Optional[str]]] = []
containers_download: List[Tuple[str, str, Optional[str]]] = []
containers_pull: List[Tuple[str, str, Optional[str]]] = []
for container in self.containers:
# Fetch the output and cached filenames for this container
out_path, cache_path = self.singularity_image_filenames(container)
out_path, cache_path, library_path = self.singularity_image_filenames(container)

# Check that the directories exist
out_path_dir = os.path.dirname(out_path)
Expand All @@ -1109,11 +1110,16 @@ def get_singularity_images(self, current_revision: str = "") -> None:
containers_exist.append(container)
continue

# We have a copy of this in the NXF_SINGULARITY_CACHE dir
# We have a copy of this in NXF_SINGULARITY_CACHEDIR
if cache_path and os.path.exists(cache_path):
containers_cache.append((container, out_path, cache_path))
continue

# We have a copy of this in NXF_SINGULARITY_LIBRARYDIR
if library_path and os.path.exists(library_path):
containers_library.append((container, library_path, out_path, cache_path))
continue

# Direct download within Python
if container.startswith("http"):
containers_download.append((container, out_path, cache_path))
Expand Down Expand Up @@ -1145,6 +1151,12 @@ def get_singularity_images(self, current_revision: str = "") -> None:
self.singularity_copy_cache_image(*container)
progress.update(task, advance=1)

if containers_library:
for container in containers_library:
progress.update(task, description="Copying singularity images from library")
self.singularity_copy_library_image(*container)
progress.update(task, advance=1)

if containers_download or containers_pull:
# if clause gives slightly better UX, because Download is no longer displayed if nothing is left to be downloaded.
with concurrent.futures.ThreadPoolExecutor(max_workers=self.parallel_downloads) as pool:
Expand Down Expand Up @@ -1226,19 +1238,20 @@ def get_singularity_images(self, current_revision: str = "") -> None:
# Task should advance in any case. Failure to pull will not kill the download process.
progress.update(task, advance=1)

def singularity_image_filenames(self, container: str) -> Tuple[str, Optional[str]]:
def singularity_image_filenames(self, container: str) -> Tuple[str, Optional[str], Optional[str]]:
"""Check Singularity cache for image, copy to destination folder if found.

Args:
container (str): A pipeline's container name. Can be direct download URL
or a Docker Hub repository ID.

Returns:
tuple (str, str): Returns a tuple of (out_path, cache_path).
tuple (str, Optional[str], Optional[str]): Returns a tuple of (out_path, cache_path, library_path).
out_path is the final target output path. It may point to the NXF_SINGULARITY_CACHEDIR, if cache utilisation was set to 'amend'.
If cache utilisation was set to 'copy', it will point to the target folder, a subdirectory of the output directory. In the latter case,
cache_path may either be None (image is not yet cached locally) or point to the image in the NXF_SINGULARITY_CACHEDIR, so it will not be
downloaded from the web again, but directly copied from there. See get_singularity_images() for implementation.
library_path points to the container in NXF_SINGULARITY_LIBRARYDIR, if the latter is defined.
"""

# Generate file paths
Expand Down Expand Up @@ -1281,16 +1294,33 @@ def singularity_image_filenames(self, container: str) -> Tuple[str, Optional[str
elif self.container_cache_utilisation in ["amend", "copy"]:
raise FileNotFoundError("Singularity cache is required but no '$NXF_SINGULARITY_CACHEDIR' set!")

return (out_path, cache_path)
library_path = None
if os.environ.get("NXF_SINGULARITY_LIBRARYDIR"):
library_path = os.path.join(os.environ["NXF_SINGULARITY_LIBRARYDIR"], out_name)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That will not work. The registries are stripped from the out_name and without symlinks, I doubt that there will be a single appropriately named container in the NXF_SINGULARITY_LIBRARYDIR.

You will have to use the name prior to trimming the registries.


def singularity_copy_cache_image(self, container: str, out_path: str, cache_path: Optional[str]) -> None:
return (out_path, cache_path, library_path)

def singularity_copy_cache_image(self, container: str, out_path: str, cache_path: str) -> None:
"""Copy Singularity image from NXF_SINGULARITY_CACHEDIR to target folder."""
# Copy to destination folder if we have a cached version
if cache_path and os.path.exists(cache_path):
log.debug(f"Copying {container} from cache: '{os.path.basename(out_path)}'")
shutil.copyfile(cache_path, out_path)
# Create symlinks to ensure that the images are found even with different registries being used.
self.symlink_singularity_images(out_path)
self.singularity_copy_image(container, cache_path, out_path)
# Create symlinks to ensure that the images are found even with different registries being used.
self.symlink_singularity_images(cache_path)

def singularity_copy_library_image(
    self, container: str, library_path: str, out_path: str, cache_path: Optional[str]
) -> None:
    """Copy Singularity image from NXF_SINGULARITY_LIBRARYDIR to target folder, and possibly NXF_SINGULARITY_CACHEDIR.

    Args:
        container (str): A pipeline's container name. Can be a direct download URL
            or a Docker Hub repository ID.
        library_path (str): Path of the existing image inside NXF_SINGULARITY_LIBRARYDIR.
        out_path (str): Final target output path for the image.
        cache_path (Optional[str]): Path of the image inside NXF_SINGULARITY_CACHEDIR,
            or None if no cache directory is used.
    """
    self.singularity_copy_image(container, library_path, out_path)
    # Mirror the library image into the cache as well, but only if it is not already
    # there: copying unconditionally would duplicate work and data.
    # NOTE(review): with cache utilisation 'amend', cache_path may effectively coincide
    # with out_path (see singularity_image_filenames()) -- confirm whether the cache
    # copy should additionally depend on self.container_cache_utilisation.
    if cache_path and not os.path.exists(cache_path):
        self.singularity_copy_image(container, library_path, cache_path)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have not used the $NXF_SINGULARITY_LIBRARYDIR so far, so I struggle to conceptualize corresponding setups. Intuitively, however, I question that copying the image to the cache is desirable? That will probably lead to a lot of data duplication?

Also mind that in the case of self.container_cache_utilisation == "amend", the cache_path is assigned to the out_path in singularity_image_filenames() function. So if you retain this copy step, you should at least not copy the same image twice to cache, but making the self.singularity_copy_image(container, library_path, cache_path) conditional depending on the chosen cache utilisation.


def singularity_copy_image(self, container: str, from_path: str, to_path: str) -> None:
    """Copy a Singularity image between folders.

    This function is used seamlessly across the target directory,
    NXF_SINGULARITY_CACHEDIR, and NXF_SINGULARITY_LIBRARYDIR.

    Args:
        container (str): A pipeline's container name. Can be a direct download URL
            or a Docker Hub repository ID.
        from_path (str): Path of the existing image file to copy from.
        to_path (str): Destination path for the copy.
    """
    log.debug(f"Copying {container}: '{os.path.basename(from_path)}' -> '{os.path.basename(to_path)}'")
    # Copy via a temporary '.partial' file and rename on completion, so that an
    # interrupted copy (exception or CTRL+C) never leaves a truncated image at
    # the destination -- important when to_path is a persistent cache directory.
    to_path_tmp = f"{to_path}.partial"
    try:
        shutil.copyfile(from_path, to_path_tmp)
        os.rename(to_path_tmp, to_path)
    finally:
        # After a successful rename the partial file no longer exists; if it is
        # still present, the copy did not complete and the remnant is removed.
        if os.path.exists(to_path_tmp):
            os.remove(to_path_tmp)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, but this function misses important functionality. If you factor out the copying process, you need to consider that copies may be interrupted by exceptions or by the user using SIGINT (CTRL+C).

Previously, in case of incomplete/corrupted downloads, the local files were deleted by the except and finally branches within the singularity_download_image() function. (Lines 1356-1369 on dev). Additionally, it did not matter too much, since the copy happened from the cache to the output_path, typically a folder the user would delete in case of download failures.

But now that you changed the logic, it seems more likely to me that a user could amass corrupted images in the cache folder persistently. Therefore, it is important to ensure that the copy process removes partially copied files etc. in case of exceptions or SIGINT, e.g. with a cleanup_temp_files() function.

Something along this line:

import signal
import sys

# Example
def cleanup_temp_files():
    if os.path.exists('temp_file'):
        os.remove('temp_file')

# Define a signal handler for SIGINT (CTRL+C)
def abort_download(sig, frame):
    cleanup_temp_files()
    raise DownloadError("Aborting pipeline download due to user interruption.")

signal.signal(signal.SIGINT, abort_download)

# Example of using try-finally for cleanup in case of exceptions
try:
    # File copy code here
    # ...
except Exception as e:
    cleanup_temp_files()
    raise DownloadError(e) from e
finally:
    cleanup_temp_files()

# Create symlinks to ensure that the images are found even with different registries being used.
self.symlink_singularity_images(to_path)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Mind that the cleanup function should also cover the symlinks in the cache.


def singularity_download_image(
self, container: str, out_path: str, cache_path: Optional[str], progress: DownloadProgress
Expand All @@ -1309,8 +1339,7 @@ def singularity_download_image(
log.debug(f"Downloading Singularity image: '{container}'")

# Set output path to save file to
output_path = cache_path or out_path
output_path_tmp = f"{output_path}.partial"
output_path_tmp = f"{out_path}.partial"
log.debug(f"Downloading to: '{output_path_tmp}'")

# Set up progress bar
Expand Down Expand Up @@ -1340,16 +1369,15 @@ def singularity_download_image(
fh.write(data)

# Rename partial filename to final filename
os.rename(output_path_tmp, output_path)
os.rename(output_path_tmp, out_path)

# Copy cached download if we are using the cache
# Copy download to cache if one is defined
if cache_path:
log.debug(f"Copying {container} from cache: '{os.path.basename(out_path)}'")
progress.update(task, description="Copying from cache to target directory")
shutil.copyfile(cache_path, out_path)
progress.update(task, description="Copying from target directory to cache")
self.singularity_copy_image(container, out_path, cache_path)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You need to check now, if the cache_path actually exists on the file system:
os.path.exists(cache_path).

Previously, the get_singularity_images() function ensured, that an image was actually present in cache before adding it to the list of images that are already cached.

  if cache_path and os.path.exists(cache_path):
      containers_cache.append((container, out_path, cache_path))

Hence, it was safe to copy it without further checks. Now, the cache_path is just created from the environment variable without further checks if the defined directory actually exists in the file system and is writable. Both needs to be done either here just for the cache path or preferably inside the generic copy function - better safe than sorry.


# Create symlinks to ensure that the images are found even with different registries being used.
self.symlink_singularity_images(output_path)
self.symlink_singularity_images(out_path)

progress.remove_task(task)

Expand All @@ -1361,8 +1389,8 @@ def singularity_download_image(
log.debug(f"Deleting incompleted singularity image download:\n'{output_path_tmp}'")
if output_path_tmp and os.path.exists(output_path_tmp):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the cleanup function I mentioned. Now, it is not sufficient anymore, because the cache is not considered.

os.remove(output_path_tmp)
if output_path and os.path.exists(output_path):
os.remove(output_path)
if out_path and os.path.exists(out_path):
os.remove(out_path)
# Re-raise the caught exception
raise
finally:
Expand All @@ -1383,8 +1411,6 @@ def singularity_pull_image(
Raises:
Various exceptions possible from `subprocess` execution of Singularity.
"""
output_path = cache_path or out_path

# where the output of 'singularity pull' is first generated before being copied to the NXF_SINGULARITY_CACHDIR.
# if not defined by the Singularity administrators, then use the temporary directory to avoid storing the images in the work directory.
if os.environ.get("SINGULARITY_CACHEDIR") is None:
Expand All @@ -1406,11 +1432,11 @@ def singularity_pull_image(
"singularity",
"pull",
"--name",
output_path,
out_path,
address,
]
elif shutil.which("apptainer"):
singularity_command = ["apptainer", "pull", "--name", output_path, address]
singularity_command = ["apptainer", "pull", "--name", out_path, address]
else:
raise OSError("Singularity/Apptainer is needed to pull images, but it is not installed or not in $PATH")
log.debug(f"Building singularity image: {address}")
Expand Down Expand Up @@ -1453,14 +1479,13 @@ def singularity_pull_image(
error_msg=lines,
)

# Copy cached download if we are using the cache
# Copy download to cache if one is defined
if cache_path:
log.debug(f"Copying {container} from cache: '{os.path.basename(out_path)}'")
progress.update(task, current_log="Copying from cache to target directory")
shutil.copyfile(cache_path, out_path)
progress.update(task, current_log="Copying from target directory to cache")
self.singularity_copy_image(container, out_path, cache_path)

# Create symlinks to ensure that the images are found even with different registries being used.
self.symlink_singularity_images(output_path)
self.symlink_singularity_images(out_path)

progress.remove_task(task)

Expand Down
Loading