diff --git a/CHANGELOG.md b/CHANGELOG.md index 717f290af..45fe964be 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,6 +12,10 @@ Write the date in place of the "Unreleased" in the case a new version is release file was added to the repository root, `example_log_config.yml`. - Added `tiled.adapters.protocols` which will provide possibility for user to implement their custom adapters in a way that satisfies mypy. +- Added `tiled.client.smoke` with a utility for walking a node and ensuring + that the data in it can be read. +- Added `tiled.client.sync` with a utility for copying nodes between two + Tiled instances. ### Changed @@ -19,6 +23,12 @@ Write the date in place of the "Unreleased" in the case a new version is release significant speed-up and avoids frequently re-opening the SQLite file. - Metadata returned from the use of the `select_metadata` is now a one-item dictionary with 'selected' as the key, to match default type/behavior. +- The method `BaseClient.data_sources()` returns dataclass objects instead of + raw dict objects. + +### Fixed + +- Propagate setting `include_data_sources` into child nodes. ## v0.1.0a120 (25 April 2024) diff --git a/docs/source/reference/python-client.md b/docs/source/reference/python-client.md index 93a4cbdf0..c43bc1a81 100644 --- a/docs/source/reference/python-client.md +++ b/docs/source/reference/python-client.md @@ -292,3 +292,13 @@ Tiled currently includes two clients for each structure family: tiled.client.cache.Cache.size tiled.client.cache.Cache.write_safe ``` + +## Sync + + +```{eval-rst} +.. autosummary:: + :toctree: generated + + tiled.client.sync.copy +``` diff --git a/tiled/_tests/test_asset_access.py b/tiled/_tests/test_asset_access.py index 88f85dbac..6a119acfd 100644 --- a/tiled/_tests/test_asset_access.py +++ b/tiled/_tests/test_asset_access.py @@ -56,7 +56,7 @@ def test_raw_export(client, tmpdir): client.write_array([1, 2, 3], key="x") exported_paths = client["x"].raw_export(tmpdir / "exported") data_sources = client["x"].include_data_sources().data_sources() - orig_dir = path_from_uri(data_sources[0]["assets"][0]["data_uri"]) + orig_dir = path_from_uri(data_sources[0].assets[0].data_uri) _asset_id, relative_paths = client["x"].asset_manifest(data_sources).popitem() orig_paths = [Path(orig_dir, relative_path) for relative_path in relative_paths] orig_hashes = [hashlib.md5(path.read_bytes()).digest() for path in orig_paths] diff --git a/tiled/_tests/test_sync.py b/tiled/_tests/test_sync.py new file mode 100644 index 000000000..1722e2e69 --- /dev/null +++ b/tiled/_tests/test_sync.py @@ -0,0 +1,125 @@ +import asyncio +import contextlib +import tempfile + +import awkward +import h5py +import numpy +import pandas +import sparse +import tifffile + +from tiled.catalog import in_memory +from tiled.client import Context, from_context +from tiled.client.register import register +from tiled.client.smoke import read +from tiled.client.sync import copy +from tiled.queries import Key +from tiled.server.app import build_app + + +@contextlib.contextmanager +def client_factory(readable_storage=None): + with tempfile.TemporaryDirectory() as tempdir: + catalog = in_memory(writable_storage=tempdir, readable_storage=readable_storage) + app = build_app(catalog) + with Context.from_app(app) as context: + client = from_context(context) + yield client + + +def populate_external(client, tmp_path): + "Populate a client with registered externally-managed data." 
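+    # Write TIFF arrays, CSV tables, and HDF5 groups both at the top level
+    # and in a subdirectory, then register the whole tree with the catalog.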
+ subdir = tmp_path / "subdir" + subdir.mkdir() + # array + image = numpy.ones((3, 5)) + for filepath in [tmp_path / "image.tiff", subdir / "nested_image.tiff"]: + tifffile.imwrite(filepath, image) + # table + for filepath in [tmp_path / "table.csv", subdir / "nested_table.csv"]: + with open(filepath, "w") as file: + file.write("x,y\n1,2\n3,4\n") + # container + for filepath in [tmp_path / "group.h5", subdir / "group.h5"]: + with h5py.File(filepath, "w") as file: + g = file.create_group("g") + g["data"] = numpy.arange(3) + # Note: Tiled does not currently happen to support any formats that it + # identifies as 'awkward' or 'sparse'. Potentially it could, and this + # test could be expanded to include those examples. + asyncio.run(register(client, tmp_path)) + + +def populate_internal(client): + "Populate a client with uploaded internally-managed data." + # array + client.write_array([1, 2, 3], key="a", metadata={"color": "red"}, specs=["alpha"]) + # table + df = pandas.DataFrame({"x": [1, 2, 3], "y": [4, 5, 6]}) + client.write_dataframe(df, key="b", metadata={"color": "green"}, specs=["beta"]) + # awkward + client.write_awkward( + awkward.Array([1, [2, 3]]), key="d", metadata={"color": "red"}, specs=["alpha"] + ) + # sparse + coo = sparse.COO(coords=[[2, 5]], data=[1.3, 7.5], shape=(10,)) + client.write_sparse(key="e", coords=coo.coords, data=coo.data, shape=coo.shape) + + # nested + container = client.create_container("c") + container.write_array( + [1, 2, 3], key="A", metadata={"color": "red"}, specs=["alpha"] + ) + container.write_dataframe(df, key="B", metadata={"color": "green"}, specs=["beta"]) + container.write_awkward( + awkward.Array([1, [2, 3]]), key="D", metadata={"color": "red"}, specs=["alpha"] + ) + container.write_sparse(key="E", coords=coo.coords, data=coo.data, shape=coo.shape) + + +def test_copy_internal(): + with client_factory() as dest: + with client_factory() as source: + populate_internal(source) + copy(source, dest) + assert list(source) == list(dest) + assert list(source["c"]) == list(dest["c"]) + read(dest, strict=True) + + +def test_copy_external(tmp_path): + with client_factory(readable_storage=[tmp_path]) as dest: + with client_factory() as source: + populate_external(source, tmp_path) + copy(source, dest) + assert list(source) == list(dest) + assert list(source["subdir"]) == list(dest["subdir"]) + read(dest, strict=True) + + +def test_copy_search_results(): + with client_factory() as dest: + with client_factory() as source: + populate_internal(source) + results = source.search(Key("color") == "red") + copy(results, dest) + assert list(results) == list(dest) + + +def test_copy_items(): + with client_factory() as dest: + with client_factory() as source: + populate_internal(source) + select_items = source.items()[:2] + copy(select_items, dest) + assert [key for key, _ in select_items] == list(dest) + + +def test_copy_dict(): + with client_factory() as dest: + with client_factory() as source: + populate_internal(source) + select_dict = dict(source.items()[:2]) + copy(select_dict, dest) + assert list(select_dict) == list(dest) diff --git a/tiled/_tests/test_writing.py b/tiled/_tests/test_writing.py index 5defb90e3..6113c1474 100644 --- a/tiled/_tests/test_writing.py +++ b/tiled/_tests/test_writing.py @@ -483,7 +483,7 @@ def test_write_with_specified_mimetype(tree): x.write_partition(df, 0) x.read() x.refresh() - x.data_sources()[0]["mimetype"] == mimetype + assert x.data_sources()[0].mimetype == mimetype # Specifying unsupported mimetype raises expected error. 
with fail_with_status_code(415): diff --git a/tiled/client/base.py b/tiled/client/base.py index 4e853b69c..49bf8587d 100644 --- a/tiled/client/base.py +++ b/tiled/client/base.py @@ -1,4 +1,3 @@ -import importlib import time import warnings from dataclasses import asdict @@ -6,8 +5,9 @@ from httpx import URL -from ..structures.core import Spec, StructureFamily -from ..utils import UNCHANGED, DictView, ListView, OneShotCachedMap, safe_json_dump +from ..structures.core import STRUCTURE_TYPES, Spec, StructureFamily +from ..structures.data_source import DataSource +from ..utils import UNCHANGED, DictView, ListView, safe_json_dump from .utils import MSGPACK_MIME_TYPE, handle_error @@ -218,7 +218,12 @@ def data_sources(self): `from_uri(...)` or similar.""" ) - return self.include_data_sources().item["attributes"].get("data_sources") + data_sources_json = ( + self.include_data_sources().item["attributes"].get("data_sources") + ) + if data_sources_json is None: + return None + return [DataSource.from_json(d) for d in data_sources_json] def include_data_sources(self): """ @@ -271,16 +276,16 @@ def asset_manifest(self, data_sources): manifest_link = self.item["links"]["self"].replace( "/metadata", "/asset/manifest", 1 ) - for asset in data_source["assets"]: - if asset["is_directory"]: + for asset in data_source.assets: + if asset.is_directory: manifest = handle_error( self.context.http_client.get( - manifest_link, params={"id": asset["id"]} + manifest_link, params={"id": asset.id} ) ).json()["manifest"] else: manifest = None - manifests[asset["id"]] = manifest + manifests[asset.id] = manifest return manifests def raw_export(self, destination_directory=None, max_workers=4): @@ -321,22 +326,22 @@ def raw_export(self, destination_directory=None, max_workers=4): bytes_link = self.item["links"]["self"].replace( "/metadata", "/asset/bytes", 1 ) - for asset in data_source["assets"]: - if len(data_source["assets"]) == 1: + for asset in data_source.assets: + if len(data_source.assets) == 1: # Only one asset: keep the name simple. base_path = destination_directory else: # Multiple assets: Add a subdirectory named for the asset # id to namespace each asset. 
- base_path = Path(destination_directory, str(asset["id"])) - if asset["is_directory"]: - relative_paths = asset_manifest[asset["id"]] + base_path = Path(destination_directory, str(asset.id)) + if asset.is_directory: + relative_paths = asset_manifest[asset.id] urls.extend( [ URL( bytes_link, params={ - "id": asset["id"], + "id": asset.id, "relative_path": relative_path, }, ) @@ -350,7 +355,7 @@ def raw_export(self, destination_directory=None, max_workers=4): ] ) else: - urls.append(URL(bytes_link, params={"id": asset["id"]})) + urls.append(URL(bytes_link, params={"id": asset.id})) paths.append(Path(base_path, ATTACHMENT_FILENAME_PLACEHOLDER)) return download(self.context.http_client, urls, paths, max_workers=max_workers) @@ -431,21 +436,3 @@ def delete_tree(self): def __dask_tokenize__(self): return (type(self), self.uri) - - -STRUCTURE_TYPES = OneShotCachedMap( - { - StructureFamily.array: lambda: importlib.import_module( - "...structures.array", BaseClient.__module__ - ).ArrayStructure, - StructureFamily.awkward: lambda: importlib.import_module( - "...structures.awkward", BaseClient.__module__ - ).AwkwardStructure, - StructureFamily.table: lambda: importlib.import_module( - "...structures.table", BaseClient.__module__ - ).TableStructure, - StructureFamily.sparse: lambda: importlib.import_module( - "...structures.sparse", BaseClient.__module__ - ).SparseStructure, - } -) diff --git a/tiled/client/container.py b/tiled/client/container.py index e69dae0b5..53719dfb8 100644 --- a/tiled/client/container.py +++ b/tiled/client/container.py @@ -407,6 +407,7 @@ def _items_slice(self, start, stop, direction, _ignore_inlined_contents=False): self.context, self.structure_clients, item, + include_data_sources=self._include_data_sources, ) return if direction > 0: @@ -444,6 +445,7 @@ def _items_slice(self, start, stop, direction, _ignore_inlined_contents=False): self.context, self.structure_clients, item, + include_data_sources=self._include_data_sources, ) next_page_url = content["links"]["next"] @@ -650,6 +652,15 @@ def new( # Merge in "id" and "links" returned by the server. item.update(document) + # Ensure this is a dataclass, not a dict. + # When we apply type hints and mypy to the client it should be possible + # to dispense with this. + if (structure_family != StructureFamily.container) and isinstance( + structure, dict + ): + structure_type = STRUCTURE_TYPES[structure_family] + structure = structure_type.from_json(structure) + return client_for_item( self.context, self.structure_clients, diff --git a/tiled/client/sync.py b/tiled/client/sync.py new file mode 100644 index 000000000..230ea20a9 --- /dev/null +++ b/tiled/client/sync.py @@ -0,0 +1,134 @@ +import itertools + +from ..structures.core import StructureFamily +from ..structures.data_source import DataSource, Management +from .base import BaseClient + + +def copy( + source: BaseClient, + dest: BaseClient, +): + """ + Copy data from one Tiled instance to another. + + Parameters + ---------- + source : tiled node + dest : tiled node + + Examples + -------- + + Connect to two instances and copy data. + + >>> from tiled.client import from_uri + >>> from tiled.client.sync import copy + >>> a = from_uri("http://localhost:8000", api_key="secret") + >>> b = from_uri("http://localhost:9000", api_key="secret") + >>> copy(a, b) + + + Copy select data. 
+ + >>> copy(a.items().head(), b) + >>> copy(a.search(...), b) + + """ + if hasattr(source, "structure_family"): + # looks like a client object + _DISPATCH[source.structure_family](source.include_data_sources(), dest) + else: + _DISPATCH[StructureFamily.container](dict(source), dest) + + +def _copy_array(source, dest): + num_blocks = (range(len(n)) for n in source.chunks) + # Loop over each block index --- e.g. (0, 0), (0, 1), (0, 2) .... + for block in itertools.product(*num_blocks): + array = source.read_block(block) + dest.write_block(array, block) + + +def _copy_awkward(source, dest): + import awkward + + array = source.read() + _form, _length, container = awkward.to_buffers(array) + dest.write(container) + + +def _copy_sparse(source, dest): + num_blocks = (range(len(n)) for n in source.chunks) + # Loop over each block index --- e.g. (0, 0), (0, 1), (0, 2) .... + for block in itertools.product(*num_blocks): + array = source.read_block(block) + dest.write_block(array.coords, array.data, block) + + +def _copy_table(source, dest): + for partition in range(source.structure().npartitions): + df = source.read_partition(partition) + dest.write_partition(df, partition) + + +def _copy_container(source, dest): + for key, child_node in source.items(): + original_data_sources = child_node.include_data_sources().data_sources() + num_data_sources = len(original_data_sources) + if num_data_sources == 0: + # A container with no data sources is just an organizational + # entity in the database. + if child_node.structure_family == StructureFamily.container: + data_sources = [] + else: + raise ValueError( + f"Unable to copy {child_node} which is a " + f"{child_node.structure_family} but has no data sources." + ) + elif num_data_sources == 1: + (original_data_source,) = original_data_sources + if original_data_source.management == Management.external: + data_sources = [original_data_source] + else: + if child_node.structure_family == StructureFamily.container: + data_sources = [] + else: + data_sources = [ + DataSource( + management=original_data_source.management, + mimetype=original_data_source.mimetype, + structure_family=original_data_source.structure_family, + structure=original_data_source.structure, + ) + ] + else: + # As of this writing this is impossible, but we anticipate that + # it may be added someday. + raise NotImplementedError( + "Multiple Data Sources in one Node is not supported." 
+ ) + node = dest.new( + key=key, + structure_family=child_node.structure_family, + data_sources=data_sources, + metadata=dict(child_node.metadata), + specs=child_node.specs, + ) + if ( + original_data_sources + and (original_data_sources[0].management != Management.external) + ) or ( + child_node.structure_family == StructureFamily.container + and (not original_data_sources) + ): + _DISPATCH[child_node.structure_family](child_node, node) + + +_DISPATCH = { + StructureFamily.array: _copy_array, + StructureFamily.awkward: _copy_awkward, + StructureFamily.container: _copy_container, + StructureFamily.sparse: _copy_sparse, + StructureFamily.table: _copy_table, +} diff --git a/tiled/client/utils.py b/tiled/client/utils.py index 8b280a3cc..8e3ae08a1 100644 --- a/tiled/client/utils.py +++ b/tiled/client/utils.py @@ -296,9 +296,9 @@ def get_asset_filepaths(node): """ filepaths = [] for data_source in node.data_sources() or []: - for asset in data_source["assets"]: + for asset in data_source.assets: # If, in the future, there are nodes with s3:// or other # schemes, path_from_uri will raise an exception here # because it cannot provide a filepath. - filepaths.append(path_from_uri(asset["data_uri"])) + filepaths.append(path_from_uri(asset.data_uri)) return filepaths diff --git a/tiled/structures/core.py b/tiled/structures/core.py index a4708b35e..309ada6e8 100644 --- a/tiled/structures/core.py +++ b/tiled/structures/core.py @@ -5,9 +5,12 @@ """ import enum +import importlib from dataclasses import asdict, dataclass from typing import Dict, Optional +from ..utils import OneShotCachedMap + class StructureFamily(str, enum.Enum): array = "array" @@ -41,3 +44,21 @@ def dict(self) -> Dict[str, Optional[str]]: return asdict(self) model_dump = dict # For easy interoperability with pydantic 2.x models + + +STRUCTURE_TYPES = OneShotCachedMap( + { + StructureFamily.array: lambda: importlib.import_module( + "...structures.array", StructureFamily.__module__ + ).ArrayStructure, + StructureFamily.awkward: lambda: importlib.import_module( + "...structures.awkward", StructureFamily.__module__ + ).AwkwardStructure, + StructureFamily.table: lambda: importlib.import_module( + "...structures.table", StructureFamily.__module__ + ).TableStructure, + StructureFamily.sparse: lambda: importlib.import_module( + "...structures.sparse", StructureFamily.__module__ + ).SparseStructure, + } +) diff --git a/tiled/structures/data_source.py b/tiled/structures/data_source.py index 97367d097..f1100044c 100644 --- a/tiled/structures/data_source.py +++ b/tiled/structures/data_source.py @@ -30,3 +30,9 @@ class DataSource: parameters: dict = dataclasses.field(default_factory=dict) assets: List[Asset] = dataclasses.field(default_factory=list) management: Management = Management.writable + + @classmethod + def from_json(cls, d): + d = d.copy() + assets = [Asset(**a) for a in d.pop("assets")] + return cls(assets=assets, **d)