Commit

Test with hash comparison.
danielballan committed Feb 7, 2024
1 parent 290c4ba commit d96a276
Showing 3 changed files with 77 additions and 19 deletions.
22 changes: 21 additions & 1 deletion tiled/_tests/test_asset_access.py
@@ -1,8 +1,12 @@
+import hashlib
+from pathlib import Path
+
 import pytest
 
 from ..catalog import in_memory
 from ..client import Context, from_context
 from ..server.app import build_app
+from ..utils import path_from_uri
 
 
 @pytest.fixture
@@ -22,7 +26,8 @@ def client(context):
 def test_include_data_sources_method_on_self(client):
     "Calling include_data_sources() fetches data sources on self."
     client.write_array([1, 2, 3], key="x")
-    with pytest.raises(RuntimeError):
+    with pytest.warns(UserWarning):
+        # This fetches the sources with a second request.
         client["x"].data_sources
     client["x"].include_data_sources().data_sources is not None

@@ -41,3 +46,18 @@ def test_include_data_sources_kwarg(context):
     client.write_array([1, 2, 3], key="x")
     client["x"].data_sources is not None
     client["x"].include_data_sources() is not None
+
+
+def test_raw_export(client, tmpdir):
+    "Use raw_export() and compare hashes of original and exported files."
+    client.write_array([1, 2, 3], key="x")
+    exported_paths = client["x"].raw_export(tmpdir)
+    data_sources = client["x"].include_data_sources().data_sources
+    orig_dir = path_from_uri(data_sources[0]["assets"][0]["data_uri"])
+    _asset_id, relative_paths = client["x"].asset_manifests(data_sources).popitem()
+    orig_paths = [Path(orig_dir, relative_path) for relative_path in relative_paths]
+    orig_hashes = [hashlib.md5(path.read_bytes()).digest() for path in orig_paths]
+    exported_hashes = [
+        hashlib.md5(path.read_bytes()).digest() for path in exported_paths
+    ]
+    assert orig_hashes == exported_hashes
65 changes: 49 additions & 16 deletions tiled/client/base.py
@@ -1,5 +1,6 @@
 import importlib
 import time
+import warnings
 from dataclasses import asdict
 from pathlib import Path

@@ -203,14 +204,22 @@ def structure_family(self):
     @property
     def data_sources(self):
         if not self._include_data_sources:
-            raise RuntimeError(
-                "Data Sources were not fetched. Use include_data_sources()"
+            warnings.warn(
+                """Calling include_data_sources().refresh().
+To fetch the data sources up front, call include_data_sources() on the
+client or pass the optional parameter `include_data_sources=True` to
+`from_uri(...)` or similar."""
             )
-        return self.item["attributes"].get("data_sources")
+        return self.include_data_sources().item["attributes"].get("data_sources")
 
     def include_data_sources(self):
+        """
+        Ensure that data source and asset information is fetched.
+
+        If it has already been fetched, this is a no-op.
+        """
         if self._include_data_sources:
-            return self
+            return self  # no op
         return self.new_variation(include_data_sources=True).refresh()
 
     def new_variation(
@@ -234,6 +243,31 @@ def new_variation(
             **kwargs,
         )
 
+    def asset_manifests(self, data_sources):
+        """
+        Return a manifest of the relative paths of the contents in each asset.
+
+        This returns a dictionary keyed on asset ID.
+        Assets backed by a single file are mapped to None (no manifest).
+        Assets backed by a directory of files are mapped to a list of relative paths.
+        """
+        manifests = {}
+        for data_source in data_sources:
+            manifest_link = self.item["links"]["self"].replace(
+                "/metadata", "/asset/manifest", 1
+            )
+            for asset in data_source["assets"]:
+                if asset["is_directory"]:
+                    manifest = handle_error(
+                        self.context.http_client.get(
+                            manifest_link, params={"id": asset["id"]}
+                        )
+                    ).json()["manifest"]
+                else:
+                    manifest = None
+                manifests[asset["id"]] = manifest
+        return manifests
+
     def raw_export(self, directory=None, max_workers=4):
         """
         Download the raw assets backing this node.
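The new asset_manifests method can also be called directly. A hedged sketch of the return shape, assuming one data source and using a made-up asset ID and file names:

    data_sources = client["x"].include_data_sources().data_sources
    manifests = client["x"].asset_manifests(data_sources)
    # A directory-backed asset maps to a list of relative paths, e.g.
    #     {1: ["partition-0.csv", "partition-1.csv"]}
    # while a single-file asset maps to None:
    #     {1: None}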
@@ -242,10 +276,15 @@
 
         Parameters
         ----------
-        directory : Path
+        directory : Path, optional
             Default is current working directory
-        max_workers : int
-            Number of parallel workers downloading data
+        max_workers : int, optional
+            Number of parallel workers downloading data. Default is 4.
+
+        Returns
+        -------
+        paths : List[Path]
+            Filepaths of exported files
         """
         if directory is None:
             directory = Path.cwd()
@@ -258,6 +297,7 @@
         urls = []
         paths = []
         data_sources = self.include_data_sources().data_sources
+        asset_manifests = self.asset_manifests(data_sources)
         if len(data_sources) != 1:
             raise NotImplementedError(
                 "Export of multiple data sources not yet supported"
@@ -266,9 +306,6 @@
             bytes_link = self.item["links"]["self"].replace(
                 "/metadata", "/asset/bytes", 1
             )
-            manifest_link = self.item["links"]["self"].replace(
-                "/metadata", "/asset/manifest", 1
-            )
             for asset in data_source["assets"]:
                 if len(data_source["assets"]) == 1:
                     # Only one asset: keep the name simple.
@@ -278,11 +315,7 @@
                     # id to namespace each asset.
                     base_path = Path(directory, str(asset["id"]))
                 if asset["is_directory"]:
-                    relative_paths = handle_error(
-                        self.context.http_client.get(
-                            manifest_link, params={"id": asset["id"]}
-                        )
-                    ).json()["manifest"]
+                    relative_paths = asset_manifests[asset["id"]]
                     urls.extend(
                         [
                             URL(
@@ -304,7 +337,7 @@
             else:
                 urls.append(URL(bytes_link, params={"id": asset["id"]}))
                 paths.append(Path(base_path, ATTACHMENT_FILENAME_PLACEHOLDER))
-        download(self.context.http_client, urls, paths, max_workers=max_workers)
+        return download(self.context.http_client, urls, paths, max_workers=max_workers)
 
     @property
     def formats(self):
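raw_export now passes through the list of local paths returned by download, which is what test_raw_export above relies on. A short usage sketch (the directory name here is arbitrary):

    exported_paths = client["x"].raw_export("exported_assets")
    for path in exported_paths:
        print(path)

    # With no argument, files are exported into the current working directory.
    exported_paths = client["x"].raw_export()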
9 changes: 7 additions & 2 deletions tiled/client/download.py
@@ -2,7 +2,7 @@
 import re
 import signal
 from collections.abc import Iterable
-from concurrent.futures import ThreadPoolExecutor
+from concurrent.futures import ThreadPoolExecutor, wait
 from pathlib import Path
 from threading import Event
 
@@ -57,6 +57,7 @@ def _download_url(
         progress.console.log(f"ERROR {err!r}")
     else:
         progress.console.log(f"Downloaded {path}")
+    return path
 
 
 def download(
@@ -90,14 +91,18 @@ def sigint_handler(signum, frame):
     done_event = Event()
     original_sigint_handler = signal.getsignal(signal.SIGINT)
     signal.signal(signal.SIGINT, sigint_handler)
+    futures = []
     try:
         with progress:
             with ThreadPoolExecutor(max_workers=max_workers) as pool:
                 for url, path in zip(urls, paths):
                     task_id = progress.add_task("download", start=False)
-                    pool.submit(
+                    future = pool.submit(
                         _download_url, progress, task_id, done_event, client, url, path
                     )
+                    futures.append(future)
+        wait(futures)
     finally:
         # Restore SIGINT handler.
         signal.signal(signal.SIGINT, original_sigint_handler)
+    return [future.result() for future in futures]
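Collecting the futures and calling wait makes download block until every transfer has finished, and the final list comprehension returns each _download_url's target path in submission order. Note that _download_url logs errors rather than raising them, so future.result() will not re-raise download failures. A sketch of calling this internal helper directly, where http_client, urls, and paths stand in for values the caller already has:

    local_paths = download(http_client, urls, paths, max_workers=4)
    # _download_url returns its target path, so the result mirrors the
    # requested paths:
    assert local_paths == paths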
