Merge pull request #735 from danielballan/sync
Add utility for syncing nodes between two Tiled instances
genematx authored May 7, 2024
2 parents 552260d + 1545f9b commit 88f4f5f
Showing 11 changed files with 341 additions and 37 deletions.
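
For orientation, a minimal sketch of how the new utility is intended to be used, based on the tests added in this PR; the URLs and API keys below are placeholders, not part of the change:

```python
# Hypothetical endpoints and keys, shown only to illustrate the new API.
from tiled.client import from_uri
from tiled.client.sync import copy

source = from_uri("https://source.example.com", api_key="<SOURCE_API_KEY>")
dest = from_uri("https://dest.example.com", api_key="<DEST_API_KEY>")

# Walk `source` and copy its arrays, tables, and nested containers into
# `dest`, mirroring what tiled/_tests/test_sync.py exercises below.
copy(source, dest)
```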
10 changes: 10 additions & 0 deletions CHANGELOG.md
@@ -12,13 +12,23 @@ Write the date in place of the "Unreleased" in the case a new version is release
file was added to the repository root, `example_log_config.yml`.
- Added `tiled.adapters.protocols` which will provide possibility for user to
implement their custom adapters in a way that satisfies mypy.
- Added `tiled.client.smoke` with a utility for walking a node and ensuring
that the data in it can be read.
- Added `tiled.client.sync` with a utility for copying nodes between two
Tiled instances.

### Changed

- SQLite-backed catalogs now employ connection pooling. This results in a
significant speed-up and avoids frequently re-opening the SQLite file.
- Metadata returned from the use of the `select_metadata` is now a one-item
dictionary with 'selected' as the key, to match default type/behavior.
- The method `BaseClient.data_sources()` returns dataclass objects instead of
raw dict objects.

### Fixed

- Propagate setting `include_data_sources` into child nodes.

## v0.1.0a120 (25 April 2024)

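The `tiled.client.smoke` entry above pairs naturally with the sync utility. A minimal sketch, assuming `client` is any Tiled client node; the `strict=True` flag mirrors its use in the new tests:

```python
from tiled.client.smoke import read

# Walk the node and attempt to read everything in it; strict=True is the
# mode used in tiled/_tests/test_sync.py to verify copied data is readable.
read(client, strict=True)
```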
10 changes: 10 additions & 0 deletions docs/source/reference/python-client.md
@@ -292,3 +292,13 @@ Tiled currently includes two clients for each structure family:
   tiled.client.cache.Cache.size
   tiled.client.cache.Cache.write_safe
```

## Sync


```{eval-rst}
.. autosummary::
   :toctree: generated

   tiled.client.sync.copy
```
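
Beyond whole-tree copies, the new tests below also exercise copying subsets: search results, item slices, or plain dicts of (key, node) pairs. A sketch, assuming `source` and `dest` are clients as in the example near the top and that the source metadata includes a "color" key as in the test data:

```python
from tiled.client.sync import copy
from tiled.queries import Key

# Copy only the nodes whose metadata matches a query.
copy(source.search(Key("color") == "red"), dest)

# Copy an explicit subset of items, either as a slice or as a mapping.
copy(source.items()[:2], dest)
copy(dict(source.items()[:2]), dest)
```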
2 changes: 1 addition & 1 deletion tiled/_tests/test_asset_access.py
@@ -56,7 +56,7 @@ def test_raw_export(client, tmpdir):
client.write_array([1, 2, 3], key="x")
exported_paths = client["x"].raw_export(tmpdir / "exported")
data_sources = client["x"].include_data_sources().data_sources()
orig_dir = path_from_uri(data_sources[0]["assets"][0]["data_uri"])
orig_dir = path_from_uri(data_sources[0].assets[0].data_uri)
_asset_id, relative_paths = client["x"].asset_manifest(data_sources).popitem()
orig_paths = [Path(orig_dir, relative_path) for relative_path in relative_paths]
orig_hashes = [hashlib.md5(path.read_bytes()).digest() for path in orig_paths]
125 changes: 125 additions & 0 deletions tiled/_tests/test_sync.py
@@ -0,0 +1,125 @@
import asyncio
import contextlib
import tempfile

import awkward
import h5py
import numpy
import pandas
import sparse
import tifffile

from tiled.catalog import in_memory
from tiled.client import Context, from_context
from tiled.client.register import register
from tiled.client.smoke import read
from tiled.client.sync import copy
from tiled.queries import Key
from tiled.server.app import build_app


@contextlib.contextmanager
def client_factory(readable_storage=None):
with tempfile.TemporaryDirectory() as tempdir:
catalog = in_memory(writable_storage=tempdir, readable_storage=readable_storage)
app = build_app(catalog)
with Context.from_app(app) as context:
client = from_context(context)
yield client


def populate_external(client, tmp_path):
"Populate a client with registered externally-managed data."
subdir = tmp_path / "subdir"
subdir.mkdir()
# array
image = numpy.ones((3, 5))
for filepath in [tmp_path / "image.tiff", subdir / "nested_image.tiff"]:
tifffile.imwrite(filepath, image)
# table
for filepath in [tmp_path / "table.csv", subdir / "nested_table.csv"]:
with open(filepath, "w") as file:
file.write("x,y\n1,2\n3,4\n")
# container
for filepath in [tmp_path / "group.h5", subdir / "group.h5"]:
with h5py.File(filepath, "w") as file:
g = file.create_group("g")
g["data"] = numpy.arange(3)
# Note: Tiled does not currently happen to support any formats that it
# identifies as 'awkward' or 'sparse'. Potentially it could, and this
# test could be expanded to include those examples.
asyncio.run(register(client, tmp_path))


def populate_internal(client):
"Populate a client with uploaded internally-managed data."
# array
client.write_array([1, 2, 3], key="a", metadata={"color": "red"}, specs=["alpha"])
# table
df = pandas.DataFrame({"x": [1, 2, 3], "y": [4, 5, 6]})
client.write_dataframe(df, key="b", metadata={"color": "green"}, specs=["beta"])
# awkward
client.write_awkward(
awkward.Array([1, [2, 3]]), key="d", metadata={"color": "red"}, specs=["alpha"]
)
# sparse
coo = sparse.COO(coords=[[2, 5]], data=[1.3, 7.5], shape=(10,))
client.write_sparse(key="e", coords=coo.coords, data=coo.data, shape=coo.shape)

# nested
container = client.create_container("c")
container.write_array(
[1, 2, 3], key="A", metadata={"color": "red"}, specs=["alpha"]
)
container.write_dataframe(df, key="B", metadata={"color": "green"}, specs=["beta"])
container.write_awkward(
awkward.Array([1, [2, 3]]), key="D", metadata={"color": "red"}, specs=["alpha"]
)
container.write_sparse(key="E", coords=coo.coords, data=coo.data, shape=coo.shape)


def test_copy_internal():
with client_factory() as dest:
with client_factory() as source:
populate_internal(source)
copy(source, dest)
assert list(source) == list(dest)
assert list(source["c"]) == list(dest["c"])
read(dest, strict=True)


def test_copy_external(tmp_path):
with client_factory(readable_storage=[tmp_path]) as dest:
with client_factory() as source:
populate_external(source, tmp_path)
copy(source, dest)
assert list(source) == list(dest)
assert list(source["subdir"]) == list(dest["subdir"])
read(dest, strict=True)


def test_copy_search_results():
with client_factory() as dest:
with client_factory() as source:
populate_internal(source)
results = source.search(Key("color") == "red")
copy(results, dest)
assert list(results) == list(dest)


def test_copy_items():
with client_factory() as dest:
with client_factory() as source:
populate_internal(source)
select_items = source.items()[:2]
copy(select_items, dest)
assert [key for key, _ in select_items] == list(dest)


def test_copy_dict():
with client_factory() as dest:
with client_factory() as source:
populate_internal(source)
select_dict = dict(source.items()[:2])
copy(select_dict, dest)
assert list(select_dict) == list(dest)
2 changes: 1 addition & 1 deletion tiled/_tests/test_writing.py
@@ -483,7 +483,7 @@ def test_write_with_specified_mimetype(tree):
x.write_partition(df, 0)
x.read()
x.refresh()
x.data_sources()[0]["mimetype"] == mimetype
assert x.data_sources()[0].mimetype == mimetype

# Specifying unsupported mimetype raises expected error.
with fail_with_status_code(415):
53 changes: 20 additions & 33 deletions tiled/client/base.py
@@ -1,13 +1,13 @@
import importlib
import time
import warnings
from dataclasses import asdict
from pathlib import Path

from httpx import URL

from ..structures.core import Spec, StructureFamily
from ..utils import UNCHANGED, DictView, ListView, OneShotCachedMap, safe_json_dump
from ..structures.core import STRUCTURE_TYPES, Spec, StructureFamily
from ..structures.data_source import DataSource
from ..utils import UNCHANGED, DictView, ListView, safe_json_dump
from .utils import MSGPACK_MIME_TYPE, handle_error


@@ -218,7 +218,12 @@ def data_sources(self):
`from_uri(...)` or similar."""
)

return self.include_data_sources().item["attributes"].get("data_sources")
data_sources_json = (
self.include_data_sources().item["attributes"].get("data_sources")
)
if data_sources_json is None:
return None
return [DataSource.from_json(d) for d in data_sources_json]

def include_data_sources(self):
"""
@@ -271,16 +276,16 @@ def asset_manifest(self, data_sources):
manifest_link = self.item["links"]["self"].replace(
"/metadata", "/asset/manifest", 1
)
for asset in data_source["assets"]:
if asset["is_directory"]:
for asset in data_source.assets:
if asset.is_directory:
manifest = handle_error(
self.context.http_client.get(
manifest_link, params={"id": asset["id"]}
manifest_link, params={"id": asset.id}
)
).json()["manifest"]
else:
manifest = None
manifests[asset["id"]] = manifest
manifests[asset.id] = manifest
return manifests

def raw_export(self, destination_directory=None, max_workers=4):
@@ -321,22 +326,22 @@ def raw_export(self, destination_directory=None, max_workers=4):
bytes_link = self.item["links"]["self"].replace(
"/metadata", "/asset/bytes", 1
)
for asset in data_source["assets"]:
if len(data_source["assets"]) == 1:
for asset in data_source.assets:
if len(data_source.assets) == 1:
# Only one asset: keep the name simple.
base_path = destination_directory
else:
# Multiple assets: Add a subdirectory named for the asset
# id to namespace each asset.
base_path = Path(destination_directory, str(asset["id"]))
if asset["is_directory"]:
relative_paths = asset_manifest[asset["id"]]
base_path = Path(destination_directory, str(asset.id))
if asset.is_directory:
relative_paths = asset_manifest[asset.id]
urls.extend(
[
URL(
bytes_link,
params={
"id": asset["id"],
"id": asset.id,
"relative_path": relative_path,
},
)
@@ -350,7 +355,7 @@
]
)
else:
urls.append(URL(bytes_link, params={"id": asset["id"]}))
urls.append(URL(bytes_link, params={"id": asset.id}))
paths.append(Path(base_path, ATTACHMENT_FILENAME_PLACEHOLDER))
return download(self.context.http_client, urls, paths, max_workers=max_workers)

@@ -431,21 +436,3 @@ def delete_tree(self):

def __dask_tokenize__(self):
return (type(self), self.uri)


STRUCTURE_TYPES = OneShotCachedMap(
{
StructureFamily.array: lambda: importlib.import_module(
"...structures.array", BaseClient.__module__
).ArrayStructure,
StructureFamily.awkward: lambda: importlib.import_module(
"...structures.awkward", BaseClient.__module__
).AwkwardStructure,
StructureFamily.table: lambda: importlib.import_module(
"...structures.table", BaseClient.__module__
).TableStructure,
StructureFamily.sparse: lambda: importlib.import_module(
"...structures.sparse", BaseClient.__module__
).SparseStructure,
}
)
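
With this change, `data_sources()` returns `DataSource` dataclasses rather than raw dicts, so callers switch from key lookups to attribute access. A sketch based on the updated tests, assuming `client["x"]` is a node written with `write_array`:

```python
# Request data-source details and read them via dataclass attributes.
data_sources = client["x"].include_data_sources().data_sources()
ds = data_sources[0]
print(ds.mimetype)            # formerly ds["mimetype"]
for asset in ds.assets:       # formerly ds["assets"]
    print(asset.id, asset.is_directory, asset.data_uri)
```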
11 changes: 11 additions & 0 deletions tiled/client/container.py
@@ -407,6 +407,7 @@ def _items_slice(self, start, stop, direction, _ignore_inlined_contents=False):
self.context,
self.structure_clients,
item,
include_data_sources=self._include_data_sources,
)
return
if direction > 0:
@@ -444,6 +445,7 @@ def _items_slice(self, start, stop, direction, _ignore_inlined_contents=False):
self.context,
self.structure_clients,
item,
include_data_sources=self._include_data_sources,
)
next_page_url = content["links"]["next"]

@@ -650,6 +652,15 @@ def new(
# Merge in "id" and "links" returned by the server.
item.update(document)

# Ensure this is a dataclass, not a dict.
# When we apply type hints and mypy to the client it should be possible
# to dispense with this.
if (structure_family != StructureFamily.container) and isinstance(
structure, dict
):
structure_type = STRUCTURE_TYPES[structure_family]
structure = structure_type.from_json(structure)

return client_for_item(
self.context,
self.structure_clients,
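
The container changes above propagate the `include_data_sources` setting into child nodes, so children yielded during iteration carry their data-source details as well. A minimal sketch, assuming `client` is a container populated as in the new tests:

```python
# Children returned while iterating inherit include_data_sources, so their
# data_sources() are available without opting in on each child separately.
for key, child in client.include_data_sources().items():
    print(key, child.data_sources())
```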
