diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index cc75e63bc..131c5efb8 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -55,10 +55,25 @@ jobs: shell: bash -l {0} run: source continuous_integration/scripts/start_LDAP.sh + - name: Download SQLite example data. + shell: bash -l {0} + run: source continuous_integration/scripts/download_sqlite_data.sh + - name: Start PostgreSQL service in container. shell: bash -l {0} run: source continuous_integration/scripts/start_postgres.sh + + - name: Ensure example data is migrated to current catalog database schema. + # The example data is expected to be kept up to date to the latest Tiled + # release, but this CI run may include some unreleased schema changes, + # so we run a migration here. + shell: bash -l {0} + run: | + set -vxeuo pipefail + tiled catalog upgrade-database sqlite+aiosqlite:///tiled_test_db_sqlite.db + tiled catalog upgrade-database postgresql+asyncpg://postgres:secret@localhost:5432/tiled-example-data + - name: Test with pytest shell: bash -l {0} run: | diff --git a/tiled/_tests/test_catalog.py b/tiled/_tests/test_catalog.py index d1d9136a5..8eab51b3b 100644 --- a/tiled/_tests/test_catalog.py +++ b/tiled/_tests/test_catalog.py @@ -8,6 +8,8 @@ import pandas.testing import pytest import pytest_asyncio +import sqlalchemy.dialects.postgresql.asyncpg +import sqlalchemy.exc import tifffile import xarray @@ -17,12 +19,13 @@ from ..catalog import in_memory from ..catalog.adapter import WouldDeleteData from ..catalog.explain import record_explanations +from ..catalog.register import create_node_safe from ..catalog.utils import ensure_uri from ..client import Context, from_context from ..client.xarray import write_xarray_dataset from ..queries import Eq, Key from ..server.app import build_app, build_app_from_config -from ..server.schemas import Asset, DataSource +from ..server.schemas import Asset, DataSource, Management from ..structures.core import StructureFamily from .utils import enter_password @@ -197,9 +200,10 @@ async def test_metadata_index_is_used(example_data_adapter): @pytest.mark.asyncio async def test_write_array_external(a, tmpdir): arr = numpy.ones((5, 3)) - filepath = tmpdir / "file.tiff" - tifffile.imwrite(str(filepath), arr) - ad = TiffAdapter(str(filepath)) + filepath = str(tmpdir / "file.tiff") + data_uri = ensure_uri(filepath) + tifffile.imwrite(filepath, arr) + ad = TiffAdapter(data_uri) structure = asdict(ad.structure()) await a.create_node( key="x", @@ -211,7 +215,14 @@ async def test_write_array_external(a, tmpdir): structure=structure, parameters={}, management="external", - assets=[Asset(data_uri=str(ensure_uri(filepath)), is_directory=False)], + assets=[ + Asset( + parameter="data_uri", + num=None, + data_uri=str(data_uri), + is_directory=False, + ) + ], ) ], ) @@ -222,9 +233,10 @@ async def test_write_array_external(a, tmpdir): @pytest.mark.asyncio async def test_write_dataframe_external_direct(a, tmpdir): df = pandas.DataFrame(numpy.ones((5, 3)), columns=list("abc")) - filepath = tmpdir / "file.csv" + filepath = str(tmpdir / "file.csv") + data_uri = ensure_uri(filepath) df.to_csv(filepath, index=False) - dfa = read_csv(filepath) + dfa = read_csv(data_uri) structure = asdict(dfa.structure()) await a.create_node( key="x", @@ -236,7 +248,14 @@ async def test_write_dataframe_external_direct(a, tmpdir): structure=structure, parameters={}, management="external", - assets=[Asset(data_uri=str(ensure_uri(filepath)), is_directory=False)], + assets=[ + Asset( + 
parameter="data_uri", + num=None, + data_uri=data_uri, + is_directory=False, + ) + ], ) ], ) @@ -411,3 +430,98 @@ async def test_access_control(tmpdir): public_client["outer_z"]["inner"].read() with pytest.raises(KeyError): public_client["outer_x"] + + +@pytest.mark.parametrize( + "assets", + [ + [ + Asset( + data_uri="file://localhost/test1", + is_directory=False, + parameter="filepath", + num=None, + ), + Asset( + data_uri="file://localhost/test2", + is_directory=False, + parameter="filepath", + num=1, + ), + ], + [ + Asset( + data_uri="file://localhost/test1", + is_directory=False, + parameter="filepath", + num=1, + ), + Asset( + data_uri="file://localhost/test2", + is_directory=False, + parameter="filepath", + num=None, + ), + ], + [ + Asset( + data_uri="file://localhost/test1", + is_directory=False, + parameter="filepath", + num=None, + ), + Asset( + data_uri="file://localhost/test2", + is_directory=False, + parameter="filepath", + num=None, + ), + ], + [ + Asset( + data_uri="file://localhost/test1", + is_directory=False, + parameter="filepath", + num=1, + ), + Asset( + data_uri="file://localhost/test2", + is_directory=False, + parameter="filepath", + num=1, + ), + ], + ], + ids=[ + "null-then-int", + "int-then-null", + "duplicate-null", + "duplicate-int", + ], +) +@pytest.mark.asyncio +async def test_constraints_on_parameter_and_num(a, assets): + "Test constraints enforced by database on 'parameter' and 'num'." + arr_adapter = ArrayAdapter.from_array([1, 2, 3]) + with pytest.raises( + ( + sqlalchemy.exc.IntegrityError, # SQLite + sqlalchemy.exc.DBAPIError, # PostgreSQL + ) + ): + await create_node_safe( + a, + key="test", + structure_family=arr_adapter.structure_family, + metadata=dict(arr_adapter.metadata()), + specs=arr_adapter.specs, + data_sources=[ + DataSource( + mimetype="application/x-test", + structure=asdict(arr_adapter.structure()), + parameters={}, + management=Management.external, + assets=assets, + ) + ], + ) diff --git a/tiled/_tests/test_directory_walker.py b/tiled/_tests/test_directory_walker.py index f6799741c..b3e974a0d 100644 --- a/tiled/_tests/test_directory_walker.py +++ b/tiled/_tests/test_directory_walker.py @@ -1,20 +1,33 @@ +import dataclasses import platform +import random from pathlib import Path +import h5py +import numpy import pytest import tifffile +import yaml +from ..adapters.hdf5 import HDF5Adapter +from ..adapters.tiff import TiffAdapter from ..catalog import in_memory from ..catalog.register import ( + Settings, + create_node_safe, + group_tiff_sequences, identity, register, + register_tiff_sequence, skip_all, strip_suffixes, - tiff_sequence, ) +from ..catalog.utils import ensure_uri from ..client import Context, from_context from ..examples.generate_files import data, df1, generate_files from ..server.app import build_app +from ..server.schemas import Asset, DataSource, Management +from ..utils import path_from_uri @pytest.fixture @@ -41,9 +54,9 @@ async def test_collision(example_data_dir, tmpdir): p = Path(example_data_dir, "a.tiff") tifffile.imwrite(str(p), data) - tree = in_memory() - with Context.from_app(build_app(tree)) as context: - await register(tree, example_data_dir) + catalog = in_memory(writable_storage=tmpdir) + with Context.from_app(build_app(catalog)) as context: + await register(catalog, example_data_dir) client = from_context(context) @@ -54,7 +67,7 @@ async def test_collision(example_data_dir, tmpdir): p.unlink() # Re-run registration; entry should be there now. 
- await register(tree, example_data_dir) + await register(catalog, example_data_dir) assert "a" in client @@ -73,9 +86,9 @@ async def test_same_filename_separate_directory(tmpdir): Path(tmpdir, "two").mkdir() df1.to_csv(Path(tmpdir, "one", "a.csv")) df1.to_csv(Path(tmpdir, "two", "a.csv")) - tree = in_memory() - with Context.from_app(build_app(tree)) as context: - await register(tree, tmpdir) + catalog = in_memory(writable_storage=tmpdir) + with Context.from_app(build_app(catalog)) as context: + await register(catalog, tmpdir) client = from_context(context) assert "a" in client["one"] assert "a" in client["two"] @@ -108,10 +121,10 @@ def detect_mimetype(path, mimetype): return "text/csv" return mimetype - tree = in_memory() - with Context.from_app(build_app(tree)) as context: + catalog = in_memory(writable_storage=tmpdir) + with Context.from_app(build_app(catalog)) as context: await register( - tree, + catalog, tmpdir, mimetype_detection_hook=detect_mimetype, key_from_filename=identity, @@ -130,18 +143,174 @@ async def test_skip_all_in_combination(tmpdir): for i in range(2): tifffile.imwrite(Path(tmpdir, "one", f"image{i:05}.tif"), data) - tree = in_memory() + catalog = in_memory(writable_storage=tmpdir) # By default, both file and tiff sequence are registered. - with Context.from_app(build_app(tree)) as context: - await register(tree, tmpdir) + with Context.from_app(build_app(catalog)) as context: + await register(catalog, tmpdir) client = from_context(context) assert "a" in client assert "a" in client["one"] assert "image" in client["one"] # With skip_all, directories and tiff sequence are registered, but individual files are not - with Context.from_app(build_app(tree)) as context: - await register(tree, tmpdir, walkers=[tiff_sequence, skip_all]) + with Context.from_app(build_app(catalog)) as context: + await register(catalog, tmpdir, walkers=[group_tiff_sequences, skip_all]) client = from_context(context) assert list(client) == ["one"] assert "image" in client["one"] + + +@pytest.mark.asyncio +async def test_tiff_seq_custom_sorting(tmpdir): + "Register TIFFs that are not in alphanumeric order." + N = 10 + ordering = list(range(N)) + random.Random(0).shuffle(ordering) + files = [] + for i in ordering: + file = Path(tmpdir, f"image{i:05}.tif") + files.append(file) + # data is a block of ones + tifffile.imwrite(file, i * data) + + settings = Settings.init() + catalog = in_memory(writable_storage=tmpdir) + with Context.from_app(build_app(catalog)) as context: + await register_tiff_sequence( + catalog, + "image", + files, + settings, + ) + client = from_context(context) + # We are being a bit clever here. + # Each image in this image series has pixels with a constant value, and + # that value matches the image's position in the sequence enumerated by + # `ordering`. We pick out one pixel and check that its value matches + # the corresponding value in `ordering`. + actual = list(client["image"][:, 0, 0]) + assert actual == ordering + + +@pytest.mark.asyncio +async def test_image_file_with_sidecar_metadata_file(tmpdir): + "Create one Node from two different types of files." 
+ MIMETYPE = "multipart/related;type=application/x-tiff-with-yaml" + image_filepath = Path(tmpdir, "image.tif") + tifffile.imwrite(image_filepath, data) + metadata_filepath = Path(tmpdir, "metadata.yml") + metadata = {"test_key": 3.0} + with open(metadata_filepath, "w") as file: + yaml.dump(metadata, file) + + def read_tiff_with_yaml_metadata(image_uri, metadata_uri, metadata=None, **kwargs): + with open(path_from_uri(metadata_uri)) as file: + metadata = yaml.safe_load(file) + return TiffAdapter(image_uri, metadata=metadata, **kwargs) + + catalog = in_memory( + writable_storage=tmpdir, + adapters_by_mimetype={MIMETYPE: read_tiff_with_yaml_metadata}, + ) + with Context.from_app(build_app(catalog)) as context: + adapter = read_tiff_with_yaml_metadata( + ensure_uri(image_filepath), ensure_uri(metadata_filepath) + ) + await create_node_safe( + catalog, + key="image", + structure_family=adapter.structure_family, + metadata=dict(adapter.metadata()), + specs=adapter.specs, + data_sources=[ + DataSource( + mimetype=MIMETYPE, + structure=dataclasses.asdict(adapter.structure()), + parameters={}, + management=Management.external, + assets=[ + Asset( + data_uri=ensure_uri(metadata_filepath), + is_directory=False, + parameter="metadata_uri", + ), + Asset( + data_uri=ensure_uri(image_filepath), + is_directory=False, + parameter="image_uri", + ), + ], + ) + ], + ) + client = from_context(context) + assert numpy.array_equal(data, client["image"][:]) + assert client["image"].metadata["test_key"] == 3.0 + + +@pytest.mark.asyncio +async def test_hdf5_virtual_datasets(tmpdir): + # A virtual database comprises one master file and N data files. The master + # file must be handed to the Adapter for opening. The data files are not + # handled directly by the Adapter but they still ought to be tracked as + # Assets for purposes of data movement, accounting for data size, etc. + # This is why they are Assets with parameter=NULL/None, Assets not used + # directly by the Adapter. + + # One could do one-dataset-per-directory. But like TIFF series in practice + # they are often mixed, so we address that general case and track them at + # the per-file level. + + # Contrast this to Zarr, where the files involves are always bundled by + # directory. We track Zarr at the directory level. 
+ + layout = h5py.VirtualLayout(shape=(4, 100), dtype="i4") + + data_filepaths = [] + for n in range(1, 5): + filepath = Path(tmpdir, f"{n}.h5") + data_filepaths.append(filepath) + vsource = h5py.VirtualSource(filepath, "data", shape=(100,)) + layout[n - 1] = vsource + + # Add virtual dataset to output file + filepath = Path(tmpdir, "VDS.h5") + with h5py.File(filepath, "w", libver="latest") as file: + file.create_virtual_dataset("data", layout, fillvalue=-5) + + assets = [ + Asset( + data_uri=str(ensure_uri(str(fp))), + is_directory=False, + parameter=None, # an indirect dependency + ) + for fp in data_filepaths + ] + assets.append( + Asset( + data_uri=ensure_uri(filepath), + is_directory=False, + parameter="data_uri", + ) + ) + catalog = in_memory(writable_storage=tmpdir) + with Context.from_app(build_app(catalog)) as context: + adapter = HDF5Adapter.from_uri(ensure_uri(filepath)) + await create_node_safe( + catalog, + key="VDS", + structure_family=adapter.structure_family, + metadata=dict(adapter.metadata()), + specs=adapter.specs, + data_sources=[ + DataSource( + mimetype="application/x-hdf5", + structure=None, + parameters={}, + management=Management.external, + assets=assets, + ) + ], + ) + client = from_context(context) + client["VDS"]["data"][:] diff --git a/tiled/_tests/test_tiff.py b/tiled/_tests/test_tiff.py index 2fefc6914..7f8dce667 100644 --- a/tiled/_tests/test_tiff.py +++ b/tiled/_tests/test_tiff.py @@ -8,6 +8,7 @@ from ..adapters.tiff import TiffAdapter, TiffSequenceAdapter from ..catalog import in_memory from ..catalog.register import TIFF_SEQUENCE_EMPTY_NAME_ROOT, register +from ..catalog.utils import ensure_uri from ..client import Context, from_context from ..server.app import build_app @@ -18,18 +19,21 @@ def client(tmpdir_module): sequence_directory = Path(tmpdir_module, "sequence") sequence_directory.mkdir() + filepaths = [] for i in range(3): data = numpy.random.random((5, 7)) - tf.imwrite(sequence_directory / f"temp{i:05}.tif", data) + filepath = sequence_directory / f"temp{i:05}.tif" + tf.imwrite(filepath, data) + filepaths.append(filepath) color_data = numpy.random.randint(0, 255, COLOR_SHAPE, dtype="uint8") path = Path(tmpdir_module, "color.tif") tf.imwrite(path, color_data) tree = MapAdapter( { - "color": TiffAdapter(str(path)), - "sequence": TiffSequenceAdapter( - tf.TiffSequence(str(sequence_directory / "*.tif")) + "color": TiffAdapter(ensure_uri(path)), + "sequence": TiffSequenceAdapter.from_uris( + [ensure_uri(filepath) for filepath in filepaths] ), } ) diff --git a/tiled/adapters/awkward_buffers.py b/tiled/adapters/awkward_buffers.py index 06f7e2936..a12b92234 100644 --- a/tiled/adapters/awkward_buffers.py +++ b/tiled/adapters/awkward_buffers.py @@ -2,11 +2,11 @@ A directory containing awkward buffers, one file per form key. 
""" import collections.abc -from urllib import parse import awkward.forms from ..structures.core import StructureFamily +from ..utils import path_from_uri from .awkward import AwkwardAdapter @@ -37,23 +37,26 @@ class AwkwardBuffersAdapter(AwkwardAdapter): structure_family = StructureFamily.awkward @classmethod - def init_storage(cls, directory, structure): + def init_storage(cls, data_uri, structure): from ..server.schemas import Asset + directory = path_from_uri(data_uri) directory.mkdir(parents=True, exist_ok=True) - data_uri = parse.urlunparse(("file", "localhost", str(directory), "", "", None)) - return [Asset(data_uri=data_uri, is_directory=True)] + return [Asset(data_uri=data_uri, is_directory=True, parameter="data_uri")] @classmethod def from_directory( cls, - directory, + data_uri, structure, metadata=None, specs=None, access_policy=None, ): form = awkward.forms.from_dict(structure.form) + directory = path_from_uri(data_uri) + if not directory.is_dir(): + raise ValueError(f"Not a directory: {directory}") container = DirectoryContainer(directory, form) return cls( container, diff --git a/tiled/adapters/csv.py b/tiled/adapters/csv.py index 717a2858b..eda87889a 100644 --- a/tiled/adapters/csv.py +++ b/tiled/adapters/csv.py @@ -1,11 +1,12 @@ import dask.dataframe from ..server.object_cache import NO_CACHE, get_object_cache +from ..utils import path_from_uri from .dataframe import DataFrameAdapter def read_csv( - *args, + data_uri, structure=None, metadata=None, specs=None, @@ -25,7 +26,8 @@ def read_csv( >>> read_csv("myfiles.*.csv") >>> read_csv("s3://bucket/myfiles.*.csv") """ - ddf = dask.dataframe.read_csv(*args, **kwargs) + filepath = path_from_uri(data_uri) + ddf = dask.dataframe.read_csv(filepath, **kwargs) # If an instance has previously been created using the same parameters, # then we are here because the caller wants a *fresh* view on this data. # Therefore, we should clear any cached data. diff --git a/tiled/adapters/excel.py b/tiled/adapters/excel.py index 9ca17b7ee..c60fee471 100644 --- a/tiled/adapters/excel.py +++ b/tiled/adapters/excel.py @@ -18,10 +18,6 @@ def from_file(cls, file, **kwargs): Examples -------- - Given a file path - - >>> ExcelAdapter.from_file("path/to/excel_file.xlsx") - Given a file object >>> file = open("path/to/excel_file.xlsx") @@ -30,7 +26,8 @@ def from_file(cls, file, **kwargs): Given a pandas.ExcelFile object >>> import pandas - >>> ef = pandas.ExcelFile(file) + >>> filepath = "path/to/excel_file.xlsx" + >>> ef = pandas.ExcelFile(filepath) >>> ExcelAdapter.from_file(ef) """ if isinstance(file, pandas.ExcelFile): @@ -53,3 +50,21 @@ def from_file(cls, file, **kwargs): cache.discard_dask(ddf.__dask_keys__()) # dask tasks mapping[sheet_name] = DataFrameAdapter.from_dask_dataframe(ddf) return cls(mapping, **kwargs) + + @classmethod + def from_uri(cls, data_uri, **kwargs): + """ + Read the sheets in an Excel file. + + This maps the Excel file, which may contain one of more spreadsheets, + onto a tree of tabular structures. 
+ + Examples + -------- + + Given a file path + + >>> ExcelAdapter.from_file("path/to/excel_file.xlsx") + """ + file = pandas.ExcelFile(data_uri) + return cls.from_file(file) diff --git a/tiled/adapters/hdf5.py b/tiled/adapters/hdf5.py index c767cbe5b..8aa9d70b2 100644 --- a/tiled/adapters/hdf5.py +++ b/tiled/adapters/hdf5.py @@ -8,7 +8,7 @@ from ..adapters.utils import IndexersMixin from ..iterviews import ItemsView, KeysView, ValuesView from ..structures.core import StructureFamily -from ..utils import node_repr +from ..utils import node_repr, path_from_uri from .array import ArrayAdapter SWMR_DEFAULT = bool(int(os.getenv("TILED_HDF5_SWMR_DEFAULT", "0"))) @@ -31,7 +31,7 @@ class HDF5Adapter(collections.abc.Mapping, IndexersMixin): From the root node of a file given a filepath >>> import h5py - >>> HDF5Adapter.from_file("path/to/file.h5") + >>> HDF5Adapter.from_uri("file://localhost/path/to/file.h5") From the root node of a file given an h5py.File object @@ -70,10 +70,24 @@ def from_file( specs=None, access_policy=None, ): - if not isinstance(file, h5py.File): - file = h5py.File(file, "r", swmr=swmr, libver=libver) return cls(file, metadata=metadata, specs=specs, access_policy=access_policy) + @classmethod + def from_uri( + cls, + data_uri, + *, + structure=None, + metadata=None, + swmr=SWMR_DEFAULT, + libver="latest", + specs=None, + access_policy=None, + ): + filepath = path_from_uri(data_uri) + file = h5py.File(filepath, "r", swmr=swmr, libver=libver) + return cls.from_file(file) + def __repr__(self): return node_repr(self, list(self)) diff --git a/tiled/adapters/parquet.py b/tiled/adapters/parquet.py index 3171a7ae7..9c6903bff 100644 --- a/tiled/adapters/parquet.py +++ b/tiled/adapters/parquet.py @@ -1,9 +1,9 @@ from pathlib import Path -from urllib import parse import dask.dataframe from ..structures.core import StructureFamily +from ..utils import path_from_uri from .dataframe import DataFrameAdapter @@ -12,13 +12,14 @@ class ParquetDatasetAdapter: def __init__( self, - *partition_paths, + data_uris, structure, metadata=None, specs=None, access_policy=None, ): - self.partition_paths = sorted(partition_paths) + # TODO Store data_uris instead and generalize to non-file schemes. 
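The TODO above is about keeping URIs rather than eagerly converting them to local paths. Purely to illustrate one possible direction (an assumption, not something this diff implements), partition URIs could be resolved per scheme only at read time:

```python
# Hypothetical sketch of the TODO above; not part of this change.
from urllib.parse import urlparse

from tiled.utils import path_from_uri


def resolve_partition_uri(uri):
    "Resolve a partition URI to something the reader can open."
    scheme = urlparse(uri).scheme
    if scheme == "file":
        return path_from_uri(uri)
    # Other schemes (s3://, http://, ...) would be dispatched here.
    raise NotImplementedError(f"Scheme {scheme!r} is not supported yet")
```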
+ self._partition_paths = [path_from_uri(uri) for uri in data_uris] self._metadata = metadata or {} self._structure = structure self.specs = list(specs or []) @@ -30,7 +31,7 @@ def metadata(self): @property def dataframe_adapter(self): partitions = [] - for path in self.partition_paths: + for path in self._partition_paths: if not Path(path).exists(): partition = None else: @@ -39,28 +40,30 @@ def dataframe_adapter(self): return DataFrameAdapter(partitions, self._structure) @classmethod - def init_storage(cls, directory, structure): + def init_storage(cls, data_uri, structure): from ..server.schemas import Asset + directory = path_from_uri(data_uri) directory.mkdir(parents=True, exist_ok=True) - data_uri = parse.urlunparse(("file", "localhost", str(directory), "", "", None)) assets = [ Asset( data_uri=f"{data_uri}/partition-{i}.parquet", is_directory=False, + parameter="data_uris", + num=i, ) for i in range(structure.npartitions) ] return assets def write_partition(self, data, partition): - uri = self.partition_paths[partition] + uri = self._partition_paths[partition] data.to_parquet(uri) def write(self, data): if self.structure().npartitions != 1: raise NotImplementedError - uri = self.partition_paths[0] + uri = self._partition_paths[0] data.to_parquet(uri) def read(self, *args, **kwargs): diff --git a/tiled/adapters/sparse_blocks_parquet.py b/tiled/adapters/sparse_blocks_parquet.py index f7fc121fc..da479ce73 100644 --- a/tiled/adapters/sparse_blocks_parquet.py +++ b/tiled/adapters/sparse_blocks_parquet.py @@ -1,5 +1,4 @@ import itertools -from urllib import parse import numpy import pandas @@ -7,12 +6,13 @@ from ..adapters.array import slice_and_shape_from_block_and_chunks from ..structures.core import StructureFamily +from ..utils import path_from_uri def load_block(uri): # TODO This can be done without pandas. # Better to use a plain I/O library. 
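The TODO above notes that a block could be loaded without pandas. A sketch of what that might look like with pyarrow (an alternative shown for illustration only, assuming the same layout as the pandas version below: coordinate columns followed by a final "data" column):

```python
# Illustrative pyarrow-based alternative to load_block; not part of this change.
import numpy
import pyarrow.parquet as pq

from tiled.utils import path_from_uri


def load_block_pyarrow(uri):
    table = pq.read_table(path_from_uri(uri))
    coord_names = [name for name in table.column_names if name != "data"]
    coords = numpy.stack([table.column(name).to_numpy() for name in coord_names])
    data = table.column("data").to_numpy()
    return coords, data
```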
- df = pandas.read_parquet(uri) + df = pandas.read_parquet(path_from_uri(uri)) coords = df[df.columns[:-1]].values.T data = df["data"].values return coords, data @@ -23,7 +23,7 @@ class SparseBlocksParquetAdapter: def __init__( self, - *block_uris, + data_uris, structure, metadata=None, specs=None, @@ -31,7 +31,7 @@ def __init__( ): num_blocks = (range(len(n)) for n in structure.chunks) self.blocks = {} - for block, uri in zip(itertools.product(*num_blocks), sorted(block_uris)): + for block, uri in zip(itertools.product(*num_blocks), data_uris): self.blocks[block] = uri self._structure = structure self._metadata = metadata or {} @@ -41,25 +41,23 @@ def __init__( @classmethod def init_storage( cls, - directory, + data_uri, structure, ): from ..server.schemas import Asset + directory = path_from_uri(data_uri) directory.mkdir(parents=True, exist_ok=True) num_blocks = (range(len(n)) for n in structure.chunks) - block_uris = [] - for block in itertools.product(*num_blocks): - filepath = directory / f"block-{'.'.join(map(str, block))}.parquet" - uri = parse.urlunparse(("file", "localhost", str(filepath), "", "", None)) - block_uris.append(uri) assets = [ Asset( - data_uri=uri, + data_uri=f"{data_uri}/block-{'.'.join(map(str, block))}.parquet", is_directory=False, + parameter="data_uris", + num=i, ) - for uri in block_uris + for i, block in enumerate(itertools.product(*num_blocks)) ] return assets @@ -68,13 +66,13 @@ def metadata(self): def write_block(self, data, block): uri = self.blocks[block] - data.to_parquet(uri) + data.to_parquet(path_from_uri(uri)) def write(self, data): if len(self.blocks) > 1: raise NotImplementedError uri = self.blocks[(0,) * len(self._structure.shape)] - data.to_parquet(uri) + data.to_parquet(path_from_uri(uri)) def read(self, slice=...): all_coords = [] diff --git a/tiled/adapters/tiff.py b/tiled/adapters/tiff.py index f78e8a2da..794371701 100644 --- a/tiled/adapters/tiff.py +++ b/tiled/adapters/tiff.py @@ -6,6 +6,7 @@ from ..server.object_cache import with_object_cache from ..structures.array import ArrayStructure, BuiltinDtype from ..structures.core import StructureFamily +from ..utils import path_from_uri class TiffAdapter: @@ -22,15 +23,18 @@ class TiffAdapter: def __init__( self, - path, + data_uri, *, structure=None, metadata=None, specs=None, access_policy=None, ): - self._file = tifffile.TiffFile(path) - self._cache_key = (type(self).__module__, type(self).__qualname__, path) + if not isinstance(data_uri, str): + raise Exception + filepath = path_from_uri(data_uri) + self._file = tifffile.TiffFile(filepath) + self._cache_key = (type(self).__module__, type(self).__qualname__, filepath) self.specs = specs or [] self._provided_metadata = metadata or {} self.access_policy = access_policy @@ -83,15 +87,16 @@ class TiffSequenceAdapter: structure_family = "array" @classmethod - def from_files( + def from_uris( cls, - *files, + data_uris, structure=None, metadata=None, specs=None, access_policy=None, ): - seq = tifffile.TiffSequence(sorted(files)) + filepaths = [path_from_uri(data_uri) for data_uri in data_uris] + seq = tifffile.TiffSequence(filepaths) return cls( seq, structure=structure, diff --git a/tiled/adapters/zarr.py b/tiled/adapters/zarr.py index 73ba58046..3ecd436bb 100644 --- a/tiled/adapters/zarr.py +++ b/tiled/adapters/zarr.py @@ -1,7 +1,6 @@ import builtins import collections.abc import os -from urllib import parse import zarr.core import zarr.hierarchy @@ -9,32 +8,36 @@ from ..adapters.utils import IndexersMixin from ..iterviews import ItemsView, 
KeysView, ValuesView -from ..structures.array import ArrayStructure from ..structures.core import StructureFamily -from ..utils import node_repr +from ..utils import node_repr, path_from_uri from .array import ArrayAdapter, slice_and_shape_from_block_and_chunks INLINED_DEPTH = int(os.getenv("TILED_HDF5_INLINED_CONTENTS_MAX_DEPTH", "7")) -def read_zarr(filepath, **kwargs): - file = zarr.open(filepath) - if isinstance(file, zarr.hierarchy.Group): - adapter = ZarrGroupAdapter.from_directory(file, **kwargs) +def read_zarr(data_uri, structure=None, **kwargs): + filepath = path_from_uri(data_uri) + zarr_obj = zarr.open(filepath) # Group or Array + if isinstance(zarr_obj, zarr.hierarchy.Group): + adapter = ZarrGroupAdapter(zarr_obj, **kwargs) else: - adapter = ZarrArrayAdapter.from_directory(file, **kwargs) + if structure is None: + adapter = ZarrArrayAdapter.from_array(zarr_obj, **kwargs) + else: + adapter = ZarrArrayAdapter(zarr_obj, structure=structure, **kwargs) return adapter class ZarrArrayAdapter(ArrayAdapter): @classmethod - def init_storage(cls, directory, structure): + def init_storage(cls, data_uri, structure): from ..server.schemas import Asset # Zarr requires evenly-sized chunks within each dimension. # Use the first chunk along each dimension. zarr_chunks = tuple(dim[0] for dim in structure.chunks) shape = tuple(dim[0] * len(dim) for dim in structure.chunks) + directory = path_from_uri(data_uri) directory.mkdir(parents=True, exist_ok=True) storage = zarr.storage.DirectoryStore(str(directory)) zarr.storage.init_array( @@ -43,29 +46,14 @@ def init_storage(cls, directory, structure): chunks=zarr_chunks, dtype=structure.data_type.to_numpy_dtype(), ) - data_uri = parse.urlunparse(("file", "localhost", str(directory), "", "", None)) return [ Asset( data_uri=data_uri, is_directory=True, + parameter="data_uri", ) ] - @classmethod - def from_directory( - cls, - directory, - structure=None, - **kwargs, - ): - if not isinstance(directory, zarr.core.Array): - array = zarr.open_array(str(directory), "r+") - else: - array = directory - if structure is None: - structure = ArrayStructure.from_array(array) - return cls(array, structure, **kwargs) - def _stencil(self): "Trims overflow because Zarr always has equal-sized chunks." return tuple(builtins.slice(0, dim) for dim in self.structure().shape) @@ -111,12 +99,6 @@ def __init__( self._provided_metadata = metadata or {} super().__init__() - @classmethod - def from_directory(cls, directory, **kwargs): - if not isinstance(directory, zarr.hierarchy.Group): - directory = zarr.open_group(directory, "r") - return cls(directory, **kwargs) - def __repr__(self): return node_repr(self, list(self)) diff --git a/tiled/catalog/adapter.py b/tiled/catalog/adapter.py index 81b16996d..60421d18a 100644 --- a/tiled/catalog/adapter.py +++ b/tiled/catalog/adapter.py @@ -11,7 +11,6 @@ from urllib.parse import quote_plus, urlparse import anyio -import httpx from fastapi import HTTPException from sqlalchemy import delete, event, func, not_, or_, select, text, type_coerce, update from sqlalchemy.exc import IntegrityError @@ -40,41 +39,44 @@ UnsupportedQueryType, ensure_awaitable, import_object, + path_from_uri, safe_json_dump, ) from . 
import orm from .core import check_catalog_database, initialize_database from .explain import ExplainAsyncSession from .mimetypes import ( + AWKWARD_BUFFERS_MIMETYPE, DEFAULT_ADAPTERS_BY_MIMETYPE, PARQUET_MIMETYPE, SPARSE_BLOCKS_PARQUET_MIMETYPE, ZARR_MIMETYPE, - ZIP_MIMETYPE, ) -from .utils import SCHEME_PATTERN, ensure_uri, safe_path +from .utils import SCHEME_PATTERN, ensure_uri DEFAULT_ECHO = bool(int(os.getenv("TILED_ECHO_SQL", "0") or "0")) INDEX_PATTERN = re.compile(r"^[A-Za-z0-9_-]+$") +# When data is uploaded, how is it saved? +# TODO: Make this configurable at Catalog construction time. DEFAULT_CREATION_MIMETYPE = { StructureFamily.array: ZARR_MIMETYPE, - StructureFamily.awkward: ZIP_MIMETYPE, + StructureFamily.awkward: AWKWARD_BUFFERS_MIMETYPE, StructureFamily.table: PARQUET_MIMETYPE, StructureFamily.sparse: SPARSE_BLOCKS_PARQUET_MIMETYPE, } -CREATE_ADAPTER_BY_MIMETYPE = OneShotCachedMap( +DEFAULT_INIT_STORAGE = OneShotCachedMap( { - ZARR_MIMETYPE: lambda: importlib.import_module( + StructureFamily.array: lambda: importlib.import_module( "...adapters.zarr", __name__ ).ZarrArrayAdapter.init_storage, - ZIP_MIMETYPE: lambda: importlib.import_module( + StructureFamily.awkward: lambda: importlib.import_module( "...adapters.awkward_buffers", __name__ ).AwkwardBuffersAdapter.init_storage, - PARQUET_MIMETYPE: lambda: importlib.import_module( + StructureFamily.table: lambda: importlib.import_module( "...adapters.parquet", __name__ ).ParquetDatasetAdapter.init_storage, - SPARSE_BLOCKS_PARQUET_MIMETYPE: lambda: importlib.import_module( + StructureFamily.sparse: lambda: importlib.import_module( "...adapters.sparse_blocks_parquet", __name__ ).SparseBlocksParquetAdapter.init_storage, } @@ -117,7 +119,7 @@ def __init__( raise ValueError("readable_storage should be a list of URIs or paths") if writable_storage: writable_storage = ensure_uri(str(writable_storage)) - if not writable_storage.scheme == "file": + if not urlparse(writable_storage).scheme == "file": raise NotImplementedError( "Only file://... writable storage is currently supported." ) @@ -390,37 +392,43 @@ async def get_adapter(self): raise RuntimeError( f"Server configuration has no adapter for mimetype {data_source.mimetype!r}" ) - data_uris = [httpx.URL(asset.data_uri) for asset in data_source.assets] - for data_uri in data_uris: - if data_uri.scheme == "file": + parameters = collections.defaultdict(list) + for asset in data_source.assets: + if asset.parameter is None: + continue + scheme = urlparse(asset.data_uri).scheme + if scheme != "file": + raise NotImplementedError( + f"Only 'file://...' scheme URLs are currently supported, not {asset.data_uri}" + ) + if scheme == "file": # Protect against misbehaving clients reading from unintended # parts of the filesystem. + asset_path = path_from_uri(asset.data_uri) for readable_storage in self.context.readable_storage: if Path( os.path.commonpath( - [safe_path(readable_storage), safe_path(data_uri)] + [path_from_uri(readable_storage), asset_path] ) - ) == safe_path(readable_storage): + ) == path_from_uri(readable_storage): break else: raise RuntimeError( - f"Refusing to serve {data_uri} because it is outside " + f"Refusing to serve {asset.data_uri} because it is outside " "the readable storage area for this server." ) - paths = [] - for data_uri in data_uris: - if data_uri.scheme != "file": - raise NotImplementedError( - f"Only 'file://...' 
scheme URLs are currently supported, not {data_uri!r}" - ) - paths.append(safe_path(data_uri)) - adapter_kwargs = dict(data_source.parameters) + if asset.num is None: + parameters[asset.parameter] = asset.data_uri + else: + parameters[asset.parameter].append(asset.data_uri) + adapter_kwargs = dict(parameters) + adapter_kwargs.update(data_source.parameters) adapter_kwargs["specs"] = self.node.specs adapter_kwargs["metadata"] = self.node.metadata_ adapter_kwargs["structure"] = data_source.structure adapter_kwargs["access_policy"] = self.access_policy adapter = await anyio.to_thread.run_sync( - partial(adapter_factory, *paths, **adapter_kwargs) + partial(adapter_factory, **adapter_kwargs) ) for query in self.queries: adapter = adapter.search(query) @@ -561,9 +569,9 @@ async def create_node( data_uri = str(self.context.writable_storage) + "".join( f"/{quote_plus(segment)}" for segment in (self.segments + [key]) ) - init_storage = CREATE_ADAPTER_BY_MIMETYPE[data_source.mimetype] + init_storage = DEFAULT_INIT_STORAGE[structure_family] assets = await ensure_awaitable( - init_storage, safe_path(data_uri), data_source.structure + init_storage, data_uri, data_source.structure ) data_source.assets.extend(assets) data_source_orm = orm.DataSource( @@ -581,7 +589,12 @@ async def create_node( data_uri=asset.data_uri, is_directory=asset.is_directory, ) - data_source_orm.assets.append(asset_orm) + assoc_orm = orm.DataSourceAssetAssociation( + asset=asset_orm, + parameter=asset.parameter, + num=asset.num, + ) + data_source_orm.asset_associations.append(assoc_orm) db.add(node) await db.commit() await db.refresh(node) @@ -876,7 +889,7 @@ async def write_partition(self, *args, **kwargs): def delete_asset(data_uri, is_directory): url = urlparse(data_uri) if url.scheme == "file": - path = safe_path(data_uri) + path = path_from_uri(data_uri) if is_directory: shutil.rmtree(path) else: @@ -1059,12 +1072,14 @@ def structure_family(query, tree): def in_memory( + *, metadata=None, specs=None, access_policy=None, writable_storage=None, readable_storage=None, echo=DEFAULT_ECHO, + adapters_by_mimetype=None, ): uri = "sqlite+aiosqlite:///:memory:" return from_uri( @@ -1075,6 +1090,7 @@ def in_memory( writable_storage=writable_storage, readable_storage=readable_storage, echo=echo, + adapters_by_mimetype=adapters_by_mimetype, ) diff --git a/tiled/catalog/core.py b/tiled/catalog/core.py index 3cf202510..992c8a1da 100644 --- a/tiled/catalog/core.py +++ b/tiled/catalog/core.py @@ -3,12 +3,15 @@ from ..alembic_utils import DatabaseUpgradeNeeded, UninitializedDatabase, check_database from .base import Base -# This is the alembic revision ID of the database revision -# required by this version of Tiled. -REQUIRED_REVISION = "3db11ff95b6c" - # This is list of all valid revisions (from current to oldest). -ALL_REVISIONS = ["3db11ff95b6c", "0b033e7fbe30", "83889e049ddc", "6825c778aa3c"] +ALL_REVISIONS = [ + "a66028395cab", + "3db11ff95b6c", + "0b033e7fbe30", + "83889e049ddc", + "6825c778aa3c", +] +REQUIRED_REVISION = ALL_REVISIONS[0] async def initialize_database(engine): diff --git a/tiled/catalog/migrations/versions/a66028395cab_enrich_datasource_asset_association.py b/tiled/catalog/migrations/versions/a66028395cab_enrich_datasource_asset_association.py new file mode 100644 index 000000000..f2fbae9cf --- /dev/null +++ b/tiled/catalog/migrations/versions/a66028395cab_enrich_datasource_asset_association.py @@ -0,0 +1,230 @@ +"""Enrich DataSource-Asset association. 
+ +Revision ID: a66028395cab +Revises: 3db11ff95b6c +Create Date: 2024-01-21 15:17:20.571763 + +""" +import sqlalchemy as sa +from alembic import op + +from tiled.catalog.orm import ( + DataSourceAssetAssociation, + JSONVariant, + unique_parameter_num_null_check, +) + +# revision identifiers, used by Alembic. +revision = "a66028395cab" +down_revision = "3db11ff95b6c" +branch_labels = None +depends_on = None + + +def upgrade(): + connection = op.get_bind() + data_sources = sa.Table( + "data_sources", + sa.MetaData(), + sa.Column("id", sa.Integer), + sa.Column("node_id", sa.Integer), + sa.Column("mimetype", sa.Unicode), + sa.Column("structure", JSONVariant), + ) + data_source_asset_association = sa.Table( + DataSourceAssetAssociation.__tablename__, + sa.MetaData(), + sa.Column("asset_id", sa.Integer), + sa.Column("data_source_id", sa.Integer), + sa.Column("parameter", sa.Unicode(255)), + sa.Column("num", sa.Integer), + ) + assets = sa.Table( + "assets", + sa.MetaData(), + sa.Column("id", sa.Integer), + sa.Column("data_uri", sa.Unicode), + ) + + # Rename some MIME types. + + # While Awkward data is typically _transmitted_ in a ZIP archive, + # it is stored as directory of buffers, with no ZIP involved. + # Thus using 'application/zip' in the database was a mistake. + connection.execute( + data_sources.update() + .where(data_sources.c.mimetype == "application/zip") + .values(mimetype="application/x-awkward-buffers") + ) + # The format is standard parquet. We will use a MIME type + # parameter to let Tiled know to use the Adapter for sparse + # data, as opposed to the Adapter for tabular data. + connection.execute( + data_sources.update() + .where(data_sources.c.mimetype == "application/x-parquet-sparse") + .values(mimetype="application/x-parquet;structure=sparse") + ) + + # Add columns 'parameter' and 'num' to association table. + op.add_column( + DataSourceAssetAssociation.__tablename__, + sa.Column("parameter", sa.Unicode(255), nullable=True), + ) + op.add_column( + DataSourceAssetAssociation.__tablename__, + sa.Column("num", sa.Integer, nullable=True), + ) + + # First populate the columns to bring them into compliance with + # constraints. Then, apply constraints. 
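To make the backfill concrete: data sources whose mimetype takes a single asset get parameter="data_uri", while the list-style mimetypes (TIFF sequences and the two Parquet layouts) get parameter="data_uris", with num assigned 1..N in data_uri sort order. A minimal sketch of that rule, using the same mimetype strings as the migration code below (illustrative only, not a substitute for running the migration):

```python
# Sketch of the backfill rule applied in this upgrade.
MULTI_ASSET_MIMETYPES = {
    "multipart/related;type=image/tiff",
    "application/x-parquet",
    "application/x-parquet;structure=sparse",
}


def backfilled_parameter(mimetype):
    "The 'parameter' value this migration assigns to pre-existing assets."
    return "data_uris" if mimetype in MULTI_ASSET_MIMETYPES else "data_uri"


assert backfilled_parameter("application/x-hdf5") == "data_uri"
assert backfilled_parameter("multipart/related;type=image/tiff") == "data_uris"
```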
+ + results = connection.execute( + sa.select(data_sources.c.id) + .where( + sa.not_( + data_sources.c.mimetype.in_( + [ + "multipart/related;type=image/tiff", + "application/x-parquet", + "application/x-parquet;structure=sparse", + ] + ) + ) + ) + .select_from(data_sources) + .join( + data_source_asset_association, + data_sources.c.id == data_source_asset_association.c.data_source_id, + ) + .join( + assets, + data_source_asset_association.c.asset_id == assets.c.id, + ) + .distinct() + ).fetchall() + for (data_source_id,) in results: + connection.execute( + data_source_asset_association.update() + .where(data_source_asset_association.c.data_source_id == data_source_id) + .values(parameter="data_uri") + ) + results = connection.execute( + sa.select(data_sources.c.id) + .where( + data_sources.c.mimetype.in_( + [ + "multipart/related;type=image/tiff", + "application/x-parquet", + "application/x-parquet;structure=sparse", + ] + ) + ) + .select_from(data_sources) + .join( + data_source_asset_association, + data_sources.c.id == data_source_asset_association.c.data_source_id, + ) + .join( + assets, + data_source_asset_association.c.asset_id == assets.c.id, + ) + .distinct() + ).fetchall() + for (data_source_id,) in results: + connection.execute( + data_source_asset_association.update() + .where(data_source_asset_association.c.data_source_id == data_source_id) + .values(parameter="data_uris") # plural + ) + sorted_assoc = connection.execute( + sa.select(data_source_asset_association.c.data_source_id, assets.c.id) + .where(data_source_asset_association.c.data_source_id == data_source_id) + .order_by(assets.c.data_uri) + .select_from(data_sources) + .join( + data_source_asset_association, + data_sources.c.id == data_source_asset_association.c.data_source_id, + ) + .join( + assets, + data_source_asset_association.c.asset_id == assets.c.id, + ) + ).fetchall() + for num, (data_source_id, asset_id) in enumerate(sorted_assoc, start=1): + connection.execute( + data_source_asset_association.update() + .where(data_source_asset_association.c.data_source_id == data_source_id) + .where(data_source_asset_association.c.asset_id == asset_id) + .values(num=num) + ) + + # Create unique constraint and triggers. + if connection.engine.dialect.name == "sqlite": + # SQLite does not supported adding constraints to an existing table. + # We invoke its 'copy and move' functionality. + with op.batch_alter_table(DataSourceAssetAssociation.__tablename__) as batch_op: + # Gotcha: This does not take table_name because it is bound into batch_op. 
+ batch_op.create_unique_constraint( + "parameter_num_unique_constraint", + [ + "data_source_id", + "parameter", + "num", + ], + ) + with op.get_context().autocommit_block(): + connection.execute( + sa.text( + """ + CREATE TRIGGER cannot_insert_num_null_if_num_int_exists + BEFORE INSERT ON data_source_asset_association + WHEN NEW.num IS NULL + BEGIN + SELECT RAISE(ABORT, 'Can only insert num=NULL if no other row exists for the same parameter') + WHERE EXISTS + ( + SELECT 1 + FROM data_source_asset_association + WHERE parameter = NEW.parameter + AND data_source_id = NEW.data_source_id + ); + END""" + ) + ) + connection.execute( + sa.text( + """ + CREATE TRIGGER cannot_insert_num_int_if_num_null_exists + BEFORE INSERT ON data_source_asset_association + WHEN NEW.num IS NOT NULL + BEGIN + SELECT RAISE(ABORT, 'Can only insert INTEGER num if no NULL row exists for the same parameter') + WHERE EXISTS + ( + SELECT 1 + FROM data_source_asset_association + WHERE parameter = NEW.parameter + AND num IS NULL + AND data_source_id = NEW.data_source_id + ); + END""" + ) + ) + else: + # PostgreSQL + op.create_unique_constraint( + "parameter_num_unique_constraint", + DataSourceAssetAssociation.__tablename__, + [ + "data_source_id", + "parameter", + "num", + ], + ) + unique_parameter_num_null_check(data_source_asset_association, connection) + + +def downgrade(): + # This _could_ be implemented but we will wait for a need since we are + # still in alpha releases. + raise NotImplementedError diff --git a/tiled/catalog/mimetypes.py b/tiled/catalog/mimetypes.py index 0d99a7660..d6f2405cc 100644 --- a/tiled/catalog/mimetypes.py +++ b/tiled/catalog/mimetypes.py @@ -7,9 +7,9 @@ # OneShotCachedMap is used to defer imports. We don't want to pay up front # for importing Readers that we will not actually use. PARQUET_MIMETYPE = "application/x-parquet" -SPARSE_BLOCKS_PARQUET_MIMETYPE = "application/x-parquet-sparse" # HACK! 
-ZIP_MIMETYPE = "application/zip" +SPARSE_BLOCKS_PARQUET_MIMETYPE = "application/x-parquet;structure=sparse" ZARR_MIMETYPE = "application/x-zarr" +AWKWARD_BUFFERS_MIMETYPE = "application/x-awkward-buffers" DEFAULT_ADAPTERS_BY_MIMETYPE = OneShotCachedMap( { "image/tiff": lambda: importlib.import_module( @@ -17,16 +17,16 @@ ).TiffAdapter, "multipart/related;type=image/tiff": lambda: importlib.import_module( "...adapters.tiff", __name__ - ).TiffSequenceAdapter.from_files, + ).TiffSequenceAdapter.from_uris, "text/csv": lambda: importlib.import_module( "...adapters.csv", __name__ ).read_csv, XLSX_MIME_TYPE: lambda: importlib.import_module( "...adapters.excel", __name__ - ).ExcelAdapter.from_file, + ).ExcelAdapter.from_uri, "application/x-hdf5": lambda: importlib.import_module( "...adapters.hdf5", __name__ - ).HDF5Adapter.from_file, + ).HDF5Adapter.from_uri, "application/x-netcdf": lambda: importlib.import_module( "...adapters.netcdf", __name__ ).read_netcdf, @@ -39,7 +39,7 @@ ZARR_MIMETYPE: lambda: importlib.import_module( "...adapters.zarr", __name__ ).read_zarr, - ZIP_MIMETYPE: lambda: importlib.import_module( + AWKWARD_BUFFERS_MIMETYPE: lambda: importlib.import_module( "...adapters.awkward_buffers", __name__ ).AwkwardBuffersAdapter.from_directory, } diff --git a/tiled/catalog/orm.py b/tiled/catalog/orm.py index 6822dedc7..bda9d0309 100644 --- a/tiled/catalog/orm.py +++ b/tiled/catalog/orm.py @@ -1,3 +1,5 @@ +from typing import List + from sqlalchemy import ( JSON, Boolean, @@ -7,11 +9,12 @@ ForeignKey, Index, Integer, - Table, Unicode, + event, + text, ) from sqlalchemy.dialects.postgresql import JSONB -from sqlalchemy.orm import relationship +from sqlalchemy.orm import Mapped, mapped_column, relationship from sqlalchemy.schema import UniqueConstraint from sqlalchemy.sql import func @@ -97,20 +100,156 @@ class Node(Timestamped, Base): ) -data_source_asset_association_table = Table( - "data_source_asset_association", - Base.metadata, - Column( - "data_source_id", - Integer, +class DataSourceAssetAssociation(Base): + """ + This describes which Assets are used by which DataSources, and how. + + The 'parameter' describes which argument to pass the asset into when + constructing the Adapter. If 'parameter' is NULL then the asset is an + indirect dependency, such as a HDF5 data file backing an HDF5 'master' + file. + + If 'num' is NULL, the asset is passed as a scalar value, and there must be + only one for the given 'parameter'. If 'num' is not NULL, all the assets + sharing the same 'parameter' (there may be one or more) will be passed in + as a list, ordered in ascending order of 'num'. + """ + + __tablename__ = "data_source_asset_association" + + data_source_id: Mapped[int] = mapped_column( ForeignKey("data_sources.id", ondelete="CASCADE"), - ), - Column( - "asset_id", - Integer, + primary_key=True, + ) + asset_id: Mapped[int] = mapped_column( ForeignKey("assets.id", ondelete="CASCADE"), - ), -) + primary_key=True, + ) + parameter = Column(Unicode(255), nullable=True) + num = Column(Integer, nullable=True) + + data_source: Mapped["DataSource"] = relationship( + back_populates="asset_associations" + ) + asset: Mapped["Asset"] = relationship( + back_populates="data_source_associations", lazy="selectin" + ) + + __table_args__ = ( + UniqueConstraint( + "data_source_id", + "parameter", + "num", + name="parameter_num_unique_constraint", + ), + # Below, in unique_parameter_num_null_check, additional constraints + # are applied, via triggers. 
+ ) + + +@event.listens_for(DataSourceAssetAssociation.__table__, "after_create") +def unique_parameter_num_null_check(target, connection, **kw): + # Ensure that we cannot mix NULL and INTEGER values of num for + # a given data_source_id and parameter, and that there cannot be multiple + # instances of NULL. + if connection.engine.dialect.name == "sqlite": + connection.execute( + text( + """ +CREATE TRIGGER cannot_insert_num_null_if_num_exists +BEFORE INSERT ON data_source_asset_association +WHEN NEW.num IS NULL +BEGIN + SELECT RAISE(ABORT, 'Can only insert num=NULL if no other row exists for the same parameter') + WHERE EXISTS + ( + SELECT 1 + FROM data_source_asset_association + WHERE parameter = NEW.parameter + AND data_source_id = NEW.data_source_id + ); +END""" + ) + ) + connection.execute( + text( + """ +CREATE TRIGGER cannot_insert_num_int_if_num_null_exists +BEFORE INSERT ON data_source_asset_association +WHEN NEW.num IS NOT NULL +BEGIN + SELECT RAISE(ABORT, 'Can only insert INTEGER num if no NULL row exists for the same parameter') + WHERE EXISTS + ( + SELECT 1 + FROM data_source_asset_association + WHERE parameter = NEW.parameter + AND num IS NULL + AND data_source_id = NEW.data_source_id + ); +END""" + ) + ) + elif connection.engine.dialect.name == "postgresql": + connection.execute( + text( + """ +CREATE OR REPLACE FUNCTION raise_if_parameter_exists() +RETURNS TRIGGER AS $$ +BEGIN + IF EXISTS ( + SELECT 1 + FROM data_source_asset_association + WHERE parameter = NEW.parameter + AND data_source_id = NEW.data_source_id + ) THEN + RAISE EXCEPTION 'Can only insert num=NULL if no other row exists for the same parameter'; + END IF; + RETURN NEW; +END; +$$ LANGUAGE plpgsql;""" + ) + ) + connection.execute( + text( + """ +CREATE TRIGGER cannot_insert_num_null_if_num_exists +BEFORE INSERT ON data_source_asset_association +FOR EACH ROW +WHEN (NEW.num IS NULL) +EXECUTE FUNCTION raise_if_parameter_exists();""" + ) + ) + connection.execute( + text( + """ +CREATE OR REPLACE FUNCTION raise_if_null_parameter_exists() +RETURNS TRIGGER AS $$ +BEGIN + IF EXISTS ( + SELECT 1 + FROM data_source_asset_association + WHERE parameter = NEW.parameter + AND data_source_id = NEW.data_source_id + AND num IS NULL + ) THEN + RAISE EXCEPTION 'Can only insert INTEGER num if no NULL row exists for the same parameter'; + END IF; + RETURN NEW; +END; +$$ LANGUAGE plpgsql;""" + ) + ) + connection.execute( + text( + """ +CREATE TRIGGER cannot_insert_num_int_if_num_null_exists +BEFORE INSERT ON data_source_asset_association +FOR EACH ROW +WHEN (NEW.num IS NOT NULL) +EXECUTE FUNCTION raise_if_null_parameter_exists();""" + ) + ) class DataSource(Timestamped, Base): @@ -129,7 +268,7 @@ class DataSource(Timestamped, Base): __tablename__ = "data_sources" __mapper_args__ = {"eager_defaults": True} - id = Column(Integer, primary_key=True, index=True, autoincrement=True) + id: Mapped[int] = mapped_column(primary_key=True, index=True, autoincrement=True) node_id = Column( Integer, ForeignKey("nodes.id", ondelete="CASCADE"), nullable=False ) @@ -142,12 +281,19 @@ class DataSource(Timestamped, Base): # This relates to the mutability of the data. 
management = Column(Enum(Management), nullable=False) - assets = relationship( - "Asset", - secondary=data_source_asset_association_table, + # many-to-many relationship to DataSource, bypassing the `Association` class + assets: Mapped[List["Asset"]] = relationship( + secondary="data_source_asset_association", back_populates="data_sources", cascade="all, delete", lazy="selectin", + viewonly=True, + ) + # association between Asset -> Association -> DataSource + asset_associations: Mapped[List["DataSourceAssetAssociation"]] = relationship( + back_populates="data_source", + lazy="selectin", + order_by=[DataSourceAssetAssociation.parameter, DataSourceAssetAssociation.num], ) @@ -159,7 +305,7 @@ class Asset(Timestamped, Base): __tablename__ = "assets" __mapper_args__ = {"eager_defaults": True} - id = Column(Integer, primary_key=True, index=True, autoincrement=True) + id: Mapped[int] = mapped_column(primary_key=True, index=True, autoincrement=True) data_uri = Column(Unicode(1023), index=True, unique=True) is_directory = Column(Boolean, nullable=False) @@ -167,11 +313,15 @@ class Asset(Timestamped, Base): hash_content = Column(Unicode(1023), nullable=True) size = Column(Integer, nullable=True) - data_sources = relationship( - "DataSource", - secondary=data_source_asset_association_table, + # # many-to-many relationship to Asset, bypassing the `Association` class + data_sources: Mapped[List["DataSource"]] = relationship( + secondary="data_source_asset_association", back_populates="assets", - passive_deletes=True, + viewonly=True, + ) + # association between DataSource -> Association -> Asset + data_source_associations: Mapped[List["DataSourceAssetAssociation"]] = relationship( + back_populates="asset", ) diff --git a/tiled/catalog/register.py b/tiled/catalog/register.py index 3baacbad8..1031705cc 100644 --- a/tiled/catalog/register.py +++ b/tiled/catalog/register.py @@ -291,7 +291,7 @@ async def register_single_item( adapter_factory = settings.adapters_by_mimetype[mimetype] logger.info(" Resolved mimetype '%s' with adapter for '%s'", mimetype, item) try: - adapter = await anyio.to_thread.run_sync(adapter_factory, item) + adapter = await anyio.to_thread.run_sync(adapter_factory, ensure_uri(item)) except Exception: logger.exception(" SKIPPED: Error constructing adapter for '%s':", item) return @@ -310,8 +310,9 @@ async def register_single_item( management=Management.external, assets=[ Asset( - data_uri=str(ensure_uri(str(item.absolute()))), + data_uri=ensure_uri(item), is_directory=is_directory, + parameter="data_uri", ) ], ) @@ -325,7 +326,7 @@ async def register_single_item( TIFF_SEQUENCE_EMPTY_NAME_ROOT = "_unnamed" -async def tiff_sequence( +async def group_tiff_sequences( catalog, path, files, @@ -355,41 +356,49 @@ async def tiff_sequence( sequences[sequence_name].append(file) continue unhandled_files.append(file) - mimetype = "multipart/related;type=image/tiff" - for name, sequence in sorted(sequences.items()): - logger.info(" Grouped %d TIFFs into a sequence '%s'", len(sequence), name) - adapter_class = settings.adapters_by_mimetype[mimetype] - key = settings.key_from_filename(name) - try: - adapter = adapter_class(*sequence) - except Exception: - logger.exception(" SKIPPED: Error constructing adapter for '%s'", name) - return - await create_node_safe( - catalog, - key=key, - structure_family=adapter.structure_family, - metadata=dict(adapter.metadata()), - specs=adapter.specs, - data_sources=[ - DataSource( - mimetype=mimetype, - structure=dict_or_none(adapter.structure()), - parameters={}, - 
management=Management.external, - assets=[ - Asset( - data_uri=str(ensure_uri(str(item.absolute()))), - is_directory=False, - ) - for item in sorted(sequence) - ], - ) - ], - ) + for name, sequence in sequences.items(): + await register_tiff_sequence(catalog, name, sorted(sequence), settings) return unhandled_files, unhandled_directories +TIFF_SEQ_MIMETYPE = "multipart/related;type=image/tiff" + + +async def register_tiff_sequence(catalog, name, sequence, settings): + logger.info(" Grouped %d TIFFs into a sequence '%s'", len(sequence), name) + adapter_class = settings.adapters_by_mimetype[TIFF_SEQ_MIMETYPE] + key = settings.key_from_filename(name) + try: + adapter = adapter_class([ensure_uri(filepath) for filepath in sequence]) + except Exception: + logger.exception(" SKIPPED: Error constructing adapter for '%s'", name) + return + await create_node_safe( + catalog, + key=key, + structure_family=adapter.structure_family, + metadata=dict(adapter.metadata()), + specs=adapter.specs, + data_sources=[ + DataSource( + mimetype=TIFF_SEQ_MIMETYPE, + structure=dict_or_none(adapter.structure()), + parameters={}, + management=Management.external, + assets=[ + Asset( + data_uri=ensure_uri(item), + is_directory=False, + parameter="data_uris", + num=i, + ) + for i, item in enumerate(sequence) + ], + ) + ], + ) + + async def skip_all( catalog, path, @@ -407,7 +416,7 @@ async def skip_all( return [], directories -DEFAULT_WALKERS = [tiff_sequence, one_node_per_item] +DEFAULT_WALKERS = [group_tiff_sequences, one_node_per_item] async def watch( diff --git a/tiled/catalog/utils.py b/tiled/catalog/utils.py index 1e52883cb..0be02d1d8 100644 --- a/tiled/catalog/utils.py +++ b/tiled/catalog/utils.py @@ -1,43 +1,24 @@ import re -import sys from pathlib import Path -from urllib import parse - -import httpx +from urllib.parse import urlparse, urlunparse SCHEME_PATTERN = re.compile(r"^[a-z0-9+]+:\/\/.*$") -def safe_path(uri): - """ - Acceess the path of a URI and return it as a Path object. - - Ideally we could just do uri.path, but Windows paths confuse - HTTP URI parsers because of the drive (e.g. C:) and return - something like /C:/x/y/z with an extraneous leading slash. - """ - raw_path = httpx.URL(uri).path - if sys.platform == "win32" and raw_path[0] == "/": - path = raw_path[1:] - else: - path = raw_path - return Path(path) - - def ensure_uri(uri_or_path): "Accept a URI or file path (Windows- or POSIX-style) and return a URI." if not SCHEME_PATTERN.match(str(uri_or_path)): # Interpret this as a filepath. path = uri_or_path - uri_str = parse.urlunparse( + uri_str = urlunparse( ("file", "localhost", str(Path(path).absolute()), "", "", None) ) else: # Interpret this as a URI. uri_str = uri_or_path - uri = httpx.URL(uri_str) - # Ensure that, if the scheme is file, it meets the techincal standard for - # file URIs, like file://localhost/..., not the shorthand file:///... 
- if uri.scheme == "file": - uri = uri.copy_with(host="localhost") - return uri + parsed = urlparse(uri_str) + if parsed.netloc == "": + mutable = list(parsed) + mutable[1] = "localhost" + uri_str = urlunparse(mutable) + return str(uri_str) diff --git a/tiled/examples/generate_files.py b/tiled/examples/generate_files.py index f4790d7f0..45915fe3a 100644 --- a/tiled/examples/generate_files.py +++ b/tiled/examples/generate_files.py @@ -10,7 +10,11 @@ ("a.tif",), ("b.tif",), ("c.tif",), - ("more", "d.tif"), + ("more", "A0001.tif"), + ("more", "A0002.tif"), + ("more", "A0003.tif"), + ("more", "B0001.tif"), + ("more", "B0002.tif"), ("more", "even_more", "e.tif"), ("more", "even_more", "f.tif"), ] diff --git a/tiled/examples/xdi.py b/tiled/examples/xdi.py index d1bff35a9..828ad2c52 100644 --- a/tiled/examples/xdi.py +++ b/tiled/examples/xdi.py @@ -13,10 +13,12 @@ from tiled.adapters.dataframe import DataFrameAdapter from tiled.structures.core import Spec +from tiled.utils import path_from_uri -def read_xdi(filepath, structure=None, metadata=None, specs=None, access_policy=None): +def read_xdi(data_uri, structure=None, metadata=None, specs=None, access_policy=None): "Read XDI-formatted file." + filepath = path_from_uri(data_uri) with open(filepath, "r") as file: metadata = {} fields = collections.defaultdict(dict) diff --git a/tiled/server/schemas.py b/tiled/server/schemas.py index 40258fa5f..b40da5682 100644 --- a/tiled/server/schemas.py +++ b/tiled/server/schemas.py @@ -88,11 +88,19 @@ class Spec(pydantic.BaseModel, extra=pydantic.Extra.forbid, frozen=True): class Asset(pydantic.BaseModel): data_uri: str is_directory: bool + parameter: Optional[str] + num: Optional[int] = None id: Optional[int] = None @classmethod def from_orm(cls, orm): - return cls(id=orm.id, data_uri=orm.data_uri, is_directory=orm.is_directory) + return cls( + data_uri=orm.asset.data_uri, + is_directory=orm.asset.is_directory, + parameter=orm.parameter, + num=orm.num, + id=orm.asset.id, + ) class Management(str, enum.Enum): @@ -143,7 +151,7 @@ def from_orm(cls, orm): structure=orm.structure, mimetype=orm.mimetype, parameters=orm.parameters, - assets=[Asset.from_orm(asset) for asset in orm.assets], + assets=[Asset.from_orm(assoc) for assoc in orm.asset_associations], management=orm.management, ) diff --git a/tiled/utils.py b/tiled/utils.py index bb91cb94c..6fcd736fb 100644 --- a/tiled/utils.py +++ b/tiled/utils.py @@ -8,9 +8,12 @@ import inspect import operator import os +import platform import sys import threading +from pathlib import Path from typing import Any, Callable +from urllib.parse import urlparse import anyio @@ -654,3 +657,25 @@ async def ensure_awaitable(func, *args, **kwargs): return await func(*args, **kwargs) else: return await anyio.to_thread.run_sync(func, *args, **kwargs) + + +def path_from_uri(uri): + """ + Give a URI, return a Path. + + If the URI has a scheme other than 'file', raise ValueError. + + >>> path_from_uri('file://localhost/a/b/c') # POSIX-style + '/a/b/c' + >>> path_from_uri('file://localhost/C:/a/b/c') # Windows-style + 'C:/a/b/c' + """ + parsed = urlparse(uri) + if parsed.scheme != "file": + raise ValueError(f"Only 'file' URIs are supported. URI: {uri}") + if platform.system() == "Windows": + # We slice because we need "C:/..." not "/C:/...". + path = Path(parsed.path[1:]) + else: + path = Path(parsed.path) + return path