ENH: More verbose error message
larsoner committed Sep 9, 2024
1 parent 26b4fe3 commit 5787899
Showing 2 changed files with 56 additions and 8 deletions.
2 changes: 2 additions & 0 deletions .gitignore
@@ -4,3 +4,5 @@ dist/
build/
.vscode/
.DS_Store
/*.egg-info
/.hypothesis/
62 changes: 54 additions & 8 deletions src/openneuro/_download.py
@@ -1,7 +1,30 @@
"""Openneuro download module.
The flow is roughly:
download
_get_download_metadata
_get_download_metadata
_check_snapshot_exists
_safe_query
_check_snapshot_exists ...
_get_local_tag
_match_include_exclude
_iterate_filenames
_match_include_exclude
_get_download_metadata ...
_download_files
_download_file
_retry_download
_download_file ...
_retrieve_and_write_to_disk
"""

import asyncio
import fnmatch
import hashlib
import json
import shlex
import string
import sys
from collections.abc import Generator, Iterable
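For orientation, the entry point at the top of the call tree in the new docstring is the package's public download() function; a typical call looks roughly like this (the dataset ID and include pattern are illustrative, not taken from this commit):

from openneuro import download

# Download a subset of a public dataset into a local directory.
download(
    dataset="ds000246",
    target_dir="data/ds000246",
    include=["sub-0001/meg/*.fif"],  # fnmatch-style glob, per the import above
)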
@@ -249,6 +272,7 @@ async def _download_file(
max_retries: int,
retry_backoff: float,
semaphore: asyncio.Semaphore,
query_str: str,
) -> None:
"""Download an individual file."""
if outfile.exists():
Expand Down Expand Up @@ -282,12 +306,11 @@ async def _download_file(
max_retries=max_retries,
retry_backoff=retry_backoff,
semaphore=semaphore,
query_str=query_str,
)
return
else:
raise RuntimeError(
f"Timeout when trying to download " f"{outfile}."
)
raise RuntimeError(f"Timeout when trying to download {outfile}.")

# Try to get the S3 MD5 hash for the file.
try:
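The retry-or-raise pattern in this hunk reduces to something like the following standalone sketch; the helper name and the backoff formula are assumptions for illustration, not the module's exact code:

import asyncio

async def _fetch_once(url: str) -> bytes:
    # Stand-in for a single streamed download attempt that may time out.
    raise asyncio.TimeoutError

async def fetch_with_retries(url: str, max_retries: int = 2, retry_backoff: float = 0.5) -> bytes:
    for attempt in range(max_retries + 1):
        try:
            return await _fetch_once(url)
        except asyncio.TimeoutError:
            if attempt == max_retries:
                raise RuntimeError(f"Timeout when trying to download {url}.")
            # Assumed exponential backoff; the real code delegates to _retry_download.
            await asyncio.sleep(retry_backoff * 2**attempt)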
@@ -380,12 +403,19 @@ async def _download_file(
max_retries=max_retries,
retry_backoff=retry_backoff,
semaphore=semaphore,
query_str=query_str,
)
return
else:
raise RuntimeError(
f"Error {response.status_code} when trying "
f"to download {outfile} from {url}"
f"Error {response.status_code} when trying to download "
f"{outfile}. If this is unexpected:\n\n"
"1. Navigate to https://openneuro.org/crn/graphql\n"
f"2. Enter and run the operation: `{query_str}`\n"
'3. In the Response, try to manually download the "urls" '
f'for "{outfile.name}", which should contain {url}\n\n'
"If the download fails, open a GitHub issue like "
"https://github.com/OpenNeuroOrg/openneuro/issues/3145"
)

await _retrieve_and_write_to_disk(
@@ -410,6 +440,7 @@ async def _download_file(
max_retries=max_retries,
retry_backoff=retry_backoff,
semaphore=semaphore,
query_str=query_str,
)
return
else:
@@ -428,6 +459,7 @@ async def _retry_download(
max_retries: int,
retry_backoff: float,
semaphore: asyncio.Semaphore,
query_str: str,
) -> None:
tqdm.write(
_unicode(
@@ -449,6 +481,7 @@
max_retries=max_retries,
retry_backoff=retry_backoff,
semaphore=semaphore,
query_str=query_str,
)


@@ -542,6 +575,7 @@ async def _download_files(
max_retries: int,
retry_backoff: float,
max_concurrent_downloads: int,
query_str: str,
) -> None:
"""Download files, one by one."""
# Semaphore (counter) to limit maximum number of concurrent download
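The comment above introduces the semaphore that caps concurrency. The gating pattern, as a minimal self-contained sketch (the worker body here is illustrative):

import asyncio

async def _gated(coro, semaphore: asyncio.Semaphore):
    # At most max_concurrent_downloads coroutines get past this line at once.
    async with semaphore:
        return await coro

async def main() -> None:
    semaphore = asyncio.Semaphore(5)  # e.g. max_concurrent_downloads=5
    tasks = [_gated(asyncio.sleep(0.1), semaphore) for _ in range(20)]
    await asyncio.gather(*tasks)

asyncio.run(main())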
@@ -556,6 +590,10 @@

outfile = target_dir / filename
outfile.parent.mkdir(parents=True, exist_ok=True)
this_query_str = string.Template(query_str).substitute(
tree=f'"{file["parent_tree"]}"',
)
this_query_str = " ".join(shlex.split(this_query_str, posix=False))
download_task = _download_file(
url=url,
api_file_size=api_file_size,
@@ -565,6 +603,7 @@
max_retries=max_retries,
retry_backoff=retry_backoff,
semaphore=semaphore,
query_str=this_query_str,
)
download_tasks.append(download_task)

@@ -627,10 +666,12 @@ def _iterate_filenames(
max_retries: int,
root: str = "",
include: Iterable[str] = tuple(),
parent_tree: str | None,
) -> Generator[dict[str, Any], None, None]:
"""Iterate over all files in a dataset, yielding filenames."""
directories = list()
for entity in files:
entity["parent_tree"] = parent_tree
if root:
entity["filename"] = f'{root}/{entity["filename"]}'
if entity["directory"]:
@@ -683,16 +724,15 @@ def _iterate_filenames(
max_retries=max_retries,
check_snapshot=False,
)
dir_iterator = _iterate_filenames(
yield from _iterate_filenames(
metadata["files"],
dataset_id=dataset_id,
tag=tag,
max_retries=max_retries,
root=this_dir,
include=include,
parent_tree=directory["id"],
)
for path in dir_iterator:
yield path
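Swapping the explicit loop for yield from, as this hunk does, is behavior-preserving in a recursive generator; the two forms are equivalent:

from collections.abc import Generator

def walk(tree: dict, root: str = "") -> Generator[str, None, None]:
    for name, child in tree.items():
        path = f"{root}/{name}" if root else name
        if isinstance(child, dict):  # a directory
            yield from walk(child, root=path)  # was: for p in walk(...): yield p
        else:
            yield path

print(list(walk({"sub-01": {"anat": {"T1w.nii.gz": 1}}, "README": 1})))
# ['sub-01/anat/T1w.nii.gz', 'README']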


def _match_include_exclude(
@@ -830,6 +870,7 @@ def download(
tag=tag,
max_retries=max_retries,
include=include,
parent_tree=None,
),
desc=_unicode(f"Traversing directories for {dataset}", end="", emoji="📁"),
unit=" entities",
@@ -884,6 +925,10 @@ def download(
)
tqdm.write(_unicode(msg, emoji="📥", end=""))

query_str = snapshot_query_template.safe_substitute(
tag=tag or "null",
dataset_id=dataset,
)
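Using safe_substitute here, rather than substitute, is what makes the two-stage fill-in work: it resolves $tag and $dataset_id now but leaves the still-unknown $tree placeholder intact instead of raising KeyError, so _download_files can substitute it per file later. A minimal illustration of the stdlib behavior:

from string import Template

t = Template('snapshot(datasetId: $dataset_id, tag: $tag) { files(tree: $tree) }')
partial = t.safe_substitute(dataset_id='"ds000001"', tag='"1.0.0"')
print(partial)
# snapshot(datasetId: "ds000001", tag: "1.0.0") { files(tree: $tree) }
# t.substitute(...) with the same arguments would raise KeyError for $tree.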
coroutine = _download_files(
target_dir=target_dir,
files=files,
@@ -892,6 +937,7 @@
max_retries=max_retries,
retry_backoff=retry_backoff,
max_concurrent_downloads=max_concurrent_downloads,
query_str=query_str,
)

# Try to re-use event loop if it already exists. This is required e.g.
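The truncated comment refers to reusing an existing event loop; a common shape for that check follows (a generic sketch, not necessarily this module's exact logic; running inside Jupyter, which already has a loop, is the usual motivation):

import asyncio

async def _work() -> None:  # stand-in for the _download_files coroutine
    await asyncio.sleep(0)

coroutine = _work()
try:
    loop = asyncio.get_running_loop()  # raises RuntimeError if no loop is running
except RuntimeError:
    asyncio.run(coroutine)  # plain script: create and run a fresh loop
else:
    loop.create_task(coroutine)  # already inside a running loop: schedule instead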
