From eff13a025a4b69893f47064d896c825a7afe2df6 Mon Sep 17 00:00:00 2001 From: Dan Allan Date: Mon, 6 May 2024 11:34:36 -0400 Subject: [PATCH 01/18] In client, expose DataSource, Asset as dataclass. --- tiled/_tests/test_asset_access.py | 2 +- tiled/_tests/test_writing.py | 2 +- tiled/client/base.py | 30 ++++++++++++++++++------------ tiled/client/utils.py | 4 ++-- tiled/structures/data_source.py | 6 ++++++ 5 files changed, 28 insertions(+), 16 deletions(-) diff --git a/tiled/_tests/test_asset_access.py b/tiled/_tests/test_asset_access.py index 88f85dbac..6a119acfd 100644 --- a/tiled/_tests/test_asset_access.py +++ b/tiled/_tests/test_asset_access.py @@ -56,7 +56,7 @@ def test_raw_export(client, tmpdir): client.write_array([1, 2, 3], key="x") exported_paths = client["x"].raw_export(tmpdir / "exported") data_sources = client["x"].include_data_sources().data_sources() - orig_dir = path_from_uri(data_sources[0]["assets"][0]["data_uri"]) + orig_dir = path_from_uri(data_sources[0].assets[0].data_uri) _asset_id, relative_paths = client["x"].asset_manifest(data_sources).popitem() orig_paths = [Path(orig_dir, relative_path) for relative_path in relative_paths] orig_hashes = [hashlib.md5(path.read_bytes()).digest() for path in orig_paths] diff --git a/tiled/_tests/test_writing.py b/tiled/_tests/test_writing.py index 5defb90e3..9c554f2f5 100644 --- a/tiled/_tests/test_writing.py +++ b/tiled/_tests/test_writing.py @@ -483,7 +483,7 @@ def test_write_with_specified_mimetype(tree): x.write_partition(df, 0) x.read() x.refresh() - x.data_sources()[0]["mimetype"] == mimetype + x.data_sources()[0].mimetype == mimetype # Specifying unsupported mimetype raises expected error. with fail_with_status_code(415): diff --git a/tiled/client/base.py b/tiled/client/base.py index 4e853b69c..1dc2e0726 100644 --- a/tiled/client/base.py +++ b/tiled/client/base.py @@ -7,6 +7,7 @@ from httpx import URL from ..structures.core import Spec, StructureFamily +from ..structures.data_source import DataSource from ..utils import UNCHANGED, DictView, ListView, OneShotCachedMap, safe_json_dump from .utils import MSGPACK_MIME_TYPE, handle_error @@ -218,7 +219,12 @@ def data_sources(self): `from_uri(...)` or similar.""" ) - return self.include_data_sources().item["attributes"].get("data_sources") + data_sources_json = ( + self.include_data_sources().item["attributes"].get("data_sources") + ) + if data_sources_json is None: + return None + return [DataSource.from_json(d) for d in data_sources_json] def include_data_sources(self): """ @@ -271,16 +277,16 @@ def asset_manifest(self, data_sources): manifest_link = self.item["links"]["self"].replace( "/metadata", "/asset/manifest", 1 ) - for asset in data_source["assets"]: - if asset["is_directory"]: + for asset in data_source.assets: + if asset.is_directory: manifest = handle_error( self.context.http_client.get( - manifest_link, params={"id": asset["id"]} + manifest_link, params={"id": asset.id} ) ).json()["manifest"] else: manifest = None - manifests[asset["id"]] = manifest + manifests[asset.id] = manifest return manifests def raw_export(self, destination_directory=None, max_workers=4): @@ -321,22 +327,22 @@ def raw_export(self, destination_directory=None, max_workers=4): bytes_link = self.item["links"]["self"].replace( "/metadata", "/asset/bytes", 1 ) - for asset in data_source["assets"]: - if len(data_source["assets"]) == 1: + for asset in data_source.assets: + if len(data_source.assets) == 1: # Only one asset: keep the name simple. base_path = destination_directory else: # Multiple assets: Add a subdirectory named for the asset # id to namespace each asset. - base_path = Path(destination_directory, str(asset["id"])) - if asset["is_directory"]: - relative_paths = asset_manifest[asset["id"]] + base_path = Path(destination_directory, str(asset.id)) + if asset.is_directory: + relative_paths = asset_manifest[asset.id] urls.extend( [ URL( bytes_link, params={ - "id": asset["id"], + "id": asset.id, "relative_path": relative_path, }, ) @@ -350,7 +356,7 @@ def raw_export(self, destination_directory=None, max_workers=4): ] ) else: - urls.append(URL(bytes_link, params={"id": asset["id"]})) + urls.append(URL(bytes_link, params={"id": asset.id})) paths.append(Path(base_path, ATTACHMENT_FILENAME_PLACEHOLDER)) return download(self.context.http_client, urls, paths, max_workers=max_workers) diff --git a/tiled/client/utils.py b/tiled/client/utils.py index 8b280a3cc..8e3ae08a1 100644 --- a/tiled/client/utils.py +++ b/tiled/client/utils.py @@ -296,9 +296,9 @@ def get_asset_filepaths(node): """ filepaths = [] for data_source in node.data_sources() or []: - for asset in data_source["assets"]: + for asset in data_source.assets: # If, in the future, there are nodes with s3:// or other # schemes, path_from_uri will raise an exception here # because it cannot provide a filepath. - filepaths.append(path_from_uri(asset["data_uri"])) + filepaths.append(path_from_uri(asset.data_uri)) return filepaths diff --git a/tiled/structures/data_source.py b/tiled/structures/data_source.py index 97367d097..f1100044c 100644 --- a/tiled/structures/data_source.py +++ b/tiled/structures/data_source.py @@ -30,3 +30,9 @@ class DataSource: parameters: dict = dataclasses.field(default_factory=dict) assets: List[Asset] = dataclasses.field(default_factory=list) management: Management = Management.writable + + @classmethod + def from_json(cls, d): + d = d.copy() + assets = [Asset(**a) for a in d.pop("assets")] + return cls(assets=assets, **d) From e6dc9ed205edba995286f8cebf29090e18f3a8bb Mon Sep 17 00:00:00 2001 From: Dan Allan Date: Mon, 6 May 2024 11:35:01 -0400 Subject: [PATCH 02/18] Add utility for syncing two Tiled instances. --- tiled/client/sync.py | 87 ++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 87 insertions(+) create mode 100644 tiled/client/sync.py diff --git a/tiled/client/sync.py b/tiled/client/sync.py new file mode 100644 index 000000000..da14f2d98 --- /dev/null +++ b/tiled/client/sync.py @@ -0,0 +1,87 @@ +import itertools + +from ..structures.core import StructureFamily +from ..structures.data_source import DataSource, Management +from .base import BaseClient + + +def sync( + source: BaseClient, + dest: BaseClient, + copy_internal: bool = True, + copy_external: bool = False, +): + """ + + Parameters + ---------- + source : tiled node + dest : tiled node + """ + source = source.include_data_sources().refresh() + _DISPATCH[source.structure_family](source, dest, copy_internal, copy_external) + + +def _sync_array(source, dest, copy_internal, copy_external): + num_blocks = (range(len(n)) for n in source.chunks) + # Loop over each block index --- e.g. (0, 0), (0, 1), (0, 2) .... + for block in itertools.product(*num_blocks): + array = source.read_block(block) + dest.write_block(array, block) + + +def _sync_table(source, dest, copy_internal, copy_external): + for partition in range(source.structure().npartitions): + df = source.read_partition(partition) + dest.write_partition(df, partition) + + +def _sync_container(source, dest, copy_internal, copy_external): + for key, child_node in source.items(): + original_data_sources = child_node.data_sources() + if not original_data_sources: + if child_node.structure_family == StructureFamily.container: + data_sources = [] + else: + raise ValueError( + f"Unable to copy {child_node} which has is a " + f"{child_node.structure_family} but has no data sources." + ) + else: + (original_data_source,) = original_data_sources + if original_data_source.management == Management.external: + data_sources = [original_data_source] + if copy_external: + raise NotImplementedError( + "Copying externally-managed data is not yet implemented" + ) + else: + if child_node.structure_family == StructureFamily.container: + data_sources = [] + else: + data_sources = [ + DataSource( + management=original_data_source.management, + mimetype=original_data_source.mimetype, + structure_family=original_data_source.structure_family, + structure=original_data_source.structure, + ) + ] + node = dest.new( + key=key, + structure_family=child_node.structure_family, + data_sources=data_sources, + metadata=dict(child_node.metadata), + specs=child_node.specs, + ) + if (original_data_source.management != Management.external) and copy_internal: + _DISPATCH[child_node.structure_family]( + child_node, node, copy_internal, copy_external + ) + + +_DISPATCH = { + StructureFamily.array: _sync_array, + StructureFamily.container: _sync_container, + StructureFamily.table: _sync_table, +} From ade72be62fe079b4c525c086c7da68070671feaa Mon Sep 17 00:00:00 2001 From: Dan Allan Date: Mon, 6 May 2024 12:03:45 -0400 Subject: [PATCH 03/18] Do not assume there is a data source. --- tiled/client/sync.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/tiled/client/sync.py b/tiled/client/sync.py index da14f2d98..c18c7af13 100644 --- a/tiled/client/sync.py +++ b/tiled/client/sync.py @@ -74,7 +74,11 @@ def _sync_container(source, dest, copy_internal, copy_external): metadata=dict(child_node.metadata), specs=child_node.specs, ) - if (original_data_source.management != Management.external) and copy_internal: + if ( + original_data_sources + and (original_data_sources[0].management != Management.external) + and copy_internal + ): _DISPATCH[child_node.structure_family]( child_node, node, copy_internal, copy_external ) From 5b67894f001cfc7fc65bf24f5932459718b4b8c4 Mon Sep 17 00:00:00 2001 From: Dan Allan Date: Mon, 6 May 2024 12:11:34 -0400 Subject: [PATCH 04/18] Recurse into containers --- tiled/client/sync.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/tiled/client/sync.py b/tiled/client/sync.py index c18c7af13..679d8cbd0 100644 --- a/tiled/client/sync.py +++ b/tiled/client/sync.py @@ -78,6 +78,9 @@ def _sync_container(source, dest, copy_internal, copy_external): original_data_sources and (original_data_sources[0].management != Management.external) and copy_internal + ) or ( + child_node.structure_family == StructureFamily.container + and (not original_data_sources) ): _DISPATCH[child_node.structure_family]( child_node, node, copy_internal, copy_external From e6fa35dd96fb39a13df1cb5b5447678313dbc070 Mon Sep 17 00:00:00 2001 From: Dan Allan Date: Mon, 6 May 2024 15:35:31 -0400 Subject: [PATCH 05/18] FIX: Pass include_data_sources through correctly. --- tiled/client/container.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tiled/client/container.py b/tiled/client/container.py index e69dae0b5..00942c29d 100644 --- a/tiled/client/container.py +++ b/tiled/client/container.py @@ -407,6 +407,7 @@ def _items_slice(self, start, stop, direction, _ignore_inlined_contents=False): self.context, self.structure_clients, item, + include_data_sources=self._include_data_sources, ) return if direction > 0: @@ -444,6 +445,7 @@ def _items_slice(self, start, stop, direction, _ignore_inlined_contents=False): self.context, self.structure_clients, item, + include_data_sources=self._include_data_sources, ) next_page_url = content["links"]["next"] From ce4eda182f9b602e5f9eebe58cd8a6f48424ff71 Mon Sep 17 00:00:00 2001 From: Dan Allan Date: Mon, 6 May 2024 15:35:54 -0400 Subject: [PATCH 06/18] Support search results, items, dict. --- tiled/client/sync.py | 17 ++++++++++++++--- 1 file changed, 14 insertions(+), 3 deletions(-) diff --git a/tiled/client/sync.py b/tiled/client/sync.py index 679d8cbd0..d2642f9f9 100644 --- a/tiled/client/sync.py +++ b/tiled/client/sync.py @@ -1,3 +1,4 @@ +import collections.abc import itertools from ..structures.core import StructureFamily @@ -18,8 +19,18 @@ def sync( source : tiled node dest : tiled node """ - source = source.include_data_sources().refresh() - _DISPATCH[source.structure_family](source, dest, copy_internal, copy_external) + if hasattr(source, "structure_family"): + # looks like a client object + _DISPATCH[source.structure_family]( + source.include_data_sources(), dest, copy_internal, copy_external + ) + elif isinstance(source, list): + # such as result of source.items().head() + _DISPATCH[StructureFamily.container]( + dict(source), dest, copy_internal, copy_external + ) + elif isinstance(source, collections.abc.Mapping): + _DISPATCH[StructureFamily.container](source, dest, copy_internal, copy_external) def _sync_array(source, dest, copy_internal, copy_external): @@ -38,7 +49,7 @@ def _sync_table(source, dest, copy_internal, copy_external): def _sync_container(source, dest, copy_internal, copy_external): for key, child_node in source.items(): - original_data_sources = child_node.data_sources() + original_data_sources = child_node.include_data_sources().data_sources() if not original_data_sources: if child_node.structure_family == StructureFamily.container: data_sources = [] From eb5af0781d44ff7bbff12de444c55afd2b12f89c Mon Sep 17 00:00:00 2001 From: Dan Allan Date: Mon, 6 May 2024 15:36:37 -0400 Subject: [PATCH 07/18] Unit test sync --- tiled/_tests/test_sync.py | 102 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 102 insertions(+) create mode 100644 tiled/_tests/test_sync.py diff --git a/tiled/_tests/test_sync.py b/tiled/_tests/test_sync.py new file mode 100644 index 000000000..2025e79b1 --- /dev/null +++ b/tiled/_tests/test_sync.py @@ -0,0 +1,102 @@ +import asyncio +import contextlib +import tempfile + +import numpy +import pandas +import tifffile + +from tiled.catalog import in_memory +from tiled.client import Context, from_context +from tiled.client.register import register +from tiled.client.smoke import read +from tiled.client.sync import sync +from tiled.queries import Key +from tiled.server.app import build_app + + +@contextlib.contextmanager +def client_factory(): + with tempfile.TemporaryDirectory() as tempdir: + catalog = in_memory(writable_storage=tempdir) + app = build_app(catalog) + with Context.from_app(app) as context: + client = from_context(context) + yield client + + +def populate_external(client, tmp_path): + "Populate a client with registered externally-managed data." + subdir = tmp_path / "subdir" + subdir.mkdir() + # array + image = numpy.ones((3, 5)) + for filepath in [tmp_path / "image.tiff", subdir / "nested_image.tiff"]: + tifffile.imwrite(filepath, image) + # table + for filepath in [tmp_path / "table.csv", subdir / "nested_table.csv"]: + with open(filepath, "w") as file: + file.write("x,y\n1,2\n3,4\n") + asyncio.run(register(client, tmp_path)) + + +def populate_internal(client): + "Populate a client with uploaded internally-managed data." + # array + client.write_array([1, 2, 3], key="a", metadata={"color": "red"}, specs=["alpha"]) + # table + df = pandas.DataFrame({"x": [1, 2, 3], "y": [4, 5, 6]}) + client.write_dataframe(df, key="b", metadata={"color": "green"}, specs=["beta"]) + # nested + container = client.create_container("c") + container.write_array( + [1, 2, 3], key="A", metadata={"color": "red"}, specs=["alpha"] + ) + container.write_dataframe(df, key="B", metadata={"color": "green"}, specs=["beta"]) + + +def test_sync_internal(): + with client_factory() as dest: + with client_factory() as source: + populate_internal(source) + sync(source, dest) + assert list(source) == list(dest) + assert list(source["c"]) == list(dest["c"]) + read(dest) + + +def test_sync_external(tmp_path): + with client_factory() as dest: + with client_factory() as source: + populate_external(source, tmp_path) + sync(source, dest) + assert list(source) == list(dest) + assert list(source["subdir"]) == list(dest["subdir"]) + read(dest) + + +def test_sync_search_results(): + with client_factory() as dest: + with client_factory() as source: + populate_internal(source) + results = source.search(Key("color") == "red") + sync(results, dest) + assert list(results) == list(dest) + + +def test_sync_items(): + with client_factory() as dest: + with client_factory() as source: + populate_internal(source) + select_items = source.items()[:2] + sync(select_items, dest) + assert [key for key, _ in select_items] == list(dest) + + +def test_sync_dict(): + with client_factory() as dest: + with client_factory() as source: + populate_internal(source) + select_dict = dict(source.items()[:2]) + sync(select_dict, dest) + assert list(select_dict) == list(dest) From bae187f31201973e071296addd1cee67c953f79a Mon Sep 17 00:00:00 2001 From: Dan Allan Date: Mon, 6 May 2024 15:47:25 -0400 Subject: [PATCH 08/18] Test externally managed container format. --- tiled/_tests/test_sync.py | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/tiled/_tests/test_sync.py b/tiled/_tests/test_sync.py index 2025e79b1..4d449b1fa 100644 --- a/tiled/_tests/test_sync.py +++ b/tiled/_tests/test_sync.py @@ -2,6 +2,7 @@ import contextlib import tempfile +import h5py import numpy import pandas import tifffile @@ -16,9 +17,9 @@ @contextlib.contextmanager -def client_factory(): +def client_factory(readable_storage=None): with tempfile.TemporaryDirectory() as tempdir: - catalog = in_memory(writable_storage=tempdir) + catalog = in_memory(writable_storage=tempdir, readable_storage=readable_storage) app = build_app(catalog) with Context.from_app(app) as context: client = from_context(context) @@ -37,6 +38,11 @@ def populate_external(client, tmp_path): for filepath in [tmp_path / "table.csv", subdir / "nested_table.csv"]: with open(filepath, "w") as file: file.write("x,y\n1,2\n3,4\n") + # container + for filepath in [tmp_path / "group.h5", subdir / "group.h5"]: + with h5py.File(filepath, "w") as file: + g = file.create_group("g") + g["data"] = numpy.arange(3) asyncio.run(register(client, tmp_path)) @@ -66,7 +72,7 @@ def test_sync_internal(): def test_sync_external(tmp_path): - with client_factory() as dest: + with client_factory(readable_storage=[tmp_path]) as dest: with client_factory() as source: populate_external(source, tmp_path) sync(source, dest) From 8d3450262da127ae3fbeedb03607e1b90e40d7c3 Mon Sep 17 00:00:00 2001 From: Dan Allan Date: Mon, 6 May 2024 16:28:44 -0400 Subject: [PATCH 09/18] Use smoke test read in strict mode in tests. --- tiled/_tests/test_sync.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tiled/_tests/test_sync.py b/tiled/_tests/test_sync.py index 4d449b1fa..a33921bd4 100644 --- a/tiled/_tests/test_sync.py +++ b/tiled/_tests/test_sync.py @@ -68,7 +68,7 @@ def test_sync_internal(): sync(source, dest) assert list(source) == list(dest) assert list(source["c"]) == list(dest["c"]) - read(dest) + read(dest, strict=True) def test_sync_external(tmp_path): @@ -78,7 +78,7 @@ def test_sync_external(tmp_path): sync(source, dest) assert list(source) == list(dest) assert list(source["subdir"]) == list(dest["subdir"]) - read(dest) + read(dest, strict=True) def test_sync_search_results(): From 9b6ff457fe844ce7d96c9abfd8ba4f8a2760ebf2 Mon Sep 17 00:00:00 2001 From: Dan Allan Date: Mon, 6 May 2024 16:50:22 -0400 Subject: [PATCH 10/18] Handle copying separately. --- tiled/client/sync.py | 29 +++++++++-------------------- 1 file changed, 9 insertions(+), 20 deletions(-) diff --git a/tiled/client/sync.py b/tiled/client/sync.py index d2642f9f9..edbe7a8ff 100644 --- a/tiled/client/sync.py +++ b/tiled/client/sync.py @@ -9,8 +9,6 @@ def sync( source: BaseClient, dest: BaseClient, - copy_internal: bool = True, - copy_external: bool = False, ): """ @@ -21,19 +19,15 @@ def sync( """ if hasattr(source, "structure_family"): # looks like a client object - _DISPATCH[source.structure_family]( - source.include_data_sources(), dest, copy_internal, copy_external - ) + _DISPATCH[source.structure_family](source.include_data_sources(), dest) elif isinstance(source, list): # such as result of source.items().head() - _DISPATCH[StructureFamily.container]( - dict(source), dest, copy_internal, copy_external - ) + _DISPATCH[StructureFamily.container](dict(source), dest) elif isinstance(source, collections.abc.Mapping): - _DISPATCH[StructureFamily.container](source, dest, copy_internal, copy_external) + _DISPATCH[StructureFamily.container](source, dest) -def _sync_array(source, dest, copy_internal, copy_external): +def _sync_array(source, dest): num_blocks = (range(len(n)) for n in source.chunks) # Loop over each block index --- e.g. (0, 0), (0, 1), (0, 2) .... for block in itertools.product(*num_blocks): @@ -41,16 +35,18 @@ def _sync_array(source, dest, copy_internal, copy_external): dest.write_block(array, block) -def _sync_table(source, dest, copy_internal, copy_external): +def _sync_table(source, dest): for partition in range(source.structure().npartitions): df = source.read_partition(partition) dest.write_partition(df, partition) -def _sync_container(source, dest, copy_internal, copy_external): +def _sync_container(source, dest): for key, child_node in source.items(): original_data_sources = child_node.include_data_sources().data_sources() if not original_data_sources: + # A container with no data sources is just an organizational + # entity in the database. if child_node.structure_family == StructureFamily.container: data_sources = [] else: @@ -62,10 +58,6 @@ def _sync_container(source, dest, copy_internal, copy_external): (original_data_source,) = original_data_sources if original_data_source.management == Management.external: data_sources = [original_data_source] - if copy_external: - raise NotImplementedError( - "Copying externally-managed data is not yet implemented" - ) else: if child_node.structure_family == StructureFamily.container: data_sources = [] @@ -88,14 +80,11 @@ def _sync_container(source, dest, copy_internal, copy_external): if ( original_data_sources and (original_data_sources[0].management != Management.external) - and copy_internal ) or ( child_node.structure_family == StructureFamily.container and (not original_data_sources) ): - _DISPATCH[child_node.structure_family]( - child_node, node, copy_internal, copy_external - ) + _DISPATCH[child_node.structure_family](child_node, node) _DISPATCH = { From 035dac127e19cc0f5ac37d304396f2b3ad9ad0a7 Mon Sep 17 00:00:00 2001 From: Dan Allan Date: Mon, 6 May 2024 16:59:53 -0400 Subject: [PATCH 11/18] Docstring with examples; CHANGELOG --- CHANGELOG.md | 10 ++++++++++ tiled/client/sync.py | 18 ++++++++++++++++++ 2 files changed, 28 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 717f290af..45fe964be 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,6 +12,10 @@ Write the date in place of the "Unreleased" in the case a new version is release file was added to the repository root, `example_log_config.yml`. - Added `tiled.adapters.protocols` which will provide possibility for user to implement their custom adapters in a way that satisfies mypy. +- Added `tiled.client.smoke` with a utility for walking a node and ensuring + that the data in it can be read. +- Added `tiled.client.sync` with a utility for copying nodes between two + Tiled instances. ### Changed @@ -19,6 +23,12 @@ Write the date in place of the "Unreleased" in the case a new version is release significant speed-up and avoids frequently re-opening the SQLite file. - Metadata returned from the use of the `select_metadata` is now a one-item dictionary with 'selected' as the key, to match default type/behavior. +- The method `BaseClient.data_sources()` returns dataclass objects instead of + raw dict objects. + +### Fixed + +- Propagate setting `include_data_sources` into child nodes. ## v0.1.0a120 (25 April 2024) diff --git a/tiled/client/sync.py b/tiled/client/sync.py index edbe7a8ff..4342c42a7 100644 --- a/tiled/client/sync.py +++ b/tiled/client/sync.py @@ -11,11 +11,29 @@ def sync( dest: BaseClient, ): """ + Copy data from one Tiled instance to another. Parameters ---------- source : tiled node dest : tiled node + + Examples + -------- + + Connect to two instances and copy data. + + >>> from tiled.client import from_uri + >>> a = from_uri("http://localhost:8000", api_key="secret") + >>> b = from_uri("http://localhost:9000", api_key="secret") + >>> sync(a, b) + + + Copy select data. + + >>> sync(a.items().head(), b) + >>> sync(a.search(...), b) + """ if hasattr(source, "structure_family"): # looks like a client object From 096ee4c02df1ea31cce1c0727c52fa984855bb25 Mon Sep 17 00:00:00 2001 From: Dan Allan Date: Tue, 7 May 2024 13:04:23 -0400 Subject: [PATCH 12/18] Add missing assert --- tiled/_tests/test_writing.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tiled/_tests/test_writing.py b/tiled/_tests/test_writing.py index 9c554f2f5..6113c1474 100644 --- a/tiled/_tests/test_writing.py +++ b/tiled/_tests/test_writing.py @@ -483,7 +483,7 @@ def test_write_with_specified_mimetype(tree): x.write_partition(df, 0) x.read() x.refresh() - x.data_sources()[0].mimetype == mimetype + assert x.data_sources()[0].mimetype == mimetype # Specifying unsupported mimetype raises expected error. with fail_with_status_code(415): From b268c373613ace96960b06d6b54ac477327e5f48 Mon Sep 17 00:00:00 2001 From: Dan Allan Date: Tue, 7 May 2024 13:07:15 -0400 Subject: [PATCH 13/18] Rename sync to copy. --- tiled/_tests/test_sync.py | 22 +++++++++++----------- tiled/client/sync.py | 21 +++++++++++---------- 2 files changed, 22 insertions(+), 21 deletions(-) diff --git a/tiled/_tests/test_sync.py b/tiled/_tests/test_sync.py index a33921bd4..9f805c8f7 100644 --- a/tiled/_tests/test_sync.py +++ b/tiled/_tests/test_sync.py @@ -11,7 +11,7 @@ from tiled.client import Context, from_context from tiled.client.register import register from tiled.client.smoke import read -from tiled.client.sync import sync +from tiled.client.sync import copy from tiled.queries import Key from tiled.server.app import build_app @@ -61,48 +61,48 @@ def populate_internal(client): container.write_dataframe(df, key="B", metadata={"color": "green"}, specs=["beta"]) -def test_sync_internal(): +def test_copy_internal(): with client_factory() as dest: with client_factory() as source: populate_internal(source) - sync(source, dest) + copy(source, dest) assert list(source) == list(dest) assert list(source["c"]) == list(dest["c"]) read(dest, strict=True) -def test_sync_external(tmp_path): +def test_copy_external(tmp_path): with client_factory(readable_storage=[tmp_path]) as dest: with client_factory() as source: populate_external(source, tmp_path) - sync(source, dest) + copy(source, dest) assert list(source) == list(dest) assert list(source["subdir"]) == list(dest["subdir"]) read(dest, strict=True) -def test_sync_search_results(): +def test_copy_search_results(): with client_factory() as dest: with client_factory() as source: populate_internal(source) results = source.search(Key("color") == "red") - sync(results, dest) + copy(results, dest) assert list(results) == list(dest) -def test_sync_items(): +def test_copy_items(): with client_factory() as dest: with client_factory() as source: populate_internal(source) select_items = source.items()[:2] - sync(select_items, dest) + copy(select_items, dest) assert [key for key, _ in select_items] == list(dest) -def test_sync_dict(): +def test_copy_dict(): with client_factory() as dest: with client_factory() as source: populate_internal(source) select_dict = dict(source.items()[:2]) - sync(select_dict, dest) + copy(select_dict, dest) assert list(select_dict) == list(dest) diff --git a/tiled/client/sync.py b/tiled/client/sync.py index 4342c42a7..709d78044 100644 --- a/tiled/client/sync.py +++ b/tiled/client/sync.py @@ -6,7 +6,7 @@ from .base import BaseClient -def sync( +def copy( source: BaseClient, dest: BaseClient, ): @@ -24,15 +24,16 @@ def sync( Connect to two instances and copy data. >>> from tiled.client import from_uri + >>> from tiled.client.sync import copy >>> a = from_uri("http://localhost:8000", api_key="secret") >>> b = from_uri("http://localhost:9000", api_key="secret") - >>> sync(a, b) + >>> copy(a, b) Copy select data. - >>> sync(a.items().head(), b) - >>> sync(a.search(...), b) + >>> copy(a.items().head(), b) + >>> copy(a.search(...), b) """ if hasattr(source, "structure_family"): @@ -45,7 +46,7 @@ def sync( _DISPATCH[StructureFamily.container](source, dest) -def _sync_array(source, dest): +def _copy_array(source, dest): num_blocks = (range(len(n)) for n in source.chunks) # Loop over each block index --- e.g. (0, 0), (0, 1), (0, 2) .... for block in itertools.product(*num_blocks): @@ -53,13 +54,13 @@ def _sync_array(source, dest): dest.write_block(array, block) -def _sync_table(source, dest): +def _copy_table(source, dest): for partition in range(source.structure().npartitions): df = source.read_partition(partition) dest.write_partition(df, partition) -def _sync_container(source, dest): +def _copy_container(source, dest): for key, child_node in source.items(): original_data_sources = child_node.include_data_sources().data_sources() if not original_data_sources: @@ -106,7 +107,7 @@ def _sync_container(source, dest): _DISPATCH = { - StructureFamily.array: _sync_array, - StructureFamily.container: _sync_container, - StructureFamily.table: _sync_table, + StructureFamily.array: _copy_array, + StructureFamily.container: _copy_container, + StructureFamily.table: _copy_table, } From c89dfaae369716a531e86be2d15cb87dd033be4b Mon Sep 17 00:00:00 2001 From: Dan Allan Date: Tue, 7 May 2024 13:09:08 -0400 Subject: [PATCH 14/18] Simplify logic --- tiled/client/sync.py | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/tiled/client/sync.py b/tiled/client/sync.py index 709d78044..944451f47 100644 --- a/tiled/client/sync.py +++ b/tiled/client/sync.py @@ -1,4 +1,3 @@ -import collections.abc import itertools from ..structures.core import StructureFamily @@ -39,11 +38,8 @@ def copy( if hasattr(source, "structure_family"): # looks like a client object _DISPATCH[source.structure_family](source.include_data_sources(), dest) - elif isinstance(source, list): - # such as result of source.items().head() + else: _DISPATCH[StructureFamily.container](dict(source), dest) - elif isinstance(source, collections.abc.Mapping): - _DISPATCH[StructureFamily.container](source, dest) def _copy_array(source, dest): From 86dd6b1526ca9178ea79f2d850d65c903ec235b2 Mon Sep 17 00:00:00 2001 From: Dan Allan Date: Tue, 7 May 2024 13:09:16 -0400 Subject: [PATCH 15/18] Typo in error message --- tiled/client/sync.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tiled/client/sync.py b/tiled/client/sync.py index 944451f47..385675614 100644 --- a/tiled/client/sync.py +++ b/tiled/client/sync.py @@ -66,7 +66,7 @@ def _copy_container(source, dest): data_sources = [] else: raise ValueError( - f"Unable to copy {child_node} which has is a " + f"Unable to copy {child_node} which is a " f"{child_node.structure_family} but has no data sources." ) else: From 56338b90d3e62dfce38c5be824b3572a4f0c2053 Mon Sep 17 00:00:00 2001 From: Dan Allan Date: Tue, 7 May 2024 13:17:38 -0400 Subject: [PATCH 16/18] Check number of data sources explicitly. --- tiled/client/sync.py | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/tiled/client/sync.py b/tiled/client/sync.py index 385675614..675b6aedb 100644 --- a/tiled/client/sync.py +++ b/tiled/client/sync.py @@ -59,7 +59,8 @@ def _copy_table(source, dest): def _copy_container(source, dest): for key, child_node in source.items(): original_data_sources = child_node.include_data_sources().data_sources() - if not original_data_sources: + num_data_sources = len(original_data_sources) + if num_data_sources == 0: # A container with no data sources is just an organizational # entity in the database. if child_node.structure_family == StructureFamily.container: @@ -69,7 +70,7 @@ def _copy_container(source, dest): f"Unable to copy {child_node} which is a " f"{child_node.structure_family} but has no data sources." ) - else: + elif num_data_sources == 1: (original_data_source,) = original_data_sources if original_data_source.management == Management.external: data_sources = [original_data_source] @@ -85,6 +86,12 @@ def _copy_container(source, dest): structure=original_data_source.structure, ) ] + else: + # As of this writing this is impossible, but we anticipate that + # it may be added someday. + raise NotImplementedError( + "Multiple Data Sources in one Node is not supported." + ) node = dest.new( key=key, structure_family=child_node.structure_family, From 49ec65466680d0a4014d36190857be4e94278c42 Mon Sep 17 00:00:00 2001 From: Dan Allan Date: Tue, 7 May 2024 13:45:36 -0400 Subject: [PATCH 17/18] Support awkward, sparse structures in sync.copy --- tiled/_tests/test_sync.py | 17 +++++++++++++++++ tiled/client/base.py | 23 ++--------------------- tiled/client/container.py | 9 +++++++++ tiled/client/sync.py | 18 ++++++++++++++++++ tiled/structures/core.py | 21 +++++++++++++++++++++ 5 files changed, 67 insertions(+), 21 deletions(-) diff --git a/tiled/_tests/test_sync.py b/tiled/_tests/test_sync.py index 9f805c8f7..1722e2e69 100644 --- a/tiled/_tests/test_sync.py +++ b/tiled/_tests/test_sync.py @@ -2,9 +2,11 @@ import contextlib import tempfile +import awkward import h5py import numpy import pandas +import sparse import tifffile from tiled.catalog import in_memory @@ -43,6 +45,9 @@ def populate_external(client, tmp_path): with h5py.File(filepath, "w") as file: g = file.create_group("g") g["data"] = numpy.arange(3) + # Note: Tiled does not currently happen to support any formats that it + # identifies as 'awkward' or 'sparse'. Potentially it could, and this + # test could be expanded to include those examples. asyncio.run(register(client, tmp_path)) @@ -53,12 +58,24 @@ def populate_internal(client): # table df = pandas.DataFrame({"x": [1, 2, 3], "y": [4, 5, 6]}) client.write_dataframe(df, key="b", metadata={"color": "green"}, specs=["beta"]) + # awkward + client.write_awkward( + awkward.Array([1, [2, 3]]), key="d", metadata={"color": "red"}, specs=["alpha"] + ) + # sparse + coo = sparse.COO(coords=[[2, 5]], data=[1.3, 7.5], shape=(10,)) + client.write_sparse(key="e", coords=coo.coords, data=coo.data, shape=coo.shape) + # nested container = client.create_container("c") container.write_array( [1, 2, 3], key="A", metadata={"color": "red"}, specs=["alpha"] ) container.write_dataframe(df, key="B", metadata={"color": "green"}, specs=["beta"]) + container.write_awkward( + awkward.Array([1, [2, 3]]), key="D", metadata={"color": "red"}, specs=["alpha"] + ) + container.write_sparse(key="E", coords=coo.coords, data=coo.data, shape=coo.shape) def test_copy_internal(): diff --git a/tiled/client/base.py b/tiled/client/base.py index 1dc2e0726..49bf8587d 100644 --- a/tiled/client/base.py +++ b/tiled/client/base.py @@ -1,4 +1,3 @@ -import importlib import time import warnings from dataclasses import asdict @@ -6,9 +5,9 @@ from httpx import URL -from ..structures.core import Spec, StructureFamily +from ..structures.core import STRUCTURE_TYPES, Spec, StructureFamily from ..structures.data_source import DataSource -from ..utils import UNCHANGED, DictView, ListView, OneShotCachedMap, safe_json_dump +from ..utils import UNCHANGED, DictView, ListView, safe_json_dump from .utils import MSGPACK_MIME_TYPE, handle_error @@ -437,21 +436,3 @@ def delete_tree(self): def __dask_tokenize__(self): return (type(self), self.uri) - - -STRUCTURE_TYPES = OneShotCachedMap( - { - StructureFamily.array: lambda: importlib.import_module( - "...structures.array", BaseClient.__module__ - ).ArrayStructure, - StructureFamily.awkward: lambda: importlib.import_module( - "...structures.awkward", BaseClient.__module__ - ).AwkwardStructure, - StructureFamily.table: lambda: importlib.import_module( - "...structures.table", BaseClient.__module__ - ).TableStructure, - StructureFamily.sparse: lambda: importlib.import_module( - "...structures.sparse", BaseClient.__module__ - ).SparseStructure, - } -) diff --git a/tiled/client/container.py b/tiled/client/container.py index 00942c29d..53719dfb8 100644 --- a/tiled/client/container.py +++ b/tiled/client/container.py @@ -652,6 +652,15 @@ def new( # Merge in "id" and "links" returned by the server. item.update(document) + # Ensure this is a dataclass, not a dict. + # When we apply type hints and mypy to the client it should be possible + # to dispense with this. + if (structure_family != StructureFamily.container) and isinstance( + structure, dict + ): + structure_type = STRUCTURE_TYPES[structure_family] + structure = structure_type.from_json(structure) + return client_for_item( self.context, self.structure_clients, diff --git a/tiled/client/sync.py b/tiled/client/sync.py index 675b6aedb..230ea20a9 100644 --- a/tiled/client/sync.py +++ b/tiled/client/sync.py @@ -50,6 +50,22 @@ def _copy_array(source, dest): dest.write_block(array, block) +def _copy_awkward(source, dest): + import awkward + + array = source.read() + _form, _length, container = awkward.to_buffers(array) + dest.write(container) + + +def _copy_sparse(source, dest): + num_blocks = (range(len(n)) for n in source.chunks) + # Loop over each block index --- e.g. (0, 0), (0, 1), (0, 2) .... + for block in itertools.product(*num_blocks): + array = source.read_block(block) + dest.write_block(array.coords, array.data, block) + + def _copy_table(source, dest): for partition in range(source.structure().npartitions): df = source.read_partition(partition) @@ -111,6 +127,8 @@ def _copy_container(source, dest): _DISPATCH = { StructureFamily.array: _copy_array, + StructureFamily.awkward: _copy_awkward, StructureFamily.container: _copy_container, + StructureFamily.sparse: _copy_sparse, StructureFamily.table: _copy_table, } diff --git a/tiled/structures/core.py b/tiled/structures/core.py index a4708b35e..309ada6e8 100644 --- a/tiled/structures/core.py +++ b/tiled/structures/core.py @@ -5,9 +5,12 @@ """ import enum +import importlib from dataclasses import asdict, dataclass from typing import Dict, Optional +from ..utils import OneShotCachedMap + class StructureFamily(str, enum.Enum): array = "array" @@ -41,3 +44,21 @@ def dict(self) -> Dict[str, Optional[str]]: return asdict(self) model_dump = dict # For easy interoperability with pydantic 2.x models + + +STRUCTURE_TYPES = OneShotCachedMap( + { + StructureFamily.array: lambda: importlib.import_module( + "...structures.array", StructureFamily.__module__ + ).ArrayStructure, + StructureFamily.awkward: lambda: importlib.import_module( + "...structures.awkward", StructureFamily.__module__ + ).AwkwardStructure, + StructureFamily.table: lambda: importlib.import_module( + "...structures.table", StructureFamily.__module__ + ).TableStructure, + StructureFamily.sparse: lambda: importlib.import_module( + "...structures.sparse", StructureFamily.__module__ + ).SparseStructure, + } +) From 1545f9b5f7c5c2a077b892693fd300d029d40ed3 Mon Sep 17 00:00:00 2001 From: Dan Allan Date: Tue, 7 May 2024 13:47:54 -0400 Subject: [PATCH 18/18] Add tiled.client.sync to API docs. --- docs/source/reference/python-client.md | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/docs/source/reference/python-client.md b/docs/source/reference/python-client.md index 93a4cbdf0..c43bc1a81 100644 --- a/docs/source/reference/python-client.md +++ b/docs/source/reference/python-client.md @@ -292,3 +292,13 @@ Tiled currently includes two clients for each structure family: tiled.client.cache.Cache.size tiled.client.cache.Cache.write_safe ``` + +## Sync + + +```{eval-rst} +.. autosummary:: + :toctree: generated + + tiled.client.sync.copy +```